Tasks

Tasks represent the agents’ environment. In CoopIHC, a task usually represents the part of an interface that the user can interact with and drive to a certain state.

Essentially, tasks are characterized by:

  • An internal state called the task state which holds all the task’s information; for example, the state of the interface.

  • An on_user_action() method, which is a transition function that describes how the task state changes on receiving a user action.

  • An on_assistant_action() method, which is a transition function that describes how the task state changes based on the assistant action.
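
To make these three ingredients concrete, here is a minimal plain-Python sketch that does not use CoopIHC at all. The class name SketchTask, the 'cursor' substate, and passing the action as a method argument are all assumptions made for illustration; a real CoopIHC task inherits from InteractionTask instead.

```python
class SketchTask:
    """Hypothetical stand-in for a task: a state plus two transition functions."""

    def __init__(self):
        # Task state: everything the task knows, e.g. the state of an interface
        self.state = {"cursor": 0}

    def on_user_action(self, action):
        # Transition triggered by the user: move the cursor by the action
        self.state["cursor"] += action
        return self.state

    def on_assistant_action(self, action):
        # Transition triggered by the assistant: here, the same additive rule
        self.state["cursor"] += action
        return self.state


task = SketchTask()
task.on_user_action(1)
task.on_assistant_action(1)
print(task.state)  # {'cursor': 2}
```

The two methods are kept separate even though they apply the same rule here, because in general a user action and an assistant action may change the task state in different ways.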

As an example, let’s define a simple task where the goal of the user is to drive the substate called ‘x’ to a value of 4. Both the user and the assistant can provide three actions: -1, +0 and +1. We define a task by inheriting from InteractionTask and redefining a few methods.

# Import paths follow the CoopIHC package layout; adjust them if your installed version differs
from coopihc.interactiontask.InteractionTask import InteractionTask
from coopihc.base.elements import discrete_array_element


class ExampleTask(InteractionTask):
    """ExampleTask

    An example algebraic task with a single task state 'x', which finishes when x = 4.

    """

    def __init__(self, *args, **kwargs):

        # Call super().__init__() before anything else; it initializes some useful attributes, including a State (self.state) for the task

        super().__init__(*args, **kwargs)

        # Describe the state. Here it is a single item which takes values in [-1, 0, ..., 3, 4] (low=-1, high=4). The StateElement has out_of_bounds_mode = clip, which means that values outside the range will automatically be clipped to fit the space.
        self.state["x"] = discrete_array_element(
            init=0, low=-1, high=4, out_of_bounds_mode="clip"
        )

    def reset(self, dic=None):
        # Always start with state 'x' at 0
        self.state["x"] = 0
        return

    def on_user_action(self, *args, **kwargs):
        is_done = False
        # Modify the state in place, adding the user action
        # (equivalent to self.state["x"] = self.state["x"] + self.user_action)
        self.state["x"] += self.user_action

        # Stopping condition, return is_done boolean flag
        if self.state["x"] == 4:
            is_done = True

        reward = -1
        return self.state, reward, is_done

    def on_assistant_action(self, *args, **kwargs):
        is_done = False
        # Modify the state in place, adding the assistant action
        self.state["x"] += self.assistant_action
        # Stopping condition, return is_done boolean flag
        if self.state["x"] == 4:
            is_done = True

        reward = -1
        return self.state, reward, is_done

Some comments on the code snippet above:

  • The task state 'x' is defined in the __init__ method. Remember to always call super()’s __init__ before anything else to ensure all necessary variables internal to CoopIHC are set.

  • The reset method resets the task to an initial state, in this case 'x'=0. Defining a reset method is optional: if you omit it, the task inherits the one from InteractionTask (coopihc.interactiontask.InteractionTask), which randomly picks a value for each state.

  • You have to define both on_user_action() and on_assistant_action(), otherwise an error will be raised. Both are expected to return the triple (task state, reward, is_done).

  • A render method is available if you want to render the task online; see InteractionTask (coopihc.interactiontask.InteractionTask).
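
To see how the pieces described above fit together over several turns, the following plain-Python sketch mimics the rules of ExampleTask without importing CoopIHC. The helper names clip, step and run_episode are hypothetical, and the turn order (user first, then assistant, within each round) is an assumption made for this illustration; in CoopIHC the interaction loop is handled by the library rather than written by hand.

```python
def clip(value, low=-1, high=4):
    """Mimic out_of_bounds_mode="clip": keep value inside [low, high]."""
    return max(low, min(high, value))


def step(x, action):
    """Apply one action and return the triple (new state, reward, is_done)."""
    x = clip(x + action)
    return x, -1, x == 4


def run_episode(user_actions, assistant_actions):
    """Alternate user and assistant turns until x reaches 4 or actions run out."""
    x, total_reward = 0, 0  # like reset(): always start with x = 0
    for user_action, assistant_action in zip(user_actions, assistant_actions):
        for action in (user_action, assistant_action):
            x, reward, is_done = step(x, action)
            total_reward += reward
            if is_done:
                return x, total_reward, True
    return x, total_reward, False


# The user always pushes +1; the assistant helps on the first round, then idles.
final_x, total_reward, finished = run_episode([1, 1, 1], [1, 0, 0])
print(final_x, total_reward, finished)  # 4 -5 True
```

Because every transition returns a reward of -1, the accumulated reward simply counts the number of turns taken, so reaching x = 4 in fewer turns gives a higher total. The clip helper also shows why an action of -1 at x = -1 leaves the state unchanged.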

ClassicControlTask

PipeTaskWrapper