mm-client/device/serial.py
2025-10-22 16:19:08 +02:00

82 lines
1.9 KiB
Python

import logging
import asyncio
import serial
# Module-level logger shared by everything in this file; registered under
# the fixed name 'serial-device'.
log = logging.getLogger('serial-device')
def _zebra_beep(beep_id):
command = [0x05, 0xE6, 0x04, 0x00, beep_id]
checksum = (~sum(command) + 1)
return bytes(command) + (checksum&0xffff).to_bytes(2, 'big')
def _honeywell_beep(beep_id):
return b"0\x1b" + f"{beep_id}".encode('ASCII') + b','
# Pre-built beep commands, keyed first by beep kind ('ack', 'nack', 'err')
# and then by scanner brand ('zebra', 'honeywell'). Each row of the table
# below is (beep kind, (zebra beep id, honeywell beep id)).
_beep_map = {
    kind: {
        'zebra': _zebra_beep(zebra_id),
        'honeywell': _honeywell_beep(honeywell_id),
    }
    for kind, (zebra_id, honeywell_id) in (
        ('ack', (5, 7)),
        ('nack', (15, 'b')),
        ('err', (16, 8)),
    )
}
class SerialDevice:
    """Barcode scanner attached to a serial port.

    Data read from the port is forwarded to ``mm`` as a ``BarcodeScan``
    event. When the reply contains a ``beep`` entry, the matching
    pre-built command from ``_beep_map`` for this device's ``brand`` is
    written back to the port.
    """

    # Seconds between polls of the port while waiting for a scan.
    _IDLE_POLL = 0.1
    # Seconds between polls while waiting for the reply to a beep command.
    _RESPONSE_POLL = 0.01
    # Maximum seconds to wait for the reply to a beep command.
    _RESPONSE_TIMEOUT = 0.5

    def __init__(self, path, mm, brand):
        """Store the configuration; the port is not opened until ``loop``.

        Args:
            path: Serial port path handed to ``serial.Serial``.
            mm: Object providing the ``send_event`` coroutine used to
                report scans.
            brand: Scanner brand used to pick the beep command (a second-
                level key of ``_beep_map``). Unknown brands are accepted;
                they simply never receive a beep.
        """
        self.path = path
        self.ser = None
        self.run = False
        self.mm = mm
        self.brand = brand
        # Strong reference to the loop task: the event loop itself only
        # keeps a weak one, so an unreferenced task may be collected.
        self._task = None

    def start(self):
        """Schedule ``loop`` as a task; requires a running event loop."""
        self.run = True
        self._task = asyncio.create_task(self.loop())

    def stop(self):
        """Ask the read loop to exit at its next poll."""
        self.run = False

    async def loop(self):
        """Open the port and run ``cycle`` until stopped or until it fails.

        Any exception raised by ``cycle`` is logged and ends the loop; the
        port is closed when the ``with`` block exits.
        """
        with serial.Serial(self.path) as ser:
            # Flush data that was sent during startup
            ser.reset_input_buffer()
            self.ser = ser
            while self.run:
                try:
                    await self.cycle(ser)
                except Exception as e:
                    log.error("Halting serial loop due to exception %s", e)
                    return

    async def cycle(self, ser):
        """Handle at most one scan: read it, report it, beep if requested.

        When no data is pending, this waits (without blocking the event
        loop) until data shows up or ``stop`` is called, then returns so
        the caller invokes it again.

        Args:
            ser: The open serial port to read from and write to.

        Raises:
            UnicodeDecodeError: If the received bytes are not ASCII.
        """
        buf = b''
        # Drain everything that is currently buffered. Only bytes already
        # reported by in_waiting are requested, so read() cannot block.
        while pending := ser.in_waiting:
            buf += ser.read(pending)

        if not buf:
            # Sleep until data is available. Checking self.run here lets
            # stop() take effect even while the scanner is idle.
            while self.run and not ser.in_waiting:
                await asyncio.sleep(self._IDLE_POLL)
            return

        # We have data, let's apply some basic cleaning
        buf = buf.strip(b"\r\n")  # Remove line break
        log.debug("RECV %s", buf)
        res = await self.mm.send_event(
            "BarcodeScan", {'content': buf.decode('ASCII')}, ack=True
        )

        beep = res.get('beep')
        if not beep:
            return
        if beep not in _beep_map:
            # Unknown beep kinds fall back to the negative acknowledgement.
            beep = 'nack'
        command = _beep_map[beep].get(self.brand)
        if command is None:
            # No beep command is defined for this brand.
            return

        log.debug("BEEP %s", beep)
        ser.write(command)
        await self._discard_response(ser)

    async def _discard_response(self, ser):
        """Wait briefly for the scanner's reply to a command and drop it.

        Polls ``in_waiting`` instead of calling the blocking ``read`` so
        the event loop keeps running and the timeout is actually honoured.
        """
        clock = asyncio.get_running_loop()
        deadline = clock.time() + self._RESPONSE_TIMEOUT
        while not ser.in_waiting and clock.time() < deadline:
            await asyncio.sleep(self._RESPONSE_POLL)
        ser.reset_input_buffer()