import copy
import math
from coopihc.base.Space import Space
from coopihc.base.State import State
from coopihc.helpers import sort_two_lists
from coopihc.policy.BasePolicy import BasePolicy
import numpy
# ============= Discrete Policies =================
# ----------- Bayesian Information Gain Policy ---------
class BIGDiscretePolicy(BasePolicy):
"""BIGDiscretePolicy [summary]
Bayesian Information Gain Policy, adapted from [1]_.
The main ideas/assumptions are:
* A user wants the task to go to some goal state :math:`\\Theta`
* The assistant can put the task in a number of states (X)
* The user can perform a given set of action Y
* A model :math:`p(Y=y|X=X, \\Theta = \\theta)` exists for user behavior
    Make sure to call (see the usage sketch below):
* attach_set_theta, to specify the potential goal states
* attach_transition_function, to specify how the task state evolves after an assistant action
.. [1] Liu, Wanyu, et al. "Bignav: Bayesian information gain for guiding multiscale navigation." Proceedings of the 2017 CHI Conference on Human Factors in Computing Systems. 2017.
:param assistant_action_state: action state of the assistant
:type assistant_action_state: `State<coopihc.base.State.State>`
:param user_policy_model: user policy model. This may be the real policy of the user, but realistically has to be a model of the user policy. This policy must currently be an `ELLDiscretePolicy<coopihc.policy.ELLDiscretePolicy.ELLDiscretePolicy>`.
    :type user_policy_model: `ELLDiscretePolicy<coopihc.policy.ELLDiscretePolicy.ELLDiscretePolicy>`
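
    A minimal wiring sketch; ``assistant_action_state``, ``user_model``,
    ``potential_goals`` and ``task_transition_model`` below are placeholder
    names, not part of this module::

        policy = BIGDiscretePolicy(assistant_action_state, user_model.policy)
        # each theta maps a (substate, element) key to a candidate goal value
        policy.attach_set_theta(
            [{("user_state", "goal"): goal} for goal in potential_goals]
        )
        # trans_func(assistant_action, observation) -> predicted observation
        policy.attach_transition_function(task_transition_model)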
"""
def __init__(
self, assistant_action_state, user_policy_model, *args, threshold=0.8, **kwargs
):
self.threshold = threshold
super().__init__(*args, action_state=assistant_action_state, **kwargs)
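        # Enumerate every joint assistant action (flattened cartesian product
        # of the assistant action space).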
self.assistant_action_set = Space.cartesian_product(
self.action_state["action"].space
)[0]
self.user_policy_model = user_policy_model
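        # Enumerate every joint user action that the user policy model can emit.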
self.user_action_set = Space.cartesian_product(
user_policy_model.action_state["action"].space
)[0]
self.user_policy_likelihood_function = user_policy_model.compute_likelihood
    def attach_set_theta(self, set_theta):
        """Attach the collection of candidate goal states :math:`\\Theta` over which beliefs are defined."""
        self.set_theta = set_theta

    def attach_transition_function(self, trans_func):
        """Attach the function that predicts the observation resulting from an assistant action."""
        self.transition_function = trans_func
    def PYy_Xx(self, user_action, assistant_action, potential_states, beliefs):
""":math:`P(Y=y|X=x)`
Computes the conditional probability :math:`P(Y=y|X=x)`, where X is the assistant outcome and Y the user's response.
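        This marginalizes the user policy model over the candidate goal states,
        :math:`P(Y=y|X=x) = \\sum_{\\theta} P(Y=y|\\Theta=\\theta, X=x) \\, P(\\Theta=\\theta)`,
        where :math:`P(\\Theta=\\theta)` is the current belief.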
:param user_action: user action y for which the condition is computed
:type user_action: `StateElement<coopihc.base.StateElement.StateElement>`
:param assistant_action: assistant action to be evaluated
:type assistant_action: `StateElement<coopihc.base.StateElement.StateElement>`
:param potential_states: collection of potential goal states
:type potential_states: iterable
        :param beliefs: beliefs associated with each potential goal state
        :type beliefs: list
        :return: the conditional :math:`P(Y=y|X=x)`
        :rtype: float
"""
pYy__Xx = 0
for potential_state, belief in zip(potential_states, beliefs):
pYy__Xx += (
self.user_policy_likelihood_function(user_action, potential_state)
* belief
)
return pYy__Xx
    def HY__Xx(self, potential_states, assistant_action, beliefs):
""":math:`H(Y |X=x)`
Computes the conditional entropy :math:`H(Y |X=x) = -\mathbb{E}[\log(p(Y|X=x))]`.
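        Concretely, the code evaluates :math:`H(Y|X=x) = -\\sum_{y} P(Y=y|X=x) \\log_2 P(Y=y|X=x)`,
        with the convention :math:`0 \\log 0 = 0`.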
:param assistant_action: assistant action to be evaluated
:type assistant_action: `StateElement<coopihc.base.StateElement.StateElement>`
:param potential_states: collection of potential goal states
:type potential_states: iterable
        :param beliefs: beliefs associated with each potential goal state
        :type beliefs: list
:return: :math:`H(Y |X=x)`
:rtype: float
"""
H = 0
for user_action in self.user_action_set:
pYy_Xx = self.PYy_Xx(
user_action, assistant_action, potential_states, beliefs
)
if pYy_Xx != 0:
H += -pYy_Xx * math.log(pYy_Xx, 2)
return H
    def HY__OoXx(self, potential_states, beliefs):
""":math:`H(Y |\Theta = \theta, X=x)`
Computes the conditional entropy :math:`H(Y |\Theta = \theta, X=x) = -\mathbb{E}[\log(p(Y|\Theta = \theta, X=x))]`.
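        Concretely, the code averages over the current belief,
        :math:`-\\sum_{\\theta} P(\\Theta=\\theta) \\sum_{y} P(Y=y|\\Theta=\\theta, X=x) \\log_2 P(Y=y|\\Theta=\\theta, X=x)`,
        with the convention :math:`0 \\log 0 = 0`.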
:param potential_states: collection of potential goal states
:type potential_states: iterable
        :param beliefs: beliefs associated with each potential goal state
        :type beliefs: list
:return: :math:`H(Y |\Theta = \theta, X=x)`
:rtype: float
"""
H = 0
for user_action in self.user_action_set:
for potential_state, belief in zip(potential_states, beliefs):
pYy__OoXx = self.user_policy_likelihood_function(
user_action, potential_state
)
if pYy__OoXx != 0: # convention: 0 log 0 = 0
H += -belief * pYy__OoXx * math.log(pYy__OoXx, 2)
return H
    def IG(self, assistant_action, observation, beliefs):
"""Information Gain :math:`\mathrm{IG}(X=x)`
Computes the expected information gain :math:`\mathrm{IG}(X=x) = H(Y |X=x) - H(Y |\Theta = \theta, X=x)` for a potential assistant action x.
:param assistant_action: assistant action to be evaluated
:type assistant_action: `StateElement<coopihc.base.StateElement.StateElement>`
:param observation: current assistant observation
:type observation: `State<coopihc.base.State.State>`
        :param beliefs: beliefs associated with each potential goal state
        :type beliefs: list
        :return: the expected information gain :math:`\mathrm{IG}(X=x)`
        :rtype: float
"""
observation = self.transition_function(assistant_action, observation)
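        # Build one hypothetical observation per candidate goal state theta by
        # overwriting the corresponding (substate, element) entries of the
        # predicted observation.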
potential_states = []
        for t in self.set_theta:
            # Deepcopy would be safer, but a shallow copy should do here and is much cheaper.
# potential_state = copy.deepcopy(observation)
potential_state = copy.copy(observation)
for key, value in t.items():
try:
potential_state[key[0]][key[1]] = value
except KeyError: # key[0] is not in observation
_state = State()
_state[key[1]] = value
potential_state[key[0]] = _state
potential_states.append(potential_state)
return self.HY__Xx(potential_states, assistant_action, beliefs) - self.HY__OoXx(
potential_states, beliefs
)
    def find_best_action(self):
"""find_best_action
        Computes the expected information gain associated with each possible assistant action (e.g., each candidate cursor position) and ranks the actions from most to least informative.
:return: (assistant actions, associated information gain)
:rtype: tuple(list, list)
"""
beliefs = self.host.state["beliefs"]
index = numpy.argmax(beliefs)
hp = beliefs[index]
if hp > self.threshold:
targets = self.observation["task_state"]["targets"]
hp_target = targets[index]
return [hp_target], [None]
else:
observation = self.observation
IG_storage = [
self.IG(action, observation, beliefs.squeeze().tolist())
for action in self.assistant_action_set
]
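            # sort_two_lists sorts by increasing information gain; reverse so
            # that the most informative action comes first.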
_IG, action = sort_two_lists(
IG_storage, self.assistant_action_set, lambda pair: pair[0]
)
            action.reverse()
            _IG.reverse()
return action, _IG
@BasePolicy.default_value
def sample(self, agent_observation=None, agent_state=None):
"""sample
        Select the action with the highest expected information gain.
:return: (assistant action, associated reward)
:rtype: tuple(`StateElement<coopihc.base.StateElement.StateElement>`, float)
"""
self._actions, self._IG = self.find_best_action()
return self._actions[0], 0