Source code for coopihc.agents.lqrcontrollers.LQRController

import numpy


from coopihc.agents.BaseAgent import BaseAgent
from coopihc.base.State import State
from coopihc.base.elements import discrete_array_element, array_element, cat_element
from coopihc.policy.LinearFeedback import LinearFeedback
from coopihc.observation.RuleObservationEngine import RuleObservationEngine
from coopihc.observation.utils import base_task_engine_specification


class LQRController(BaseAgent):
    """A Linear Quadratic Regulator.

    This agent reads a state named 'x' from the task and produces actions
    according to:

    .. math::

        \\text{action} = -K X

    where :math:`K` is the so-called feedback gain, which has to be specified
    externally. For an example, see the
    :py:class:`coopihc.agents.lqrcontrollers.FHDT_LQRController.FHDT_LQRController`
    source code.

    The controller also outputs observation rewards :math:`J`, for state
    :math:`X` and action :math:`u`:

    .. math::

        J = -X^t Q X - u^t R u

    .. note::

        This class is meant to be subclassed.

    .. warning::

        Tested only on 1d output.

    :param role: "user" or "assistant"
    :type role: string
    :param Q: state cost
    :type Q: numpy.ndarray
    :param R: control cost
    :type R: numpy.ndarray
    """

    def __init__(self, role, Q, R, *args, **kwargs):
        self.R = R
        self.Q = Q
        self.role = role

        # ================== Policy ==================
        action_state = State()
        action_state["action"] = array_element(
            low=numpy.full((1,), -numpy.inf), high=numpy.full((1,), numpy.inf)
        )
        agent_policy = LinearFeedback(
            action_state,
            ("task_state", "x"),
        )

        # ============ Observation Engine ============
        class RuleObsWithRewards(RuleObservationEngine):
            def __init__(
                self,
                Q,
                R,
                *args,
                deterministic_specification=base_task_engine_specification,
                extradeterministicrules={},
                extraprobabilisticrules={},
                mapping=None,
                **kwargs
            ):
                self.Q = Q
                self.R = R
                super().__init__(
                    *args,
                    deterministic_specification=deterministic_specification,
                    extradeterministicrules=extradeterministicrules,
                    extraprobabilisticrules=extraprobabilisticrules,
                    mapping=mapping,
                    **kwargs
                )

            def observe(self, game_state=None):
                obs, _ = super().observe(game_state=game_state)
                x = obs["task_state"]["x"].view(numpy.ndarray)
                u = obs["user_action"]["action"].view(numpy.ndarray)
                # Quadratic observation reward J = -x^t Q x - u^t R u
                reward = -x.T @ self.Q @ x - u.T @ self.R @ u
                return obs, reward

        observation_engine = RuleObsWithRewards(
            self.Q, self.R, deterministic_specification=base_task_engine_specification
        )

        super().__init__(
            role,
            agent_policy=agent_policy,
            agent_observation_engine=observation_engine,
        )

    def render(self, *args, **kwargs):
        """render

        Displays actions selected by the LQR agent.
        """
        mode = kwargs.get("mode")
        if mode is None:
            mode = "text"

        if "plot" in mode:
            axtask, axuser, axassistant = args[:3]
            if self.ax is None:
                self.ax = axuser
                self.ax.set_xlabel("Time (s)")
                self.ax.set_ylabel("Action")
            if self.action:
                self.ax.plot(
                    self.bundle.round_number * self.bundle.task.timestep,
                    self.action,
                    "bo",
                )
        if "text" in mode:
            print("Action")
            print(self.action)
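
As a rough, standalone illustration of the quantities in the docstring above (not part of coopihc), the sketch below builds a hypothetical two-dimensional linear system, obtains one plausible feedback gain K with scipy's discrete algebraic Riccati solver, and then evaluates the control law and the quadratic observation reward with plain numpy. The system matrices A and B and the cost matrices Q and R are made-up values; in coopihc the gain is supplied externally, e.g. by a subclass such as FHDT_LQRController.

import numpy
# scipy is used here only to produce a plausible gain; LQRController itself
# does not depend on it.
from scipy.linalg import solve_discrete_are

# Hypothetical discrete-time system x_{t+1} = A x_t + B u_t (made-up values)
A = numpy.array([[1.0, 0.1], [0.0, 1.0]])
B = numpy.array([[0.0], [0.1]])

# State and control costs, as passed to LQRController(role, Q, R)
Q = numpy.eye(2)
R = numpy.array([[1e-2]])

# One way to obtain a feedback gain K: infinite-horizon discrete-time LQR
P = solve_discrete_are(A, B, Q, R)
K = numpy.linalg.inv(R + B.T @ P @ B) @ (B.T @ P @ A)

# Control law applied by the LinearFeedback policy: action = -K x
x = numpy.array([[1.0], [0.0]])
u = -K @ x

# Observation reward computed by the agent's observation engine
J = -x.T @ Q @ x - u.T @ R @ u
print(u, J)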