From d6cefdff8e0d8b3cfb340cfa266b7416b4642320 Mon Sep 17 00:00:00 2001 From: dhavan Date: Wed, 22 Nov 2023 17:14:24 +0000 Subject: [PATCH] Renamed the status message class --- bumble/hid.py | 25 ++++++++++++------------- examples/run_hid_device.py | 26 +++++++++++++------------- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/bumble/hid.py b/bumble/hid.py index 9d77b5fa..cd99d71e 100644 --- a/bumble/hid.py +++ b/bumble/hid.py @@ -298,7 +298,6 @@ class HID(EventEmitter): def on_ctrl_pdu(self, pdu: bytes) -> None: logger.debug(f'<<< HID CONTROL PDU: {pdu.hex()}') - # Here we will receive all kinds of packets, parse and then call respective callbacks param = pdu[0] & 0x0F message_type = pdu[0] >> 4 @@ -369,7 +368,7 @@ class HID(EventEmitter): class Device(HID): - class ReportStatus(enum.IntEnum): + class GetSetReturn(enum.IntEnum): FAILURE = 0x00 REPORT_ID_NOT_FOUND = 0x01 ERR_UNSUPPORTED_REQUEST = 0x02 @@ -377,7 +376,7 @@ class Device(HID): SUCCESS = 0xff - class GetReportStatus(): + class GetSetStatus(): def __init__(self) -> None: self.status = 0 self.data=None @@ -398,7 +397,7 @@ class Device(HID): self.send_pdu_on_ctrl(hid_message) def handle_get_report(self, pdu: bytes): - ret = self.GetReportStatus() + ret = self.GetSetStatus() report_type=pdu[0] & 0x03 buffer_flag = (pdu[0] & 0x08) >> 3 report_id = pdu[1] @@ -411,18 +410,18 @@ class Device(HID): if(self.get_report_cb != None): ret = self.get_report_cb(report_id, report_type, buffer_size) - if(ret.status == self.ReportStatus.FAILURE): + if(ret.status == self.GetSetReturn.FAILURE): self.send_handshake_message(Message.Handshake.ERR_UNKNOWN) - elif(ret.status == self.ReportStatus.SUCCESS): + elif(ret.status == self.GetSetReturn.SUCCESS): data = bytearray() data.append(report_id) data.extend(ret.data) #TODO Check the data size and MTU size here and only then send out #the message self.send_control_data(report_type=report_type, data = data) - elif(ret.status == self.ReportStatus.REPORT_ID_NOT_FOUND): + elif(ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND): self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID) - elif(ret.status == self.ReportStatus.ERR_UNSUPPORTED_REQUEST): + elif(ret.status == self.GetSetReturn.ERR_UNSUPPORTED_REQUEST): self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) else: logger.debug("GetReport callback not registered !!") @@ -438,7 +437,7 @@ class Device(HID): report_id = pdu[1] report_data = pdu[2:] ret = self.set_report_cb(report_id, report_type, report_data) - if(ret.status == self.ReportStatus.SUCCESS): + if(ret.status == self.GetSetReturn.SUCCESS): self.send_handshake_message(Message.Handshake.SUCCESSFUL) return else: @@ -451,10 +450,10 @@ class Device(HID): logger.debug("SetReport callback registered successfully") def handle_get_protocol(self, pdu: bytes): - ret = self.GetReportStatus() + ret = self.GetSetStatus() if(self.get_protocol_cb != None): ret=self.get_protocol_cb() - if(ret.status == self.ReportStatus.SUCCESS): + if(ret.status == self.GetSetReturn.SUCCESS): self.send_control_data(Message.ReportType.OTHER_REPORT, ret.data) return else: @@ -467,10 +466,10 @@ class Device(HID): logger.debug("GetProtocol callback registered successfully") def handle_set_protocol(self, pdu: bytes): - ret = self.GetReportStatus() + ret = self.GetSetStatus() if(self.set_protocol_cb != None): ret=self.set_protocol_cb(pdu[0] & 0x01) - if(ret.status == self.ReportStatus.SUCCESS): + if(ret.status == self.GetSetReturn.SUCCESS): return else: logger.debug("SetProtocol callback not registered !!") diff --git a/examples/run_hid_device.py b/examples/run_hid_device.py index a97faa96..cedf15ed 100644 --- a/examples/run_hid_device.py +++ b/examples/run_hid_device.py @@ -503,18 +503,18 @@ async def main(): print(f'Received Data, PDU: {pdu.hex()}') def on_get_report_cb(report_id,report_type, buffer_size): - retValue = hid_device.GetReportStatus() + retValue = hid_device.GetSetStatus() print("GET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+ "buffer_size:" + str(buffer_size)) if report_type == Message.ReportType.INPUT_REPORT: if report_id == 1: retValue.data = keyboardData - retValue.status = hid_device.ReportStatus.SUCCESS + retValue.status = hid_device.GetSetReturn.SUCCESS elif report_id == 2: retValue.data = mouseData - retValue.status = hid_device.ReportStatus.SUCCESS + retValue.status = hid_device.GetSetReturn.SUCCESS else: - retValue.status = hid_device.ReportStatus.REPORT_ID_NOT_FOUND + retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND if(buffer_size): data_len = buffer_size -1 @@ -523,36 +523,36 @@ async def main(): #This sample app has nothing to do with the report received, to enable PTS #testing, we will return single byte random data. retValue.data = bytearray([0x11]) - retValue.status = hid_device.ReportStatus.SUCCESS + retValue.status = hid_device.GetSetReturn.SUCCESS elif report_type == Message.ReportType.FEATURE_REPORT: #TBD - not requried for PTS testing - retValue.status = hid_device.ReportStatus.ERR_UNSUPPORTED_REQUEST + retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST else: - retValue.status = hid_device.ReportStatus.FAILURE + retValue.status = hid_device.GetSetReturn.FAILURE return retValue def on_set_report_cb(report_id, report_type, data): - retValue = hid_device.GetReportStatus() + retValue = hid_device.GetSetStatus() print("SET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+ "data:" + str(data)) - retValue.status = hid_device.ReportStatus.SUCCESS + retValue.status = hid_device.GetSetReturn.SUCCESS return retValue def on_get_protocol_cb(): - retValue = hid_device.GetReportStatus() + retValue = hid_device.GetSetStatus() retValue.data=protocol_mode.to_bytes() - retValue.status=hid_device.ReportStatus.SUCCESS + retValue.status=hid_device.GetSetReturn.SUCCESS return retValue def on_set_protocol_cb(protocol): - retValue = hid_device.GetReportStatus() + retValue = hid_device.GetSetStatus() #We do not support SET_PROTOCOL print("SET_PROTOCOL report_id: " + str(protocol)) - retValue.status=hid_device.ReportStatus.ERR_UNSUPPORTED_REQUEST + retValue.status=hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST return retValue def on_virtual_cable_unplug_cb():