Source code for coopihc.policy.BasePolicy

import numpy
import copy

from coopihc.base.State import State
from coopihc.base.elements import discrete_array_element, array_element, cat_element
from coopihc.base.elements import cat_element


# ============== General Policies ===============


[docs]class BasePolicy: """BasePolicy Base Policy class. Randomly samples from the action state. You have can provide an action state as an argument (args[0]). If no action state is provided, the policy is initialized with an action state with a single 'None' action. """ def __init__(self, *args, action_state=None, **kwargs): self._action_keys = None # For actionkeys property # If a state is provided, use it; else create one (important not to lose the reference w/r the game_state) if action_state is None: action_state = State() action_state["action"] = cat_element(N=2, init=0) self.action_state = action_state self.host = None # https://stackoverflow.com/questions/1015307/python-bind-an-unbound-method def _bind(self, func, as_name=None): if as_name is None: as_name = func.__name__ bound_method = func.__get__(self, self.__class__) setattr(self, as_name, bound_method) return bound_method def __content__(self): return self.__class__.__name__ @property def parameters(self): try: return self.host.parameters except AttributeError: raise AttributeError( "This policy has not been connected to an agent yet -- You can't access this agent's parameters" ) @property def state(self): try: return self.host.state except AttributeError: raise AttributeError( "This policy has not been connected to an agent yet -- You can't access this agent's state" ) @property def observation(self): """observation Return the last observation. :return: last observation :rtype: `State<coopihc.base.State.State>` """ try: return self.host.observation except AttributeError: raise AttributeError( "This policy has not been connected to an agent yet -- You can't access this agent's observation" ) @property def action_keys(self): if self._action_keys is None: self._action_keys = self.action_state.keys() return self._action_keys @property def action(self): """action Return the last action. :return: last action :rtype: `State<coopihc.base.StateElement.StateElement>` """ actions = tuple(self.action_state.values()) if len(actions) == 1: return next(iter(actions)) return actions @action.setter def action(self, item): try: next(iter(item)) except TypeError: item = (item,) for _action, key in zip(item, self.action_keys): self.action_state[key][...] = _action @property def unwrapped(self): return self
[docs] def default_value(func): """Apply this decorator to use bundle.game_state as default value to observe if game_state = None""" def wrapper_default_value(self, agent_observation=None, agent_state=None): if agent_observation is None: agent_observation = self.host.observation if agent_state is None: agent_state = self.state return func( self, agent_observation=agent_observation, agent_state=agent_state ) return wrapper_default_value
[docs] def reset(self, random=True): """reset Reset the policy :param random: reset the policy, defaults to True. Here in case of subclassing BasePolicy. :type random: bool, optional """ if random: self.action_state.reset()
def _base_sample(self, agent_observation=None, agent_state=None): action, reward = self.sample( agent_observation=agent_observation, agent_state=agent_state ) self.action = action return self.action, reward @default_value def sample(self, agent_observation=None, agent_state=None): """sample (Randomly) Sample from the policy :return: (action, action reward) :rtype: (StateElement<coopihc.base.StateElement.StateElement>, float) """ try: _ = [_action.reset() for _action in self.action] except TypeError: self.action.reset() return self.action, 0 def __repr__(self): try: return self.action_state.__str__() except AttributeError: return "Policy--unreadable"