82 lines
1.9 KiB
Python
82 lines
1.9 KiB
Python
import logging
|
|
import asyncio
|
|
import serial
|
|
|
|
log = logging.getLogger('serial-device')
|
|
|
|
def _zebra_beep(beep_id):
|
|
command = [0x05, 0xE6, 0x04, 0x00, beep_id]
|
|
checksum = (~sum(command) + 1)
|
|
return bytes(command) + (checksum&0xffff).to_bytes(2, 'big')
|
|
|
|
def _honeywell_beep(beep_id):
|
|
return b"0\x1b" + f"{beep_id}".encode('ASCII') + b','
|
|
|
|
# Pre-built beep commands, keyed first by beep kind ('ack', 'nack', 'err')
# and then by scanner brand ('zebra', 'honeywell').
_beep_map = dict(
    ack=dict(zebra=_zebra_beep(5), honeywell=_honeywell_beep(7)),
    nack=dict(zebra=_zebra_beep(15), honeywell=_honeywell_beep('b')),
    err=dict(zebra=_zebra_beep(16), honeywell=_honeywell_beep(8)),
)
|
|
|
|
class SerialDevice:
    """Bridge between a serial barcode scanner and an event sink.

    Data received on the serial port is forwarded as ``"BarcodeScan"``
    events through ``mm``; if the reply asks for a beep, the matching
    command from ``_beep_map`` is written back to the scanner.

    :param path: serial port path/name handed to ``serial.Serial``.
    :param mm: object exposing an awaitable
        ``send_event(name, payload, ack=True)`` whose result supports
        ``.get('beep')``.
    :param brand: scanner brand used to pick the beep command
        (a key of the inner ``_beep_map`` dicts, e.g. ``'zebra'``).
        Unknown brands simply never beep.
    """

    def __init__(self, path, mm, brand):
        self.path = path
        self.ser = None      # open serial.Serial while loop() is running
        self.run = False     # loop() keeps going while this is True
        self.mm = mm
        # Store the requested brand; it selects the beep command in cycle().
        self.brand = brand
        self._task = None    # strong reference to the background task

    def start(self):
        """Start the read loop as a background asyncio task.

        Must be called while an event loop is running.
        """
        self.run = True
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-run.
        self._task = asyncio.create_task(self.loop())

    def stop(self):
        """Ask the read loop to finish; it exits after the current cycle."""
        self.run = False

    async def loop(self):
        """Open the serial port and run cycle() until stopped or it fails.

        Any exception raised by a cycle is logged and ends the loop; the
        port is closed when the ``with`` block exits.
        """
        with serial.Serial(self.path) as ser:

            # Flush data that was sent during startup
            ser.reset_input_buffer()

            self.ser = ser
            while self.run:
                try:
                    await self.cycle(ser)
                except Exception as e:
                    log.error(f"Halting serial loop due to exception {e}")
                    return

    async def cycle(self, ser):
        """Handle one batch of scanner input.

        Reads everything currently buffered on *ser*. When nothing is
        buffered, waits (without blocking the event loop) until data shows
        up or stop() is called, then returns so the next cycle reads it.
        Otherwise the data is stripped of surrounding CR/LF bytes, sent as
        a ``"BarcodeScan"`` event, and the optional beep in the reply is
        played on the scanner.

        :param ser: the open serial port to read from and write to.
        :raises UnicodeDecodeError: if the received data is not ASCII.
        """
        received = bytearray()

        # Drain all data that is already waiting in the input buffer
        while pending := ser.in_waiting:
            received += ser.read(pending)

        if not received:
            # Idle: poll until data is available, but keep honouring stop()
            while self.run and not ser.in_waiting:
                await asyncio.sleep(0.1)
            return

        # We have data, let's apply some basic cleaning
        data = bytes(received).strip(b"\r\n")  # Remove line break

        log.debug(f"RECV {data}")
        res = await self.mm.send_event("BarcodeScan", {'content': data.decode('ASCII')}, ack=True)

        if beep := res.get('beep'):
            if beep not in _beep_map:
                # Unknown beep kinds fall back to the negative beep
                beep = 'nack'

            if self.brand in _beep_map[beep]:
                log.debug(f"BEEP {beep}")
                ser.write(_beep_map[beep][self.brand])

                # Discard the scanner's response to the beep command
                await self._discard_response(ser)

    async def _discard_response(self, ser, timeout=0.5, poll=0.05):
        """Wait up to *timeout* seconds for a reply, then drop the input.

        Polls ``ser.in_waiting`` instead of calling the blocking
        ``ser.read()``, so a scanner that never answers cannot stall the
        event loop.
        """
        now = asyncio.get_running_loop().time
        deadline = now() + timeout
        while not ser.in_waiting and now() < deadline:
            await asyncio.sleep(poll)
        ser.reset_input_buffer()