Source code for coopihc.interactiontask.ClassicControlTask

import numpy
import copy

from coopihc.helpers import flatten
from coopihc.base.State import State
from coopihc.base.elements import discrete_array_element, array_element, cat_element
from coopihc.base.elements import array_element
from coopihc.interactiontask.InteractionTask import InteractionTask


class ClassicControlTask(InteractionTask):
    """ClassicControlTask

    A task used for a classic control setting with signal dependent and
    independent noise. You can account for control-dependent noise with an
    appropriate noise model in the policy or the observation engine.

    The task has a state x(.) which evolves according to

    .. math ::

        \\begin{align}
        x(+.) = Ax(.) + Bu(.) + Fx(.).d\\beta + G.d\\omega + Hu(.)d\\gamma \\\\
        \\end{align}

    for "timespace=discrete" and

    .. math ::

        \\begin{align}
        x(+.) = x(.) + (Ax(.) + Bu(.))dt + Fx(.).d\\beta + G.d\\omega + Hu(.)d\\gamma \\\\
        \\end{align}

    for "timespace=continuous",

    where :math:`u(.)` is the user action and
    :math:`\\beta, \\gamma, \\omega \\sim \\mathcal{N}(0, \\sqrt{dt})` are
    Wiener processes.

    The task is finished when every component of x is close enough to 0.
    Currently this is implemented as the condition
    ``(abs(x) <= end).all()``, where ``end`` defaults to 0.01 for every
    component.

    A and B may represent continuous or discrete dynamics. A conversion is
    implicitly made following the value of the discrete_dynamics keyword:

    .. math ::

        \\begin{align}
        A_c = \\frac{1}{dt} (A - I) \\\\
        B_c = B \\frac{1}{dt}
        \\end{align}

    .. math ::

        \\begin{align}
        A_d = I + dt \\cdot{} A \\\\
        B_d = dt \\cdot{} B
        \\end{align}

    :param timestep: dt
    :type timestep: float
    :param A: Passive dynamics
    :type A: numpy.ndarray
    :param B: Response to command
    :type B: numpy.ndarray
    :param F: signal dependent noise, defaults to None (zeros shaped like A)
    :type F: numpy.ndarray, optional
    :param G: independent noise, defaults to None (zeros shaped like A)
    :type G: numpy.ndarray, optional
    :param H: control-dependent noise, defaults to None (zeros shaped like B)
    :type H: numpy.ndarray, optional
    :param discrete_dynamics: whether A and B are continuous or discrete, defaults to True
    :type discrete_dynamics: bool, optional
    :param noise: whether to include noise, defaults to "on"
    :type noise: str, optional
    :param timespace: if the task is modeled as discrete or continuous, defaults to "discrete"
    :type timespace: str, optional
    :param end: per-component threshold used by the stopping condition; the default "standard" uses 0.01 for every component
    :type end: str or numpy.ndarray, optional
    """

    @property
    def user_action(self):
        """First element of the user action exposed by the parent class."""
        return super().user_action[0]

    def __init__(
        self,
        timestep,
        A,
        B,
        *args,
        F=None,
        G=None,
        H=None,
        discrete_dynamics=True,
        noise="on",
        timespace="discrete",
        end="standard",
        **kwargs
    ):
        # Extra positional and keyword arguments are forwarded to the parent.
        super().__init__(*args, **kwargs)

        self.dim = max(A.shape)
        self.state = State()
        self.state["x"] = array_element(
            low=numpy.full((self.dim, 1), -numpy.inf),
            high=numpy.full((self.dim, 1), numpy.inf),
        )
        self.timestep = timestep

        # Missing noise matrices default to zeros, i.e. no contribution.
        self.F = numpy.zeros(A.shape) if F is None else F
        self.G = numpy.zeros(A.shape) if G is None else G
        self.H = numpy.zeros(B.shape) if H is None else H

        # Keep both representations of the dynamics; finit() picks one.
        # The conversion between them uses the Euler method.
        if discrete_dynamics:
            self.A_d = A
            self.B_d = B
            self.A_c = 1 / timestep * (A - numpy.eye(A.shape[0]))
            self.B_c = B / timestep
        else:
            self.A_c = A
            self.B_c = B
            self.A_d = numpy.eye(A.shape[0]) + timestep * A
            self.B_d = timestep * B

        self.noise = noise
        self.timespace = timespace

        if end == "standard":
            self.end = numpy.full((self.dim, 1), 0.01)
        else:
            self.end = end

        # No previous state exists until the first user action; draw()
        # treats None as "nothing to plot yet".
        self.state_last_x = None

    def finit(self):
        """finit

        Define whether to use continuous or discrete representation for A
        and B, depending on ``timespace``. ``on_user_action`` reads
        ``self.A`` and ``self.B``, which are only set here.
        """
        if self.timespace == "continuous":
            self.A = self.A_c
            self.B = self.B_c
        else:
            self.A = self.A_d
            self.B = self.B_d

    def reset(self, dic=None):
        """Force all substates except the first to be null.

        Sets the first component of x to 1 and every other component to 0.

        :param dic: reset_dic, see :py:class:`InteractionTask <coopihc.interactiontask.InteractionTask.InteractionTask>`, defaults to None. It is not read by this method.
        :type dic: dictionary, optional
        """
        self.state["x"][0, 0] = 1
        # Force zero velocity
        self.state["x"][1:, 0] = 0

    def on_user_action(self, *args, user_action=None, **kwargs):
        """user step

        Takes the state from x(.) to x(+.) according to

        .. math ::

            \\begin{align}
            x(+.) = Ax(.) + Bu(.) + Fx(.).\\beta + G.\\omega + Hu(.)\\gamma \\\\
            \\end{align}

        when ``timespace`` is "discrete"; otherwise the deterministic part
        is scaled by the timestep and the whole increment is added to x(.).

        The ``user_action`` keyword is not read here; the action is taken
        from the ``user_action`` property.

        :return: (task state, reward fixed to 0, is_done flag)
        :rtype: tuple
        """
        # For readability
        A, B, F, G, H = self.A, self.B, self.F, self.G, self.H
        _u = self.user_action.view(numpy.ndarray)
        _x = self.state["x"].view(numpy.ndarray)

        # Generate noise samples. With noise switched off the samples are
        # still drawn, with a zero standard deviation, so they are all 0.
        noise_std = numpy.sqrt(self.timestep) if self.noise == "on" else 0
        beta, gamma = numpy.random.normal(0, noise_std, (2, 1))
        omega = numpy.random.normal(0, noise_std, (self.dim, 1))

        # Store last_x for render. Must be copied before the update below,
        # because _x shares memory with the state.
        self.state_last_x = copy.copy(_x)

        # Deterministic update + state dependent noise + independent noise
        # + control dependent noise.
        # NOTE(review): B * _u and H * _u are elementwise products, which
        # presumably assumes a scalar control -- confirm against callers.
        if self.timespace == "discrete":
            _x = (A @ _x + B * _u) + F @ _x * beta + G @ omega + H * _u * gamma
        else:
            _x += (
                (A @ _x + B * _u) * self.timestep
                + F @ _x * beta
                + G @ omega
                + H * _u * gamma
            )

        self.state["x"] = _x

        is_done = self.stopping_condition()

        return self.state, 0, is_done

    def stopping_condition(self):
        """Return True when every component of x is within ``self.end`` of 0."""
        _x = self.state["x"]
        return bool((abs(_x[:]) <= self.end).all())

    def on_assistant_action(self, *args, **kwargs):
        """on_assistant_action

        The assistant does not act on this task: the state is returned
        unchanged, with a reward of 0 and is_done set to False.
        """
        return self.state, 0, False

    def render(self, mode="text", ax_user=None, ax_assistant=None, ax_task=None):
        """render

        Text mode: print task state

        plot mode: Dynamically update axes with state trajectories. The
        first call in plot mode creates one y-axis per state component on
        ``ax_task``; later calls only draw the newest segment.

        :param mode: rendering mode(s); a None value is treated as "text"
        :type mode: str, optional
        :param ax_user: not used by this task
        :param ax_assistant: not used by this task
        :param ax_task: axes on which the state trajectories are plotted
        """
        if mode is None:
            mode = "text"

        if "text" in mode:
            print("state")
            print(self.state["x"])

        if "plot" in mode:
            if self.ax is not None:
                self.draw()
                if self.turn_number == 3:
                    self.ax.legend(
                        handles=[self.axes[i].lines[0] for i in range(self.dim)]
                    )
            else:
                self.color = ["b", "g", "r", "c", "m", "y", "k"]
                self.labels = ["x[{:d}]".format(i) for i in range(self.dim)]
                self.axes = [ax_task]
                self.ax = ax_task
                # One extra y-axis per additional state component.
                self.axes.extend(self.ax.twinx() for _ in range(self.dim - 1))

                n_colors = len(self.color)
                for i, ax in enumerate(self.axes):
                    # Cycle through the palette so that more components
                    # than colors does not raise an IndexError.
                    ax.tick_params(axis="y", colors=self.color[i % n_colors])

                self.draw()

    def draw(self):
        """Plot the segment between the previous and the current state.

        Nothing is drawn when no previous state has been stored yet or when
        the state has not changed since the last step.
        """
        # The None test has to come first: comparing None with the state
        # array is not a meaningful "unchanged" check.
        if self.state_last_x is None:
            return
        if (self.state_last_x == self.state["x"]).all():
            return

        t_start = (self.round_number - 1) * self.timestep
        t_end = self.round_number * self.timestep
        n_colors = len(self.color)
        for i in range(self.dim):
            self.axes[i].plot(
                [t_start, t_end],
                flatten(
                    [
                        self.state_last_x[i, 0].tolist(),
                        self.state["x"][i, 0].tolist(),
                    ]
                ),
                "-",
                color=self.color[i % n_colors],
                label=self.labels[i],
            )
        return