Assistants that simulate users

In many cases it is useful to have an assistant maintain a model of a user, for example to perform single-shot predictions (what will the user do next?) or full simulations by means of rollouts (what is the end result if I follow this policy throughout the task?). These predictions can then be used in the assistant's decision-making process.

Since CoopIHC defines users as classes (and/or instances), it seems natural to pass a user to an assistant, which can then query it to perform predictions.

Single-shot predictions

Single-shot predictions are easy to implement with basic CoopIHC functions. Below is an example where we consider a coordination task: the user and the assistant have to select the same action to make the task state increase by one. The coordination succeeds because the assistant manages a simulation of the user that provides single-shot predictions of the next user action.

We first modify the ExampleTask that we used in the Quickstart:

class CoordinatedTask(InteractionTask):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.state["x"] = discrete_array_element(init=0, low=0, high=9)

    def reset(self, dic=None):
        self.state["x"] = 0
        return

    def on_user_action(self, *args, **kwargs):
        is_done = False

        if self.state["x"] == 9:
            is_done = True

        if self.round_number == 100:
            is_done = True

        reward = -1
        return self.state, reward, is_done

    def on_assistant_action(self, *args, **kwargs):
        is_done = False

        if self.user_action == self.assistant_action:
            self.state["x"] += 1

        reward = -1
        return self.state, reward, is_done

The premise of the task is very simple: if the assistant and the user select the same action (self.user_action == self.assistant_action), the task state is incremented. When the task state reaches 9, the task is finished.

We then create a pseudorandom user that uses a pseudorandom policy. It picks the action prescribed by the formula \(8 + p_0 \cdot{} x + p_1 \cdot{} x^2 + p_2 \cdot{} x^3 \pmod{10}\), where the \(p_i\) are user parameters and \(x\) is the task state.

class PseudoRandomPolicy(BasePolicy):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @BasePolicy.default_value
    def sample(self, agent_observation=None, agent_state=None):
        x = agent_observation.task_state.x

        _action_value = (
            8 + self.state.p0 * x + self.state.p1 * x * x + self.state.p2 * x * x * x
        ) % 10
        print(f"sampled: {_action_value}")
        return _action_value, 0

The assistant is then constructed as follows:

class CoordinatedAssistant(BaseAgent):
    def __init__(self, user_model=None, *args, **kwargs):

        self.user_model = user_model

        # Action state for the CoordinatedPolicy defined below
        action_state = State()
        action_state["action"] = cat_element(N=10, init=0)

        # Use default observation and inference engines
        observation_engine = None
        inference_engine = None

        super().__init__(
            "assistant",
            *args,
            agent_policy=CoordinatedPolicy(action_state=action_state),
            agent_observation_engine=observation_engine,
            agent_inference_engine=inference_engine,
            **kwargs
        )

    def finit(self):
        copy_task = copy.deepcopy(self.task)
        self.simulation_bundle = Bundle(task=copy_task, user=self.user_model)

Notice that:

  • it expects a user_model to be passed during initialization.

  • it uses the finit mechanism to create a simulation that the assistant can use. That simulation is nothing more than a Bundle of the task and the user model.

This simulation is actually used in the policy of the assistant:

class CoordinatedPolicy(BasePolicy):
    @property
    def simulation_bundle(self):
        return self.host.simulation_bundle

    @BasePolicy.default_value
    def sample(self, agent_observation=None, agent_state=None):

        reset_dic = {"task_state": agent_observation.task_state}

        self.simulation_bundle.reset(dic=reset_dic)
        self.simulation_bundle.step(turn=2)

        _action_value = self.simulation_bundle.user.action

        return _action_value, 0

The policy of the assistant is straightforward:

  • Observe the current state of the game and put the simulation in that state, via the reset mechanism.

  • Play the simulation just far enough that the user model takes an action.

  • Observe the action taken by the user model and pick the same one.

At each turn, the assistant takes the same action as the user model. If we provide the assistant with the true model of the user, then the coordination is perfect:

user = PseudoRandomUser()
user_model = PseudoRandomUser()  # The same as user

assistant = CoordinatedAssistant(user_model=user_model)
bundle = Bundle(task=CoordinatedTask(), user=user, assistant=assistant)
bundle.reset(go_to=3)
while True:
    obs, rewards, is_done = bundle.step()
    # print(bundle.game_state)
    if is_done:
        break

Rollouts

Usually, we need a more comprehensive simulation that spans several steps and features both a user and an assistant. Using the same assistant simultaneously in a bundle and in a simulation is not straightforward, so CoopIHC provides a few helper classes. In particular, the inference engines, policies, etc. used during the simulation cannot be the same as the ones used during execution of the bundle (otherwise you would get infinite recursion). CoopIHC therefore offers the possibility of having two different inference engines and policies, via the so-called DualInferenceEngine and DualPolicy. Depending on the state of the engine, either the primary or the dual engine is used (and similarly for the policy).
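To make this concrete, here is a purely conceptual sketch of how such a dual object behaves. This is not the actual CoopIHC implementation, and the use_primary flag is hypothetical (CoopIHC handles the switch for you, e.g. when a Simulator runs); the real DualInferenceEngine is constructed in the assistant code further below:

# Conceptual sketch only -- not the CoopIHC implementation.
class ConceptualDualEngine:
    def __init__(self, primary_engine, dual_engine):
        self.primary_engine = primary_engine
        self.dual_engine = dual_engine
        # Hypothetical flag; CoopIHC switches between primary and dual itself.
        self.use_primary = True

    def infer(self, *args, **kwargs):
        # Dispatch to the primary engine during normal bundle execution, and
        # to the dual engine inside a simulation, which avoids infinite
        # recursion.
        engine = self.primary_engine if self.use_primary else self.dual_engine
        return engine.infer(*args, **kwargs)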

To illustrate these, let’s go over a variation of the previous example:

task = CoordinatedTask()
task_model = CoordinatedTask()

user = PseudoRandomUserWithParams(p=[1, 5, 7])
user_model = copy.deepcopy(user)

assistant = CoordinatedAssistantWithRollout(task_model, user_model, [5, 7])
bundle = Bundle(task=task, user=user, assistant=assistant)
bundle.reset()

while True:
    obs, rewards, is_done = bundle.step()
    if is_done:
        break

Notice that the parameters of the PseudoRandomPolicy are now given at initialization via PseudoRandomUserWithParams (before, they were hard-coded in the user). If you look at the assistant, you see that we pass it a model of the task, a model of the user, as well as two parameters. These parameters are the last two parameters of the user model; the first one is unknown to the assistant. The point of the assistant is now to infer that parameter using the models of the task and the user it was given.
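The PseudoRandomUserWithParams class is not shown in this section. Below is a minimal sketch of what it could look like, assuming it simply stores the three parameters in its state (where the PseudoRandomPolicy defined earlier reads them); the actual class shipped with the CoopIHC examples may differ:

class PseudoRandomUserWithParams(BaseAgent):
    def __init__(self, p=[0, 0, 0], *args, **kwargs):
        # Store the policy parameters in the user's state so that the
        # PseudoRandomPolicy can read them as self.state.p0, p1, p2.
        state = State()
        state["p0"] = discrete_array_element(init=p[0], low=0, high=9)
        state["p1"] = discrete_array_element(init=p[1], low=0, high=9)
        state["p2"] = discrete_array_element(init=p[2], low=0, high=9)

        action_state = State()
        action_state["action"] = discrete_array_element(init=0, low=0, high=9)

        super().__init__(
            "user",
            *args,
            agent_state=state,
            agent_policy=PseudoRandomPolicy(action_state=action_state),
            **kwargs
        )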

The code for the assistant is as follows:

class CoordinatedAssistantWithRollout(BaseAgent):
    def __init__(self, task_model, user_model, p, **kwargs):
        state = State()
        state["p0"] = discrete_array_element(init=0, low=0, high=9)
        state["p1"] = discrete_array_element(init=p[0], low=0, high=9)
        state["p2"] = discrete_array_element(init=p[1], low=0, high=9)

        # Use default observation engine
        inference_engine = DualInferenceEngine(
            primary_inference_engine=RolloutCoordinatedInferenceEngine(
                task_model, user_model, self
            ),
            dual_inference_engine=BaseInferenceEngine(),
            primary_kwargs={},
            dual_kwargs={},
        )

        policy = PseudoRandomPolicy(
            action_state=State(
                **{"action": discrete_array_element(init=0, low=0, high=9)}
            )
        )

        super().__init__(
            "assistant",
            agent_state=state,
            agent_policy=policy,
            agent_observation_engine=None,
            agent_inference_engine=inference_engine,
            **kwargs
        )

The state p0 is the one that needs to be determined. Once it is known, the assistant can simply use the PseudoRandomPolicy to select the same action as the user.

The DualInferenceEngine holds two inference engines: the primary RolloutCoordinatedInferenceEngine which is used during the bundle execution, and the dual BaseInferenceEngine which is used for the simulation.

The remaining code is in the RolloutCoordinatedInferenceEngine:

class RolloutCoordinatedInferenceEngine(BaseInferenceEngine):
    def __init__(self, task_model, user_model, assistant, **kwargs):
        super().__init__(**kwargs)
        self.task_model = task_model
        self.user_model = user_model
        self.assistant = assistant
        self._simulator = None
        self.__inference_count = 0

    # Define the simulator here. A Simulator is called like a Bundle, but it
    # will use the dual version of objects if available.
    @property
    def simulator(self):
        if self._simulator is None:
            self._simulator = Simulator(
                task_model=self.task_model,
                user_model=self.user_model,
                assistant=self.assistant,
            )
        return self._simulator

    @BaseInferenceEngine.default_value
    def infer(self, agent_observation=None):

        # Only perform the rollout inference the first time; afterwards, just
        # perform a BaseInference. We can do this because we know the
        # parameter p0 will not evolve over time.
        if self.__inference_count > 0:
            return super().infer(agent_observation=agent_observation)

        self.__inference_count += 1

        agent_state = getattr(agent_observation, f"{self.role}_state")

        # The agent state will be altered in the simulator, so keep a copy of
        # it for reference.
        mem_state = copy.deepcopy(agent_state)

        # For the 10 possible values of p0, run a complete simulation. The
        # right parameter is the one that leads to the maximum reward.
        rew = [0 for i in range(10)]
        for i in range(10):  # Exhaustively try out all cases
            # Load the simulation with the right parameters
            reset_dic = copy.deepcopy(agent_observation)
            # Try out a new state
            del reset_dic["assistant_state"]
            del reset_dic["game_info"]
            reset_dic = {
                **{"game_info": {"round_index": 0, "turn_index": 0}},
                **reset_dic,
                **{
                    "assistant_state": {
                        "p0": i,
                        "p1": mem_state.p1,
                        "p2": mem_state.p2,
                    }
                },
            }
            self.simulator.reset(dic=reset_dic)
            while True:
                state, rewards, is_done = self.simulator.step()
                rew[i] += sum(rewards.values())

                if is_done:
                    break

        # Don't forget to close the simulator when you are finished.
        self.simulator.close()
        index = numpy.argmax(rew)
        self.state["p0"] = index
        return self.state, 0

First, we define a simulator object. For that, simply instantiate a Simulator as you would a Bundle. The difference between a simulator and a bundle is that the former considers the dual versions of the objects. The inference is then straightforward: all possible values of p0 are tested, and the correct one is the one that leads to the highest reward.
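As a quick sanity check (a sketch assuming the earlier snippet, where the user was created with p=[1, 5, 7]), the inferred parameter can be read from the assistant's state once the bundle has run:

# After the while loop of the earlier snippet has finished:
print(assistant.state["p0"])  # expected to match the user's first parameter, i.e. 1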