# Some of the code is adapted from https://github.com/IRLL/HIPPO_Gym/
import asyncio, websockets, json, sys
import time
from multiprocessing import Process, Pipe
from coopihc.interactiontask import PipeTaskWrapper
from coopihc.bundle.wrappers import PipedTaskBundleWrapper
# like functools.partial, but with arguments added from the back
def partialback(func, *extra_args):
    """Bind trailing positional arguments to a callable.

    Like :func:`functools.partial`, but the bound arguments are added at the
    back of the call-time positional arguments instead of the front.

    :param func: callable to wrap
    :type func: callable
    :param extra_args: positional arguments appended to every call
    :return: wrapper that calls ``func(*args, *extra_args)`` and returns its
        result
    :rtype: callable
    """

    def wrapper(*args):
        # Call-time arguments come first, the bound ones are appended last.
        return func(*args, *extra_args)

    return wrapper
class WsServer:
    """WebSocket Server for Bundle

    A WebSocket server that handles a bundle with an external task that
    communicates with that server.

    :param bundle: the bundle to serve
    :type bundle: :py:class:`Bundle<coopihc.bundle.Bundle.Bundle>`
    :param taskwrapper: Task wrapper
    :type taskwrapper: :py:class:`PipeTaskWrapper<coopihc.interactiontask.PipeTaskWrapper.PipeTaskWrapper>`
    :param address: server address, defaults to "localhost"
    :type address: str, optional
    :param port: port number, defaults to 4000
    :type port: int, optional
    """

    def __init__(self, bundle, taskwrapper, address="localhost", port=4000):
        # The server is only described here; it starts listening once
        # ``start_server`` is awaited (see ``start``).
        self.start_server = websockets.serve(
            partialback(self.bundlehandler, bundle, taskwrapper), address, port
        )
        # Most recently connected websocket, set by ``register``.
        self.user = None
        self.bundle = bundle

    def start(self):
        """Start the server.

        Runs ``self.start_server`` on the event loop and then keeps the loop
        running, so this call blocks while the server handles connections.
        """
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.start_server)
        loop.run_forever()

    async def bundlehandler(self, websocket, path, bundle, taskwrapper):
        """Handle one websocket connection.

        On websocket connection, starts a new userTrial in a new Process.
        Then starts async listeners for sending and receiving messages, and
        cleans up as soon as either listener finishes.

        :param websocket: the connection to the client
        :param path: second positional argument supplied by the websockets
            library (unused here)
        :param bundle: the bundle to serve
        :type bundle: :py:class:`Bundle<coopihc.bundle.Bundle.Bundle>`
        :param taskwrapper: Task wrapper
        :type taskwrapper: :py:class:`PipeTaskWrapper<coopihc.interactiontask.PipeTaskWrapper.PipeTaskWrapper>`
        """
        # NOTE(review): this signature assumes the websockets library calls
        # handlers as (websocket, path) -- confirm for the installed version.
        await self.register(websocket)
        bundlepipeup, bundlepipedown = Pipe()
        process = Process(
            target=PipedTaskBundleWrapper,
            args=(bundle, taskwrapper, bundlepipedown),
        )
        process.start()
        consumer_task = asyncio.ensure_future(
            self.consumer_handler(websocket, bundlepipeup)
        )
        producer_task = asyncio.ensure_future(
            self.producer_handler(websocket, bundlepipeup)
        )
        try:
            # Either the client went away (consumer ends) or the bundle
            # reported completion (producer ends); both end the session.
            await asyncio.wait(
                [consumer_task, producer_task],
                return_when=asyncio.FIRST_COMPLETED,
            )
        finally:
            for task in (consumer_task, producer_task):
                if not task.done():
                    task.cancel()
            # Do not leave the worker running once the session is over.
            if process.is_alive():
                process.terminate()
            await websocket.close()

    async def register(self, websocket):
        """Keep track of clients.

        Only the most recent connection is remembered, in ``self.user``.

        :param websocket: the connection to the client
        """
        self.user = websocket
        print("new task connected: {}".format(str(websocket)))

    async def consumer_handler(self, websocket, pipe):
        """When messages from websocket are received, send them over the pipe

        :param websocket: the connection to the client
        :param pipe: Pipe through which messages are sent
        :type pipe: :py:class:`multiprocessing.connection.Connection`
        """
        async for message in websocket:
            print("received message {}".format(message))
            # NOTE(review): the message is forwarded exactly as received (not
            # JSON-decoded) -- confirm this is what the pipe's reader expects.
            pipe.send(message)

    async def producer_handler(self, websocket, pipe, poll_interval=0.01):
        """Look for messages to send from the Bundle.

        asyncio.sleep() is required to make this non-blocking.
        The default sleep time (0.01) creates a maximum framerate of
        just under 100 frames/s. For faster framerates decrease the sleep
        time, however be aware that this will affect the ability of the
        consumer_handler function to keep up with messages from the websocket
        and may cause poor performance if the web-client is sending a high
        volume of messages.

        Returns once ``producer`` reports that the process is complete.

        :param websocket: the connection to the client
        :param pipe: Pipe through which messages are sent
        :type pipe: :py:class:`multiprocessing.connection.Connection`
        :param poll_interval: seconds to sleep between two polls of the pipe,
            defaults to 0.01
        :type poll_interval: float, optional
        """
        while not await self.producer(websocket, pipe):
            await asyncio.sleep(poll_interval)

    async def producer(self, websocket, pipe):
        """Check pipe for messages to send to websocket.

        If the process is done, send final message to websocket and return
        True to tell calling functions that the process is complete.

        :param websocket: the connection to the client
        :param pipe: Pipe through which messages are sent
        :type pipe: :py:class:`multiprocessing.connection.Connection`
        :return: True if the "done" message was received, False otherwise
        :rtype: bool
        """
        if pipe.poll():
            message = pipe.recv()
            if message == "done":
                await websocket.send(json.dumps({"type": "done"}))
                return True
            payload = json.dumps(message)
            print("sending message: \t {}".format(payload))
            await websocket.send(payload)
        return False