From 07f71fc895d1b082cfd123d144940cf09c9d296e Mon Sep 17 00:00:00 2001 From: skarnataki Date: Mon, 27 Nov 2023 13:04:54 +0000 Subject: [PATCH] Project format and lint error fix. Redefination if Device class needs to be discussed --- bumble/hid.py | 38 +++++++++++++++---------- examples/run_hid_device.py | 57 +++++++++++++++++++++++++++++--------- 2 files changed, 68 insertions(+), 27 deletions(-) diff --git a/bumble/hid.py b/bumble/hid.py index c9205102..0dd87cd1 100644 --- a/bumble/hid.py +++ b/bumble/hid.py @@ -274,8 +274,10 @@ class HID(EventEmitter): await channel.disconnect() def on_device_connection(self, connection: Connection) -> None: - self.connection = connection - self.remote_device_bd_address = connection.peer_address + self.connection = connection # type: ignore[assignment] + self.remote_device_bd_address = ( + connection.peer_address + ) # type: ignore[assignment] connection.on('disconnection', self.on_disconnection) def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None: @@ -312,16 +314,16 @@ class HID(EventEmitter): self.emit('handshake', Message.Handshake(param)) elif message_type == Message.MessageType.GET_REPORT: logger.debug('<<< HID GET REPORT') - self.handle_get_report(pdu) + self.handle_get_report(pdu) # type: ignore[attr-defined] elif message_type == Message.MessageType.SET_REPORT: logger.debug('<<< HID SET REPORT') - self.handle_set_report(pdu) + self.handle_set_report(pdu) # type: ignore[attr-defined] elif message_type == Message.MessageType.GET_PROTOCOL: logger.debug('<<< HID GET PROTOCOL') - self.handle_get_protocol(pdu) + self.handle_get_protocol(pdu) # type: ignore[attr-defined] elif message_type == Message.MessageType.SET_PROTOCOL: logger.debug('<<< HID SET PROTOCOL') - self.handle_set_protocol(pdu) + self.handle_set_protocol(pdu) # type: ignore[attr-defined] elif message_type == Message.MessageType.DATA: logger.debug('<<< HID CONTROL DATA') self.emit('control_data', pdu) @@ -339,7 +341,9 @@ class HID(EventEmitter): logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED') else: logger.debug('<<< HID MESSAGE TYPE UNSUPPORTED') - self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) + self.send_handshake_message( + Message.Handshake.ERR_UNSUPPORTED_REQUEST + ) # type: ignore[attr-defined] def on_intr_pdu(self, pdu: bytes) -> None: logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}') @@ -374,7 +378,7 @@ class HID(EventEmitter): # ----------------------------------------------------------------------------- -class Device(HID): +class Device(HID): # type: ignore[no-redef] class GetSetReturn(enum.IntEnum): FAILURE = 0x00 REPORT_ID_NOT_FOUND = 0x01 @@ -385,8 +389,8 @@ class Device(HID): class GetSetStatus: def __init__(self) -> None: + self.data: bytes self.status = 0 - self.data = None def __init__(self, device: Device) -> None: super().__init__(device, HID.Role.DEVICE) @@ -419,7 +423,9 @@ class Device(HID): buffer_size = 0 if self.get_report_cb != None: - ret = self.get_report_cb(report_id, report_type, buffer_size) + ret = self.get_report_cb( + report_id, report_type, buffer_size + ) # type: ignore if ret.status == self.GetSetReturn.FAILURE: self.send_handshake_message(Message.Handshake.ERR_UNKNOWN) @@ -427,7 +433,9 @@ class Device(HID): data = bytearray() data.append(report_id) data.extend(ret.data) - if len(data) < self.l2cap_ctrl_channel.mtu: + if ( + len(data) < self.l2cap_ctrl_channel.mtu + ): # type: ignore[union-attr] self.send_control_data(report_type=report_type, data=data) else: self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER) @@ -452,7 +460,9 @@ class Device(HID): report_id = pdu[1] report_data = pdu[2:] report_size = len(pdu[1:]) - ret = self.set_report_cb(report_id, report_type, report_size, report_data) + ret = self.set_report_cb( + report_id, report_type, report_size, report_data + ) # type: ignore if ret.status == self.GetSetReturn.SUCCESS: self.send_handshake_message(Message.Handshake.SUCCESSFUL) elif ret.status == self.GetSetReturn.ERR_INVALID_PARAMETER: @@ -472,7 +482,7 @@ class Device(HID): def handle_get_protocol(self, pdu: bytes): ret = self.GetSetStatus() if self.get_protocol_cb != None: - ret = self.get_protocol_cb() + ret = self.get_protocol_cb() # type: ignore if ret.status == self.GetSetReturn.SUCCESS: self.send_control_data(Message.ReportType.OTHER_REPORT, ret.data) return @@ -488,7 +498,7 @@ class Device(HID): def handle_set_protocol(self, pdu: bytes): ret = self.GetSetStatus() if self.set_protocol_cb != None: - ret = self.set_protocol_cb(pdu[0] & 0x01) + ret = self.set_protocol_cb(pdu[0] & 0x01) # type: ignore if ret.status == self.GetSetReturn.SUCCESS: return else: diff --git a/examples/run_hid_device.py b/examples/run_hid_device.py index 3016bdf0..e0ac281d 100644 --- a/examples/run_hid_device.py +++ b/examples/run_hid_device.py @@ -418,9 +418,12 @@ async def get_stream_reader(pipe) -> asyncio.StreamReader: class DeviceData: def __init__(self) -> None: - self.keyboardData = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + self.keyboardData = bytearray( + [0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + ) self.mouseData = bytearray([0x02, 0x00, 0x00, 0x00]) + # Device's live data - Mouse and Keyboard will be stored in this deviceData = DeviceData() @@ -444,10 +447,24 @@ async def keyboard_device(hid_device, command): code = ord(key) if ord('a') <= code <= ord('z'): hid_code = 0x04 + code - ord('a') - deviceData.keyboardData = bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00]) + deviceData.keyboardData = bytearray( + [ + 0x01, + 0x00, + 0x00, + hid_code, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + ] + ) hid_device.send_data(deviceData.keyboardData) elif message_type == 'keyup': - deviceData.keyboardData = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + deviceData.keyboardData = bytearray( + [0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + ) hid_device.send_data(deviceData.keyboardData) elif message_type == "mousemove": # logical min and max values @@ -458,8 +475,8 @@ async def keyboard_device(hid_device, command): # limiting x and y values within logical max and min range x = max(log_min, min(log_max, x)) y = max(log_min, min(log_max, y)) - x_cord = x.to_bytes(signed = True) - y_cord = y.to_bytes(signed = True) + x_cord = x.to_bytes(signed=True) + y_cord = y.to_bytes(signed=True) deviceData.mouseData = bytearray([0x02, 0x00]) + x_cord + y_cord hid_device.send_data(deviceData.mouseData) except websockets.exceptions.ConnectionClosedOK: @@ -499,8 +516,14 @@ async def main(): def on_get_report_cb(report_id, report_type, buffer_size): retValue = hid_device.GetSetStatus() - print("GET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+ - "buffer_size:" + str(buffer_size)) + print( + "GET_REPORT report_id: " + + str(report_id) + + "report_type: " + + str(report_type) + + "buffer_size:" + + str(buffer_size) + ) if report_type == Message.ReportType.INPUT_REPORT: if report_id == 1: retValue.data = deviceData.keyboardData[1:] @@ -515,8 +538,8 @@ async def main(): data_len = buffer_size - 1 retValue.data = retValue.data[:data_len] elif report_type == Message.ReportType.OUTPUT_REPORT: - #This sample app has nothing to do with the report received, to enable PTS - #testing, we will return single byte random data. + # This sample app has nothing to do with the report received, to enable PTS + # testing, we will return single byte random data. retValue.data = bytearray([0x11]) retValue.status = hid_device.GetSetReturn.SUCCESS elif report_type == Message.ReportType.FEATURE_REPORT: @@ -531,15 +554,23 @@ async def main(): def on_set_report_cb(report_id, report_type, report_size, data): retValue = hid_device.GetSetStatus() - print("SET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+ - "report_size " + str(report_size) + "data:" + str(data)) + print( + "SET_REPORT report_id: " + + str(report_id) + + "report_type: " + + str(report_type) + + "report_size " + + str(report_size) + + "data:" + + str(data) + ) if report_type == Message.ReportType.FEATURE_REPORT: retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER elif report_type == Message.ReportType.INPUT_REPORT: if report_id == 1 and report_size != len(deviceData.keyboardData): - retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER + retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER elif report_id == 2 and report_size != len(deviceData.mouseData): - retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER + retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER elif report_id == 3: retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND else: