import logging import asyncio import serial log = logging.getLogger('serial-device') def _zebra_beep(beep_id): command = [0x05, 0xE6, 0x04, 0x00, beep_id] checksum = (~sum(command) + 1) return bytes(command) + (checksum&0xffff).to_bytes(2, 'big') def _honeywell_beep(beep_id): return b"0\x1b" + f"{beep_id}".encode('ASCII') + b',' _beep_map = { 'ack': {'zebra': _zebra_beep(5), 'honeywell': _honeywell_beep(7)}, 'nack': {'zebra': _zebra_beep(15), 'honeywell': _honeywell_beep('b')}, 'err': {'zebra': _zebra_beep(16), 'honeywell': _honeywell_beep(8)} } class SerialDevice: def __init__(self, path, mm, brand): self.path = path self.ser = None self.run = False self.mm = mm self.brand = None def start(self): self.run = True asyncio.create_task(self.loop()) def stop(self): self.run = False async def loop(self): with serial.Serial(self.path) as ser: # Flush data that was sent during startup ser.reset_input_buffer() self.ser = ser while self.run: try: await self.cycle(ser) except Exception as e: log.error(f"Halting serial loop due to exception {e}") return async def cycle(self, ser): buf = b'' # Try to read all the remaining data while ser.in_waiting: buf += ser.read() else: if not buf: # Sleep until data is available while not ser.in_waiting: await asyncio.sleep(0.1) # If there is no data, sleep return # We have data, let's apply some basic cleaning buf = buf.strip(b"\r\n") # Remove line break log.debug(f"RECV {buf}") res = await self.mm.send_event("BarcodeScan", {'content': buf.decode('ASCII')}, ack=True) if beep := res.get('beep'): if beep not in _beep_map: beep = 'nack' if self.brand in _beep_map[beep]: log.debug(f"BEEP {beep}") ser.write(_beep_map[beep][brand]) # Reset the input buffer (discard the response) # TODO: make this more elegant async with asyncio.timeout(0.5): ser.read() ser.reset_input_buffer()