From f47b9178ad62519d1a6df9c9124ae4d8b6c0c105 Mon Sep 17 00:00:00 2001 From: Fahad Afroze Date: Mon, 27 Nov 2023 11:55:35 +0000 Subject: [PATCH] Added GET_REPORT and SET_REPORT changes Added changes to handle invalid cases --- bumble/hid.py | 16 ++++++++--- examples/run_hid_device.py | 55 +++++++++++++++++++++++--------------- 2 files changed, 45 insertions(+), 26 deletions(-) diff --git a/bumble/hid.py b/bumble/hid.py index 059621c6..c9205102 100644 --- a/bumble/hid.py +++ b/bumble/hid.py @@ -380,6 +380,7 @@ class Device(HID): REPORT_ID_NOT_FOUND = 0x01 ERR_UNSUPPORTED_REQUEST = 0x02 ERR_UNKNOWN = 0x03 + ERR_INVALID_PARAMETER = 0x04 SUCCESS = 0xFF class GetSetStatus: @@ -433,6 +434,8 @@ class Device(HID): elif ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND: self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID) + elif ret.status == self.GetSetReturn.ERR_INVALID_PARAMETER: + self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER) elif ret.status == self.GetSetReturn.ERR_UNSUPPORTED_REQUEST: self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) else: @@ -448,14 +451,19 @@ class Device(HID): report_type = pdu[0] & 0x03 report_id = pdu[1] report_data = pdu[2:] - ret = self.set_report_cb(report_id, report_type, report_data) + report_size = len(pdu[1:]) + ret = self.set_report_cb(report_id, report_type, report_size, report_data) if ret.status == self.GetSetReturn.SUCCESS: self.send_handshake_message(Message.Handshake.SUCCESSFUL) - return + elif ret.status == self.GetSetReturn.ERR_INVALID_PARAMETER: + self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER) + elif ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND: + self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID) + else: + self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) else: logger.debug("SetReport callback not registered !!") - - self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) + self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) def register_set_report_cb(self, cb): self.set_report_cb = cb diff --git a/examples/run_hid_device.py b/examples/run_hid_device.py index e88ea602..3016bdf0 100644 --- a/examples/run_hid_device.py +++ b/examples/run_hid_device.py @@ -418,8 +418,8 @@ async def get_stream_reader(pipe) -> asyncio.StreamReader: class DeviceData: def __init__(self) -> None: - self.keyboardData = bytearray([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) - self.mouseData = bytearray([0x00, 0x00, 0x00, 0x00]) + self.keyboardData = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + self.mouseData = bytearray([0x02, 0x00, 0x00, 0x00]) # Device's live data - Mouse and Keyboard will be stored in this deviceData = DeviceData() @@ -431,7 +431,6 @@ async def keyboard_device(hid_device, command): # Start a Websocket server to receive events from a web page async def serve(websocket, _path): global deviceData - # global mouseData while True: try: message = await websocket.recv() @@ -445,25 +444,24 @@ async def keyboard_device(hid_device, command): code = ord(key) if ord('a') <= code <= ord('z'): hid_code = 0x04 + code - ord('a') - deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00])) - hid_device.send_report_on_interrupt(deviceData.getKeyBoardData()) + deviceData.keyboardData = bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00]) + hid_device.send_data(deviceData.keyboardData) elif message_type == 'keyup': - deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00])) - hid_device.send_report_on_interrupt(deviceData.getKeyBoardData()) + deviceData.keyboardData = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + hid_device.send_data(deviceData.keyboardData) elif message_type == "mousemove": # logical min and max values log_min = -127 log_max = 127 x = parsed['x'] y = parsed['y'] - if y > 127: - y = 127 - elif y < -127: - y = -127 + # limiting x and y values within logical max and min range + x = max(log_min, min(log_max, x)) + y = max(log_min, min(log_max, y)) x_cord = x.to_bytes(signed = True) y_cord = y.to_bytes(signed = True) - deviceData.setMouseData(bytearray([0x02, 0x00]) + x_cord + y_cord) - hid_device.send_report_on_interrupt(deviceData.getMouseData()) + deviceData.mouseData = bytearray([0x02, 0x00]) + x_cord + y_cord + hid_device.send_data(deviceData.mouseData) except websockets.exceptions.ConnectionClosedOK: pass @@ -505,10 +503,10 @@ async def main(): "buffer_size:" + str(buffer_size)) if report_type == Message.ReportType.INPUT_REPORT: if report_id == 1: - retValue.data = deviceData.keyboardData + retValue.data = deviceData.keyboardData[1:] retValue.status = hid_device.GetSetReturn.SUCCESS elif report_id == 2: - retValue.data = deviceData.mouseData + retValue.data = deviceData.mouseData[1:] retValue.status = hid_device.GetSetReturn.SUCCESS else: retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND @@ -521,21 +519,34 @@ async def main(): #testing, we will return single byte random data. retValue.data = bytearray([0x11]) retValue.status = hid_device.GetSetReturn.SUCCESS - elif report_type == Message.ReportType.FEATURE_REPORT: - # TBD - not requried for PTS testing - retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST - + retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER + elif report_type == Message.ReportType.OTHER_REPORT: + if report_id == 3: + retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND else: retValue.status = hid_device.GetSetReturn.FAILURE return retValue - def on_set_report_cb(report_id, report_type, data): + def on_set_report_cb(report_id, report_type, report_size, data): retValue = hid_device.GetSetStatus() print("SET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+ - "data:" + str(data)) - retValue.status = hid_device.GetSetReturn.SUCCESS + "report_size " + str(report_size) + "data:" + str(data)) + if report_type == Message.ReportType.FEATURE_REPORT: + retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER + elif report_type == Message.ReportType.INPUT_REPORT: + if report_id == 1 and report_size != len(deviceData.keyboardData): + retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER + elif report_id == 2 and report_size != len(deviceData.mouseData): + retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER + elif report_id == 3: + retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND + else: + retValue.status = hid_device.GetSetReturn.SUCCESS + else: + retValue.status = hid_device.GetSetReturn.SUCCESS + return retValue def on_get_protocol_cb():