Source code for coopihc.bundle.wrappers.PipedTaskBundleWrapper

from coopihc.bundle.BaseBundle import BaseBundle
import time


[docs]class PipedTaskBundleWrapper(BaseBundle): """PipedTaskBundleWrapper Wrap a Bundle so that its task gets replaced by a version of the task which emits messages via a pipe. :param bundle: bundle to wrap :type bundle: :py:mod:`Bundle<coopihc.bundle>` :param taskwrapper: Task wrapper :type taskwrapper: :py:class:`PipeTaskWrapper<coopihc.interactiontask.PipeTaskWrapper.PipeTaskWrapper>` :param pipe: Pipe through which messages are sent :type pipe: :py:class:`Pipe <subprocess.Pipe>` """ # Wrap it by taking over bundles attribute via the instance __dict__. Methods can not be taken like that since they belong to the class __dict__ and have to be called via self.bundle.method() def __init__(self, bundle, taskwrapper, pipe): self.__dict__ = bundle.__dict__ # take over bundles attributes self.bundle = bundle self.pipe = pipe pipedtask = taskwrapper(bundle.task, pipe) self.bundle.task = pipedtask # replace the task with the piped task bundle_kwargs = bundle.kwargs bundle_class = self.bundle.__class__ self.bundle = bundle_class( pipedtask, bundle.user, bundle.assistant, **bundle_kwargs ) self.framerate = 1000 self.iter = 0 self.run()
[docs] def run(self, reset_dic={}, **kwargs): """run the bundle Play the game. :param reset_dic: reset dictionnary, see :py:mod:`Bundle API<coopihc.bundle>`, defaults to {} :type reset_dic: dict, optional """ reset_kwargs = kwargs.get("reset_kwargs") if reset_kwargs is None: reset_kwargs = {} self.bundle.reset(dic=reset_dic, **reset_kwargs) time.sleep(1 / self.framerate) while True: obs, sum_reward, is_done, rewards = self.bundle.step() time.sleep(1 / self.framerate) if is_done: break self.end()
[docs] def end(self): """end Send 'done' over the pipe to signal that the game is over """ self.pipe.send("done")