mm-client/client.py
2025-10-22 16:19:08 +02:00

90 lines
2.1 KiB
Python

from config import *
from httpx_ws import aconnect_ws
import orjson
import asyncio
import uuid
class MMClient:
def __init__(self):
self.url = WEBSOCKET_URL
self.ws = None
self.lock = asyncio.Lock()
self.recv_queue = asyncio.Queue()
self.send_queue = asyncio.Queue()
self.pending_acks = {}
self.connected = False
async def run(self):
while True:
try:
async with aconnect_ws(self.url) as ws:
self.ws = ws
self.connected = True
send_task = asyncio.create_task(self._send_loop())
recv_task = asyncio.create_task(self._recv_loop())
await asyncio.gather(send_task, recv_task)
except Exception as e:
print(f"Got exception: {e}. Retrying in 5 seconds...")
self.connected = False
await asyncio.sleep(5)
async def _send_loop(self):
while True:
packet = await self.send_queue.get()
# Requeue packet for later if disconnected
if not self.connected:
await self.send_queue.put(packet)
await asyncio.sleep(1)
continue
print(packet)
packet = orjson.dumps(packet)
await self.ws.send_bytes(packet)
async def _recv_loop(self):
while True:
try:
packet = await self.ws.receive()
packet = orjson.loads(packet.data)
except Exception as e:
print("Something went wrong", e)
continue
if nonce := packet.get('nonce'):
if nonce and nonce in self.pending_acks:
fut = self.pending_acks.pop(nonce)
if not fut.done():
fut.set_result(packet)
print("RECEIVED EVENT", packet)
async def send_event(self, event_id, payload, ack=False):
packet = {
"_": event_id,
"payload": payload,
"nonce": str(uuid.uuid1())
}
print("Sending packet...")
if ack:
fut = asyncio.get_event_loop().create_future()
self.pending_acks[str(packet["nonce"])] = fut
await self.send_queue.put(packet)
if ack:
try:
async with asyncio.timeout(2):
print("WAITING FOR RETURN....")
result = await fut
except asyncio.TimeoutError:
print("RETURN DID NOT ARRIVE....")
return False
else:
print("GOT RETURN RESULT....", result)
return result
print("CCCCCCC")
return