from config import * from httpx_ws import aconnect_ws import orjson import asyncio import uuid class MMClient: def __init__(self): self.url = WEBSOCKET_URL self.ws = None self.lock = asyncio.Lock() self.recv_queue = asyncio.Queue() self.send_queue = asyncio.Queue() self.pending_acks = {} self.connected = False async def run(self): while True: try: async with aconnect_ws(self.url) as ws: self.ws = ws self.connected = True send_task = asyncio.create_task(self._send_loop()) recv_task = asyncio.create_task(self._recv_loop()) await asyncio.gather(send_task, recv_task) except Exception as e: print(f"Got exception: {e}. Retrying in 5 seconds...") self.connected = False await asyncio.sleep(5) async def _send_loop(self): while True: packet = await self.send_queue.get() # Requeue packet for later if disconnected if not self.connected: await self.send_queue.put(packet) await asyncio.sleep(1) continue print(packet) packet = orjson.dumps(packet) await self.ws.send_bytes(packet) async def _recv_loop(self): while True: try: packet = await self.ws.receive() packet = orjson.loads(packet.data) except Exception as e: print("Something went wrong", e) continue if nonce := packet.get('nonce'): if nonce and nonce in self.pending_acks: fut = self.pending_acks.pop(nonce) if not fut.done(): fut.set_result(packet) print("RECEIVED EVENT", packet) async def send_event(self, event_id, payload, ack=False): packet = { "_": event_id, "payload": payload, "nonce": str(uuid.uuid1()) } print("Sending packet...") if ack: fut = asyncio.get_event_loop().create_future() self.pending_acks[str(packet["nonce"])] = fut await self.send_queue.put(packet) if ack: try: async with asyncio.timeout(2): print("WAITING FOR RETURN....") result = await fut except asyncio.TimeoutError: print("RETURN DID NOT ARRIVE....") return False else: print("GOT RETURN RESULT....", result) return result print("CCCCCCC") return