90 lines
2.1 KiB
Python
90 lines
2.1 KiB
Python
from config import *
|
|
from httpx_ws import aconnect_ws
|
|
import orjson
|
|
import asyncio
|
|
import uuid
|
|
|
|
class MMClient:
|
|
def __init__(self):
|
|
self.url = WEBSOCKET_URL
|
|
self.ws = None
|
|
self.lock = asyncio.Lock()
|
|
self.recv_queue = asyncio.Queue()
|
|
self.send_queue = asyncio.Queue()
|
|
self.pending_acks = {}
|
|
self.connected = False
|
|
|
|
async def run(self):
|
|
while True:
|
|
try:
|
|
async with aconnect_ws(self.url) as ws:
|
|
self.ws = ws
|
|
self.connected = True
|
|
send_task = asyncio.create_task(self._send_loop())
|
|
recv_task = asyncio.create_task(self._recv_loop())
|
|
await asyncio.gather(send_task, recv_task)
|
|
except Exception as e:
|
|
print(f"Got exception: {e}. Retrying in 5 seconds...")
|
|
self.connected = False
|
|
await asyncio.sleep(5)
|
|
|
|
async def _send_loop(self):
|
|
while True:
|
|
packet = await self.send_queue.get()
|
|
|
|
# Requeue packet for later if disconnected
|
|
if not self.connected:
|
|
await self.send_queue.put(packet)
|
|
await asyncio.sleep(1)
|
|
continue
|
|
|
|
print(packet)
|
|
packet = orjson.dumps(packet)
|
|
await self.ws.send_bytes(packet)
|
|
|
|
async def _recv_loop(self):
|
|
while True:
|
|
try:
|
|
packet = await self.ws.receive()
|
|
packet = orjson.loads(packet.data)
|
|
except Exception as e:
|
|
print("Something went wrong", e)
|
|
continue
|
|
|
|
if nonce := packet.get('nonce'):
|
|
if nonce and nonce in self.pending_acks:
|
|
fut = self.pending_acks.pop(nonce)
|
|
if not fut.done():
|
|
fut.set_result(packet)
|
|
|
|
print("RECEIVED EVENT", packet)
|
|
|
|
async def send_event(self, event_id, payload, ack=False):
|
|
packet = {
|
|
"_": event_id,
|
|
"payload": payload,
|
|
"nonce": str(uuid.uuid1())
|
|
}
|
|
|
|
|
|
print("Sending packet...")
|
|
if ack:
|
|
fut = asyncio.get_event_loop().create_future()
|
|
self.pending_acks[str(packet["nonce"])] = fut
|
|
|
|
await self.send_queue.put(packet)
|
|
|
|
if ack:
|
|
try:
|
|
async with asyncio.timeout(2):
|
|
print("WAITING FOR RETURN....")
|
|
result = await fut
|
|
except asyncio.TimeoutError:
|
|
print("RETURN DID NOT ARRIVE....")
|
|
return False
|
|
else:
|
|
print("GOT RETURN RESULT....", result)
|
|
return result
|
|
|
|
print("CCCCCCC")
|
|
return
|