Source code for coopihc.agents.lqrcontrollers.IHCT_LQGController

import numpy
import copy
import warnings
from coopihc.agents.BaseAgent import BaseAgent
from coopihc.observation.RuleObservationEngine import RuleObservationEngine
from coopihc.base.State import State
from coopihc.base.elements import discrete_array_element, array_element, cat_element
from coopihc.policy.LinearFeedback import LinearFeedback
from coopihc.inference.ContinuousKalmanUpdate import ContinuousKalmanUpdate


# Infinite Horizon Continuous Time LQG controller, based on Phillis 1985
class IHCT_LQGController(BaseAgent):
    """Infinite Horizon Continuous Time LQ Gaussian Controller.

    An Infinite Horizon (Steady-state) LQG controller, based on [Phillis1985]_ and [Qian2013]_.

    For a task where the state :math:`x` follows the linear noisy dynamics

    .. math::

        \\begin{align}
        dx = (Ax + Bu)dt + Fx \\, d\\beta + G \\, d\\omega + Hu \\, d\\gamma \\\\
        \\end{align}

    the LQG controller computes the commands :math:`u` that minimize the cost :math:`J`, based on the noisy observations :math:`dy`:

    .. math::

        \\begin{align*}
        dy & = Cx \\, dt + D \\, d\\xi \\\\
        d\\hat{x} & = (A \\hat{x} + Bu) dt + K (dy - C\\hat{x}dt) \\\\
        u & = - L\\hat{x} \\\\
        \\tilde{x} & = x - \\hat{x} \\\\
        J & \\simeq \\mathbb{E} [\\tilde{x}^T U \\tilde{x} + x^T Q x + u^T R u]
        \\end{align*}

    .. [Phillis1985] Phillis, Y. "Controller design of systems with multiplicative noise." IEEE Transactions on Automatic Control 30.10 (1985): 1017-1019. `Link <https://ieeexplore.ieee.org/abstract/document/1103828>`_

    .. [Qian2013] Qian, Ning, et al. "Movement duration, Fitts's law, and an infinite-horizon optimal feedback control model for biological motor systems." Neural computation 25.3 (2013): 697-724. `Link <https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.296.4312&rep=rep1&type=pdf>`_

    :param role: "user" or "assistant"
    :type role: string
    :param timestep: duration of timestep
    :type timestep: float
    :param Q: state cost
    :type Q: numpy.ndarray
    :param R: control cost
    :type R: numpy.ndarray
    :param U: estimation error cost
    :type U: numpy.ndarray
    :param C: observation matrix
    :type C: numpy.ndarray
    :param D: observation noise matrix
    :type D: numpy.ndarray
    :param noise: whether or not noise is applied, defaults to "on"
    :type noise: str, optional
    :param Acontroller: the agent's representation of A. If None, it is set to the task's A, defaults to None
    :type Acontroller: numpy.ndarray, optional
    :param Bcontroller: the agent's representation of B. If None, it is set to the task's B, defaults to None
    :type Bcontroller: numpy.ndarray, optional
    :param F: the agent's representation of F. If None, it is set to the task's F, defaults to None
    :type F: numpy.ndarray, optional
    :param G: the agent's representation of G. If None, it is set to the task's G, defaults to None
    :type G: numpy.ndarray, optional
    :param H: the agent's representation of H. If None, it is set to the task's H, defaults to None
    :type H: numpy.ndarray, optional
    """

    def __init__(
        self,
        role,
        timestep,
        Q,
        R,
        U,
        C,
        D,
        *args,
        noise="on",
        Acontroller=None,
        Bcontroller=None,
        F=None,
        G=None,
        H=None,
        **kwargs
    ):
        self.C = C
        self.Q = Q
        self.R = R
        self.U = U
        self.D = D
        self.timestep = timestep
        self.role = role

        # Initialize random Kalman gains
        self.K = numpy.random.rand(*C.T.shape)
        self.L = numpy.random.rand(1, Q.shape[1])
        self.noise = noise

        # Controller-side representations of the task dynamics
        self.Acontroller = Acontroller
        self.Bcontroller = Bcontroller
        self.Fcontroller = F
        self.Gcontroller = G
        self.Hcontroller = H

        # =================== Linear Feedback Policy ==========
        action_state = State()
        action_state["action"] = array_element(shape=(1, 1))

        # Linear Feedback policy with LQ reward
        class LFwithLQreward(LinearFeedback):
            def __init__(self, R, *args, **kwargs):
                super().__init__(*args, **kwargs)
                self.R = R

            def sample(self, agent_observation=None, agent_state=None):
                action, _ = super().sample(agent_observation=agent_observation)
                return (
                    action,
                    (action.T @ self.R @ action).squeeze().tolist(),
                )

        agent_policy = LFwithLQreward(
            self.R,
            action_state,
            ("user_state", "xhat"),
        )

        # =========== Observation Engine ==============
        # Rule Observation Engine with LQ reward
        class RuleObswithLQreward(RuleObservationEngine):
            def __init__(self, Q, *args, **kwargs):
                super().__init__(*args, **kwargs)
                self.Q = Q

            def observe(self, game_state=None):
                observation, _ = super().observe(game_state=game_state)
                x = observation["task_state"]["x"].view(numpy.ndarray)
                reward = x.T @ self.Q @ x
                return observation, reward

        # Base specification
        user_engine_specification = [
            ("game_info", "all"),
            ("task_state", "x"),
            ("user_state", "all"),
            ("assistant_state", None),
            ("user_action", "all"),
            ("assistant_action", None),
        ]

        # Add rule for matrix observation y = Cx
        def observation_linear_combination(_obs, game_state, C):
            return C @ _obs

        C_rule = {
            ("task_state", "x"): (
                observation_linear_combination,
                (C,),
            )
        }
        extradeterministicrules = {}
        extradeterministicrules.update(C_rule)

        # Add rule for noisy observation y += D @ epsilon, with epsilon ~ N(mu, sigma)
        def additive_gaussian_noise(_obs, gamestate, D, *args):
            try:
                mu, sigma = args
            except ValueError:
                mu, sigma = numpy.zeros(_obs.shape), numpy.eye(max(_obs.shape))
            return _obs + D @ numpy.random.multivariate_normal(
                mu, sigma, size=1
            ).reshape(-1, 1)

        # Instantiate the previous rule so that epsilon ~ N(0, sqrt(dt) * I)
        agn_rule = {
            ("task_state", "x"): (
                additive_gaussian_noise,
                (
                    D,
                    numpy.zeros((C.shape[0], 1)).reshape(-1),
                    numpy.sqrt(timestep) * numpy.eye(C.shape[0]),
                ),
            )
        }
        extraprobabilisticrules = {}
        extraprobabilisticrules.update(agn_rule)

        observation_engine = RuleObswithLQreward(
            self.Q,
            deterministic_specification=user_engine_specification,
            extradeterministicrules=extradeterministicrules,
            extraprobabilisticrules=extraprobabilisticrules,
        )

        # ======================= Inference Engine
        inference_engine = ContinuousKalmanUpdate()

        super().__init__(
            "user",
            agent_policy=agent_policy,
            agent_observation_engine=observation_engine,
            agent_inference_engine=inference_engine,
        )
    def finit(self):
        """Get and compute needed matrices.

        0. Take A, B, F, G, H from the task if not provided by the end-user;
        1. create an :math:`\\hat{x}` state;
        2. attach the model dynamics to the inference engine if needed;
        3. compute K and L;
        4. set K and L in the inference engine and the policy.
        """
        task = self.bundle.task

        # ---- 0. Take A, B, F, G, H from the task if not provided by the end-user
        for elem, taskelem in zip(
            [
                "Acontroller",
                "Bcontroller",
                "Fcontroller",
                "Gcontroller",
                "Hcontroller",
            ],
            [task.A, task.B, task.F, task.G, task.H],
        ):
            if getattr(self, elem) is None:
                setattr(self, elem, taskelem)

        # ---- 1. Init xhat state
        self.state["xhat"] = copy.deepcopy(self.bundle.task.state["x"])

        # ---- 2. Attach the model dynamics to the inference engine
        if not self.inference_engine.fmd_flag:
            self.inference_engine.set_forward_model_dynamics(
                self.Acontroller, self.Bcontroller, self.C
            )

        # ---- 3. Compute K and L
        mc = self._MContainer(
            self.Acontroller,
            self.Bcontroller,
            self.C,
            self.D,
            self.Gcontroller,
            self.Hcontroller,
            self.Q,
            self.R,
            self.U,
        )
        self.K, self.L = self._compute_Kalman_matrices(mc.pass_args())

        # ---- 4. Set K and L in the inference engine and the policy
        self.inference_engine.set_K(self.K)
        self.policy.set_feedback_gain(self.L)
    class _MContainer:
        """Matrix container

        The purpose of this container is to facilitate common manipulations of the matrices of the LQG problem, as well as potentially storing their evolution (not implemented yet).
        """

        def __init__(self, A, B, C, D, G, H, Q, R, U):
            self.A = A
            self.B = B
            self.C = C
            self.D = D
            self.G = G
            self.H = H
            self.Q = Q
            self.R = R
            self.U = U
            self._check_matrices()

        def _check_matrices(self):
            # Not implemented yet
            pass

        def pass_args(self):
            return (
                self.A,
                self.B,
                self.C,
                self.D,
                self.G,
                self.H,
                self.Q,
                self.R,
                self.U,
            )

    def _compute_Kalman_matrices(self, matrices, N=20):
        """Compute K and L

        K and L are computed according to the algorithm described in [Qian2013]_ with some minor tweaks: K and L are obtained recursively, with increasingly precise estimates. At first, N iterations are performed; if these fail to converge, N is grown geometrically (by a factor of 1.3 per retry) and K and L are recomputed from a new starting point.

        :param matrices: (A, B, C, D, G, H, Q, R, U)
        :type matrices: tuple(numpy.ndarray)
        :param N: max iterations of the algorithm on first try, defaults to 20
        :type N: int, optional
        :return: (K, L)
        :rtype: tuple(numpy.ndarray, numpy.ndarray)
        """
        A, B, C, D, G, H, Q, R, U = matrices
        Y = B @ H.reshape(1, -1)
        Lnorm = []
        Knorm = []
        K = numpy.random.rand(*C.T.shape)
        L = numpy.random.rand(1, A.shape[1])
        for i in range(N):
            Lnorm.append(numpy.linalg.norm(L))
            Knorm.append(numpy.linalg.norm(K))

            n, m = A.shape
            Abar = numpy.block(
                [[A - B @ L, B @ L], [numpy.zeros((n, m)), A - K @ C]]
            )
            Ybar = numpy.block([[-Y @ L, Y @ L], [-Y @ L, Y @ L]])
            Gbar = numpy.block(
                [[G, numpy.zeros((G.shape[0], D.shape[1]))], [G, -K @ D]]
            )
            V = numpy.block(
                [
                    [Q + L.T @ R @ L, -L.T @ R @ L],
                    [-L.T @ R @ L, L.T @ R @ L + U],
                ]
            )
            P, p_res = self._LinRicatti(Abar, Ybar, Gbar @ Gbar.T)
            S, s_res = self._LinRicatti(Abar.T, Ybar.T, V)

            P22 = P[n:, n:]
            S11 = S[:n, :n]
            S22 = S[n:, n:]

            K = P22 @ C.T @ numpy.linalg.pinv(D @ D.T)
            L = numpy.linalg.pinv(R + Y.T @ (S11 + S22) @ Y) @ B.T @ S11

        K, L = self._check_KL(Knorm, Lnorm, K, L, matrices)
        return K, L

    def _LinRicatti(self, A, B, C):
        """Solve a Lyapunov-type linear matrix equation.

        Solves an equation of the form

        .. math ::

            \\begin{align}
            AX + XA^T + BXB^T + C = 0
            \\end{align}

        and returns the solution X together with the norm of the residual.

        :param A: see the equation above
        :type A: numpy.ndarray
        :param B: see the equation above
        :type B: numpy.ndarray
        :param C: see the equation above
        :type C: numpy.ndarray
        :return: X, residual norm
        :rtype: tuple(numpy.ndarray, float)
        """
        n, m = A.shape
        nc, mc = C.shape
        if n != m:
            print("Matrix A has to be square")
            return -1
        # Vectorized form: (I kron A + A kron I + B kron B) vec(X) = -vec(C)
        M = (
            numpy.kron(numpy.identity(n), A)
            + numpy.kron(A, numpy.identity(n))
            + numpy.kron(B, B)
        )
        C = C.reshape(-1, 1)
        X = -numpy.linalg.pinv(M) @ C
        X = X.reshape(n, n)
        C = C.reshape(nc, mc)
        res = numpy.linalg.norm(A @ X + X @ A.T + B @ X @ B.T + C)
        return X, res

    # Counting decorator
    def counted_decorator(f):
        """counted_decorator

        Decorator that counts the number of times function f has been called.

        :param f: decorated function
        :type f: function
        """

        def wrapped(*args, **kwargs):
            wrapped.calls += 1
            return f(*args, **kwargs)

        wrapped.calls = 0
        return wrapped
    @counted_decorator
    def _check_KL(self, Knorm, Lnorm, K, L, matrices):
        """Check K and L convergence

        Checks whether K and L have converged, by looking at the variations of their norms over the last 5 iterations.

        :param Knorm: list of the norms of K on each iteration
        :type Knorm: list(numpy.array)
        :param Lnorm: list of the norms of L on each iteration
        :type Lnorm: list(numpy.array)
        :param K: see the equations in the class docstring
        :type K: numpy.array
        :param L: see the equations in the class docstring
        :type L: numpy.array
        :param matrices: (A, B, C, D, G, H, Q, R, U)
        :type matrices: tuple(numpy.ndarray)
        :return: K and L
        :rtype: tuple(numpy.ndarray, numpy.ndarray)
        """
        # Moving average (window of 5) of the variations of the norms of K and L
        average_delta = numpy.convolve(
            numpy.diff(Lnorm) + numpy.diff(Knorm),
            numpy.ones(5) / 5,
            mode="full",
        )[-5]
        if average_delta > 0.01:  # Arbitrary threshold
            warnings.warn(
                "The computation of the K and L matrices did not converge. Retrying with a different starting point and N={:d} iterations".format(
                    int(20 * 1.3 ** self._check_KL.calls)
                )
            )
            K, L = self._compute_Kalman_matrices(
                matrices, N=int(20 * 1.3 ** self._check_KL.calls)
            )
        return K, L
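
# ----------------------------------------------------------------------
# Illustrative sketch, not part of the original module: the vectorization
# trick used by _LinRicatti above. The Lyapunov-type equation
# AX + XA^T + BXB^T + C = 0 becomes the linear system
# (I kron A + A kron I + B kron B) vec(X) = -vec(C), solved here with a
# pseudo-inverse exactly as in the method. All matrix values below are
# made-up toy values chosen only so that the system is well conditioned.
def _lin_ricatti_demo():
    rng = numpy.random.default_rng(0)
    n = 3
    A = -numpy.eye(n) + 0.1 * rng.standard_normal((n, n))  # stable-ish A
    B = 0.1 * rng.standard_normal((n, n))
    C = numpy.eye(n)  # constant term
    M = (
        numpy.kron(numpy.identity(n), A)
        + numpy.kron(A, numpy.identity(n))
        + numpy.kron(B, B)
    )
    X = (-numpy.linalg.pinv(M) @ C.reshape(-1, 1)).reshape(n, n)
    residual = numpy.linalg.norm(A @ X + X @ A.T + B @ X @ B.T + C)
    return X, residual  # residual should be close to zero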
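
# ----------------------------------------------------------------------
# Illustrative sketch, not part of the original module: a minimal way to
# instantiate the controller, using the constructor signature defined
# above. All numerical values are made up for illustration (a 1D plant
# observed through position only); A, B, F, G, H are normally provided
# by the task and picked up in finit() when the bundle is assembled.
if __name__ == "__main__":
    timestep = 0.01
    C = numpy.array([[1.0, 0.0]])  # observe position only
    D = numpy.array([[0.01]])  # observation noise scaling
    Q = numpy.diag([1.0, 0.01])  # state cost
    R = numpy.array([[1e-4]])  # control cost
    U = numpy.diag([1.0, 0.1])  # estimation error cost

    lqg_user = IHCT_LQGController("user", timestep, Q, R, U, C, D)
    # K and L are random placeholders until finit() is called, which
    # happens once the agent is bundled with a task exposing A, B, F, G, H.
    print(lqg_user.K.shape, lqg_user.L.shape)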