Source code for coopihc.policy.LinearFeedback

import numpy
from coopihc.policy.BasePolicy import BasePolicy

# ============== General Policies ===============


class LinearFeedback(BasePolicy):
    """LinearFeedback

    Linear Feedback policy, which applies a feedback gain to a given substate
    of the observation and passes the result to a noise function. For example,
    with:

    + ``state_indicator = ('user_state', 'substate1', slice(0,2,1))``
    + ``feedback_gain = -numpy.eye(2)``
    + ``noise_function = f(action, observation, *args)``
    + ``noise_func_args = (1,2)``

    you will get

    .. code-block:: python

        obs = observation['user_state']['substate1'][slice(0, 2, 1)]
        action = -(-numpy.eye(2)) @ obs
        return f(action, observation, *(1, 2))

    You can change the feedback gain online via the ``set_feedback_gain()``
    method.

    :param action_state: see `BasePolicy <coopihc.policy.BasePolicy.BasePolicy>`
    :type action_state: `State <coopihc.base.State.State>`
    :param state_indicator: specifies which component of the observation is used
        as feedback information, e.g. ``('user_state', 'substate1', slice(0,2,1))``
    :type state_indicator: iterable
    :param feedback_gain: feedback gain matrix, defaults to ``"identity"``,
        which creates a negative identity matrix
    :type feedback_gain: str or numpy.ndarray, optional
    :param noise_function: a function that produces a noise sample to add to the
        generated action, defaults to None
    :type noise_function: function, optional
    :param noise_func_args: arguments to the function above, defaults to ()
    :type noise_func_args: tuple, optional
    """

    def __init__(
        self,
        action_state,
        state_indicator,
        *args,
        feedback_gain="identity",
        noise_function=None,
        noise_func_args=(),
        **kwargs
    ):
        super().__init__(*args, action_state=action_state, **kwargs)
        self.state_indicator = state_indicator
        self.feedback_gain = feedback_gain
        self.noise_function = noise_function
        self.noise_args = noise_func_args
    def set_feedback_gain(self, gain):
        """set_feedback_gain

        Set the feedback gain. Only needed if the gain must be changed after
        initialization; otherwise it is recommended to set the gain when
        initializing the policy.

        :param gain: feedback gain matrix
        :type gain: numpy.ndarray
        """
        self.feedback_gain = gain
    @BasePolicy.default_value
    def sample(self, agent_observation=None, agent_state=None):
        """sample

        Applies the gain.

        :return: action, reward
        :rtype: tuple(`StateElement <coopihc.base.StateElement.StateElement>`, float)
        """
        # Drill down to the substate designated by state_indicator.
        output = agent_observation
        for item in self.state_indicator:
            output = output[item]
        output = output.view(numpy.ndarray)

        # Checking the type first suppresses "FutureWarning: elementwise
        # comparison failed; returning scalar instead, but in the future will
        # perform elementwise comparison" (Python 3.8+ / NumPy 1.22.0) raised
        # when comparing an ndarray with a string.
        if isinstance(self.feedback_gain, str):
            if self.feedback_gain == "identity":
                self.feedback_gain = -numpy.eye(max(output.shape))

        noiseless_feedback = -self.feedback_gain @ output.reshape((-1, 1))
        if self.noise_function is None:
            return noiseless_feedback, 0

        noisy_action = self.noise_function(
            noiseless_feedback, agent_observation, *self.noise_args
        )
        return noisy_action, 0
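
# ----------------------------------------------------------------------
# Usage sketch (illustration only, not part of the module above). It shows
# one way to instantiate LinearFeedback with a custom noise function. The
# observation layout ('user_state', 'substate1'), the `action_state` object,
# and `gaussian_noise` are assumptions made for this example; adapt them to
# your own bundle setup.

def gaussian_noise(action, observation, sigma):
    # Hypothetical noise function with the expected
    # f(action, observation, *args) signature: perturbs the noiseless
    # feedback with i.i.d. Gaussian noise of standard deviation `sigma`.
    return action + sigma * numpy.random.standard_normal(action.shape)


# policy = LinearFeedback(
#     action_state,  # a State holding the action, see BasePolicy
#     ("user_state", "substate1", slice(0, 2, 1)),
#     feedback_gain=-numpy.eye(2),
#     noise_function=gaussian_noise,
#     noise_func_args=(0.1,),
# )
# action, reward = policy.sample(agent_observation=observation)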