diff --git a/bumble/hid.py b/bumble/hid.py index f50442a..059621c 100644 --- a/bumble/hid.py +++ b/bumble/hid.py @@ -120,6 +120,7 @@ class SetReportMessage(Message): def __bytes__(self) -> bytes: return self.header(self.report_type) + self.data + @dataclass class SendControlData(Message): report_type: int @@ -131,6 +132,8 @@ class SendControlData(Message): packet_bytes.extend(self.data) return self.header(self.report_type) + packet_bytes + + @dataclass class GetProtocolMessage(Message): message_type = Message.MessageType.GET_PROTOCOL @@ -147,6 +150,7 @@ class SetProtocolMessage(Message): def __bytes__(self) -> bytes: return self.header(self.protocol_mode) + @dataclass class GetProtocolReplyMessage(Message): protocol_mode: int @@ -157,6 +161,7 @@ class GetProtocolReplyMessage(Message): packet_bytes.append(self.protocol_mode) return self.header(Message.ReportType.OTHER_REPORT) + packet_bytes + @dataclass class Suspend(Message): message_type = Message.MessageType.CONTROL @@ -180,7 +185,8 @@ class VirtualCableUnplug(Message): def __bytes__(self) -> bytes: return self.header(Message.ControlCommand.VIRTUAL_CABLE_UNPLUG) -#Device sends input report, host sends output report. + +# Device sends input report, host sends output report. @dataclass class SendData(Message): data: bytes @@ -190,6 +196,7 @@ class SendData(Message): def __bytes__(self) -> bytes: return self.header(self.report_type) + self.data + @dataclass class SendHandshakeMessage(Message): result_code: int @@ -208,7 +215,6 @@ class HID(EventEmitter): HOST = 0x00 DEVICE = 0x01 - def __init__(self, device: Device, role: int) -> None: super().__init__() self.device = device @@ -347,7 +353,7 @@ class HID(EventEmitter): assert self.l2cap_intr_channel self.l2cap_intr_channel.send_pdu(msg) - def send_report_on_interrupt(self, data: bytes) -> None: + def send_data(self, data: bytes) -> None: if self.role == HID.Role.HOST: report_type = Message.ReportType.OUTPUT_REPORT else: @@ -368,20 +374,18 @@ class HID(EventEmitter): # ----------------------------------------------------------------------------- - class Device(HID): class GetSetReturn(enum.IntEnum): FAILURE = 0x00 REPORT_ID_NOT_FOUND = 0x01 ERR_UNSUPPORTED_REQUEST = 0x02 ERR_UNKNOWN = 0x03 - SUCCESS = 0xff + SUCCESS = 0xFF - - class GetSetStatus(): + class GetSetStatus: def __init__(self) -> None: self.status = 0 - self.data=None + self.data = None def __init__(self, device: Device) -> None: super().__init__(device, HID.Role.DEVICE) @@ -396,56 +400,56 @@ class Device(HID): logger.debug(f'>>> HID HANDSHAKE MESSAGE, PDU: {hid_message.hex()}') self.send_pdu_on_ctrl(hid_message) - def send_control_data(self,report_type: int, data: bytes): - msg = SendControlData(report_type= report_type, data=data) + def send_control_data(self, report_type: int, data: bytes): + msg = SendControlData(report_type=report_type, data=data) hid_message = bytes(msg) logger.debug(f'>>> HID CONTROL DATA: {hid_message.hex()}') self.send_pdu_on_ctrl(hid_message) def handle_get_report(self, pdu: bytes): ret = self.GetSetStatus() - report_type=pdu[0] & 0x03 + report_type = pdu[0] & 0x03 buffer_flag = (pdu[0] & 0x08) >> 3 report_id = pdu[1] logger.debug("buffer_flag: " + str(buffer_flag)) - if(buffer_flag == 1): + if buffer_flag == 1: buffer_size = (pdu[3] << 8) | pdu[2] else: buffer_size = 0 - if(self.get_report_cb != None): + if self.get_report_cb != None: ret = self.get_report_cb(report_id, report_type, buffer_size) - if(ret.status == self.GetSetReturn.FAILURE): + if ret.status == self.GetSetReturn.FAILURE: self.send_handshake_message(Message.Handshake.ERR_UNKNOWN) - elif(ret.status == self.GetSetReturn.SUCCESS): - data = bytearray() - data.append(report_id) - data.extend(ret.data) - if(len(data) None: diff --git a/examples/run_hid_device.py b/examples/run_hid_device.py index 3287cb1..e88ea60 100644 --- a/examples/run_hid_device.py +++ b/examples/run_hid_device.py @@ -60,7 +60,7 @@ from bumble.utils import AsyncRunner SDP_HID_SERVICE_NAME_ATTRIBUTE_ID = 0x0100 SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID = 0x0101 SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID = 0x0102 -SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID = 0x0200 # [DEPRECATED] +SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID = 0x0200 # [DEPRECATED] SDP_HID_PARSER_VERSION_ATTRIBUTE_ID = 0x0201 SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID = 0x0202 SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID = 0x0203 @@ -68,20 +68,20 @@ SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID = 0x0204 SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID = 0x0205 SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0x0206 SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID = 0x0207 -SDP_HID_SDP_DISABLE_ATTRIBUTE_ID = 0x0208 # [DEPRECATED] +SDP_HID_SDP_DISABLE_ATTRIBUTE_ID = 0x0208 # [DEPRECATED] SDP_HID_BATTERY_POWER_ATTRIBUTE_ID = 0x0209 SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID = 0x020A -SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID = 0x020B # DEPRECATED] +SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID = 0x020B # DEPRECATED] SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID = 0x020C -SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID = 0x020D +SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID = 0x020D SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID = 0x020E SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID = 0x020F SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210 # Refer to HID profile specification v1.1.1, "5.3 Service Discovery Protocol (SDP)" for details # HID SDP attribute values -LANGUAGE = 0x656e # 0x656E uint16 “en” (English) -ENCODING = 0x6a # 0x006A uint16 UTF-8 encoding +LANGUAGE = 0x656E # 0x656E uint16 “en” (English) +ENCODING = 0x6A # 0x006A uint16 UTF-8 encoding PRIMARY_LANGUAGE_BASE_ID = 0x100 # 0x0100 uint16 PrimaryLanguageBaseID VERSION_NUMBER = 0x0101 # 0x0101 uint16 version number (v1.1) SERVICE_NAME = b'Bumble HID' @@ -222,7 +222,7 @@ HID_REPORT_MAP = bytes( # Text String, 50 Octet Report Descriptor 0x06, # . Input (Data,Var,Rel,No Wrap,Linear,Preferred State,No Null Position) 0xC0, # . End Collection 0xC0, # End Collection - ] + ] ) @@ -298,7 +298,9 @@ def sdp_records(): DataElement.sequence( [ DataElement.uuid(BT_L2CAP_PROTOCOL_ID), - DataElement.unsigned_integer_16(HID_INTERRUPT_PSM), + DataElement.unsigned_integer_16( + HID_INTERRUPT_PSM + ), ] ), DataElement.sequence( @@ -362,8 +364,12 @@ def sdp_records(): [ DataElement.sequence( [ - DataElement.unsigned_integer_16(HID_LANGID_BASE_LANGUAGE), - DataElement.unsigned_integer_16(HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET), + DataElement.unsigned_integer_16( + HID_LANGID_BASE_LANGUAGE + ), + DataElement.unsigned_integer_16( + HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET + ), ] ), ] @@ -415,8 +421,8 @@ class DeviceData: self.keyboardData = bytearray([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) self.mouseData = bytearray([0x00, 0x00, 0x00, 0x00]) -#Device's live data - Mouse and Keyboard will be stored in this -deviceData=DeviceData() +# Device's live data - Mouse and Keyboard will be stored in this +deviceData = DeviceData() # ----------------------------------------------------------------------------- async def keyboard_device(hid_device, command): @@ -425,6 +431,7 @@ async def keyboard_device(hid_device, command): # Start a Websocket server to receive events from a web page async def serve(websocket, _path): global deviceData + # global mouseData while True: try: message = await websocket.recv() @@ -438,24 +445,25 @@ async def keyboard_device(hid_device, command): code = ord(key) if ord('a') <= code <= ord('z'): hid_code = 0x04 + code - ord('a') - deviceData.keyboardData = bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00]) - hid_device.send_report_on_interrupt(deviceData.keyboardData) + deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00])) + hid_device.send_report_on_interrupt(deviceData.getKeyBoardData()) elif message_type == 'keyup': - deviceData.keyboardData = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) - hid_device.send_report_on_interrupt(deviceData.keyboardData) + deviceData.setKeyBoardData(bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00])) + hid_device.send_report_on_interrupt(deviceData.getKeyBoardData()) elif message_type == "mousemove": # logical min and max values log_min = -127 log_max = 127 x = parsed['x'] y = parsed['y'] - # limiting x and y values within logical max and min range - x = max(log_min, min(log_max, x)) - y = max(log_min, min(log_max, y)) + if y > 127: + y = 127 + elif y < -127: + y = -127 x_cord = x.to_bytes(signed = True) y_cord = y.to_bytes(signed = True) - deviceData.mouseData = bytearray([0x02, 0x00]) + x_cord + y_cord - hid_device.send_report_on_interrupt(deviceData.mouseData) + deviceData.setMouseData(bytearray([0x02, 0x00]) + x_cord + y_cord) + hid_device.send_report_on_interrupt(deviceData.getMouseData()) except websockets.exceptions.ConnectionClosedOK: pass @@ -464,7 +472,6 @@ async def keyboard_device(hid_device, command): await asyncio.get_event_loop().create_future() - # ----------------------------------------------------------------------------- async def main(): if len(sys.argv) < 3: @@ -484,7 +491,7 @@ async def main(): hid_host_bd_addr = str(hid_device.remote_device_bd_address) await hid_device.disconnect_interrupt_channel() await hid_device.disconnect_control_channel() - await device.keystore.delete(hid_host_bd_addr) #type: ignore + await device.keystore.delete(hid_host_bd_addr) # type: ignore connection = hid_device.connection if connection is not None: await connection.disconnect() @@ -492,9 +499,9 @@ async def main(): def on_hid_data_cb(pdu): print(f'Received Data, PDU: {pdu.hex()}') - def on_get_report_cb(report_id,report_type, buffer_size): + def on_get_report_cb(report_id, report_type, buffer_size): retValue = hid_device.GetSetStatus() - print("GET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+ + print("GET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+ "buffer_size:" + str(buffer_size)) if report_type == Message.ReportType.INPUT_REPORT: if report_id == 1: @@ -506,17 +513,17 @@ async def main(): else: retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND - if(buffer_size): - data_len = buffer_size -1 + if buffer_size: + data_len = buffer_size - 1 retValue.data = retValue.data[:data_len] elif report_type == Message.ReportType.OUTPUT_REPORT: - #This sample app has nothing to do with the report received, to enable PTS + #This sample app has nothing to do with the report received, to enable PTS #testing, we will return single byte random data. retValue.data = bytearray([0x11]) retValue.status = hid_device.GetSetReturn.SUCCESS elif report_type == Message.ReportType.FEATURE_REPORT: - #TBD - not requried for PTS testing + # TBD - not requried for PTS testing retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST else: @@ -526,23 +533,22 @@ async def main(): def on_set_report_cb(report_id, report_type, data): retValue = hid_device.GetSetStatus() - print("SET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+ + print("SET_REPORT report_id: " + str(report_id) +"report_type: "+ str(report_type)+ "data:" + str(data)) retValue.status = hid_device.GetSetReturn.SUCCESS return retValue - def on_get_protocol_cb(): retValue = hid_device.GetSetStatus() - retValue.data=protocol_mode.to_bytes() - retValue.status=hid_device.GetSetReturn.SUCCESS + retValue.data = protocol_mode.to_bytes() + retValue.status = hid_device.GetSetReturn.SUCCESS return retValue def on_set_protocol_cb(protocol): retValue = hid_device.GetSetStatus() - #We do not support SET_PROTOCOL. + # We do not support SET_PROTOCOL. print("SET_PROTOCOL report_id: " + str(protocol)) - retValue.status=hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST + retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST return retValue def on_virtual_cable_unplug_cb(): @@ -584,7 +590,9 @@ async def main(): async def menu(): reader = await get_stream_reader(sys.stdin) while True: - print("\n************************ HID Device Menu *****************************\n") + print( + "\n************************ HID Device Menu *****************************\n" + ) print(" 1. Connect Control Channel") print(" 2. Connect Interrupt Channel") print(" 3. Disconnect Control Channel") @@ -621,22 +629,26 @@ async def main(): choice1 = choice1.decode('utf-8').strip() if choice1 == '1': - data = bytearray([0x01, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]) - hid_device.send_report_on_interrupt(data) - data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) - hid_device.send_report_on_interrupt(data) + data = bytearray( + [0x01, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00] + ) + hid_device.send_data(data) + data = bytearray( + [0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + ) + hid_device.send_data(data) elif choice1 == '2': - data = bytearray([0x02, 0x00, 0x00, 0xf6]) - hid_device.send_report_on_interrupt(data) - data = bytearray([0x02, 0x00, 0x00, 0x00]) - hid_device.send_report_on_interrupt(data) + data = bytearray([0x02, 0x00, 0x00, 0xF6]) + hid_device.send_data(data) + data = bytearray([0x02, 0x00, 0x00, 0x00]) + hid_device.send_data(data) elif choice1 == '3': - data = bytearray([0x00, 0x00, 0x00, 0x00]) - hid_device.send_report_on_interrupt(data) - data = bytearray([0x00, 0x00, 0x00, 0x00]) - hid_device.send_report_on_interrupt(data) + data = bytearray([0x00, 0x00, 0x00, 0x00]) + hid_device.send_data(data) + data = bytearray([0x00, 0x00, 0x00, 0x00]) + hid_device.send_data(data) else: print('Incorrect option selected') @@ -665,7 +677,9 @@ async def main(): elif choice == '9': hid_host_bd_addr = str(hid_device.remote_device_bd_address) - connection = await device.connect(hid_host_bd_addr, transport=BT_BR_EDR_TRANSPORT) + connection = await device.connect( + hid_host_bd_addr, transport=BT_BR_EDR_TRANSPORT + ) await connection.authenticate() await connection.encrypt() @@ -691,7 +705,7 @@ async def main(): await keyboard_device(hid_device, 'web') else: - #default option is using keyboard.html (web) + # default option is using keyboard.html (web) await keyboard_device(hid_device, 'web') await hci_source.wait_for_termination() diff --git a/examples/run_hid_host.py b/examples/run_hid_host.py index fe9e811..7519b4e 100644 --- a/examples/run_hid_host.py +++ b/examples/run_hid_host.py @@ -308,8 +308,8 @@ async def main(): if (report_length <= 1) or (report_id == 0): return - #Parse report over interrupt channel - if (report_type == Message.ReportType.INPUT_REPORT): + # Parse report over interrupt channel + if report_type == Message.ReportType.INPUT_REPORT: ReportParser.parse_input_report(pdu[1:]) # type: ignore async def handle_virtual_cable_unplug(): @@ -362,7 +362,6 @@ async def main(): await get_hid_device_sdp_record(connection) - async def menu(): reader = await get_stream_reader(sys.stdin) while True: @@ -493,11 +492,11 @@ async def main(): data = bytearray( [0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] ) - hid_host.send_report_on_interrupt(data) + hid_host.send_data(data) elif choice1 == '2': data = bytearray([0x03, 0x00, 0x0D, 0xFD, 0x00, 0x00]) - hid_host.send_report_on_interrupt(data) + hid_host.send_data(data) else: print('Incorrect option selected')