From 5ce353bcdeb5370631ee6563a5962f1fd6b74421 Mon Sep 17 00:00:00 2001 From: skarnataki Date: Wed, 27 Sep 2023 10:57:51 +0000 Subject: [PATCH] Review comment Fix --- bumble/hid.py | 177 +++++++++++++++++++++++++-------------- bumble/sdp.py | 6 +- examples/run_hid_host.py | 134 +++++++++++++++-------------- 3 files changed, 186 insertions(+), 131 deletions(-) diff --git a/bumble/hid.py b/bumble/hid.py index 19c7773..772a6ba 100644 --- a/bumble/hid.py +++ b/bumble/hid.py @@ -15,11 +15,13 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations import logging import asyncio from pyee import EventEmitter from typing import Optional, Tuple, Callable, Dict, Union +from .device import Device, Connection from . import core, l2cap # type: ignore from .colors import color # type: ignore @@ -36,8 +38,8 @@ logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- # fmt: off -HID_CTRL_PSM = 0x0011 -HID_INTR_PSM = 0x0013 +HID_CONTROL_PSM = 0x0011 +HID_INTERRUPT_PSM = 0x0013 # HIDP message types HID_HANDSHAKE = 0x00 @@ -71,11 +73,64 @@ HID_SUSPEND = 0x03 HID_EXIT_SUSPEND = 0x04 HID_VIRTUAL_CABLE_UNPLUG = 0x05 + +class HIDPacket(): + def __init__(self, + report_type: Optional[int] = None, + report_id: Optional[int] = None, + buffer_size: Optional[int] = None, + protocol_mode: Optional[int] = None, + data: Optional[bytes] = None) -> None: + + self.report_type = report_type + self.report_id = report_id + self.buffer_size = buffer_size + self.protocol_mode = protocol_mode + self.data = data + + def to_bytes_gr(self) -> bytes: + if(self.report_type == HID_OTHER_REPORT): + param = self.report_type + else: + param = 0x08 | self.report_type + header = ((HID_GET_REPORT << 4) | param) + packet_bytes = bytearray() + packet_bytes.append(header) + packet_bytes.append(self.report_id) + packet_bytes.extend([(self.buffer_size & 0xff), ((self.buffer_size >> 8) & 0xff)]) + return bytes(packet_bytes) + + def to_bytes_sr(self) -> bytes: + header = ((HID_SET_REPORT << 4) | self.report_type) + packet_bytes = bytearray() + packet_bytes.append(header) + packet_bytes.extend(self.data) + return bytes(packet_bytes) + + def to_bytes_gp(self) -> bytes: + header = (HID_GET_PROTOCOL << 4) + packet_bytes = bytearray() + packet_bytes.append(header) + return bytes(packet_bytes) + + def to_bytes_sp(self) -> bytes: + header = (HID_SET_PROTOCOL << 4 | self.protocol_mode) + packet_bytes = bytearray() + packet_bytes.append(header) + packet_bytes.append(self.protocol_mode) + return bytes(packet_bytes) + + def to_bytes_send_data(self) -> bytes: + header = ((HID_DATA << 4) | HID_OUTPUT_REPORT) + packet_bytes = bytearray() + packet_bytes.append(header) + packet_bytes.extend(self.data) + return bytes(packet_bytes) # ----------------------------------------------------------------------------- class HIDHost(EventEmitter): l2cap_channel: Optional[l2cap.Channel] - def __init__(self, device, connection) -> None: + def __init__(self, device: Device, connection: Connection) -> None: super().__init__() self.device = device self.connection = connection @@ -83,14 +138,14 @@ class HIDHost(EventEmitter): self.l2cap_intr_channel = None # Register ourselves with the L2CAP channel manager - device.register_l2cap_server(HID_CTRL_PSM, self.on_connection) - device.register_l2cap_server(HID_INTR_PSM, self.on_connection) + device.register_l2cap_server(HID_CONTROL_PSM, self.on_connection) + device.register_l2cap_server(HID_INTERRUPT_PSM, self.on_connection) - async def control_channel_connect(self) -> None: + async def connect_control_channel(self) -> None: # Create a new L2CAP connection - control channel try: self.l2cap_ctrl_channel = await self.device.l2cap_channel_manager.connect( - self.connection, HID_CTRL_PSM + self.connection, HID_CONTROL_PSM ) except ProtocolError as error: logger.error(f'L2CAP connection failed: {error}') @@ -100,11 +155,11 @@ class HIDHost(EventEmitter): # Become a sink for the L2CAP channel self.l2cap_ctrl_channel.sink = self.on_ctrl_pdu - async def interrupt_channel_connect(self) -> None: + async def connect_interrupt_channel(self) -> None: # Create a new L2CAP connection - interrupt channel try: self.l2cap_intr_channel = await self.device.l2cap_channel_manager.connect( - self.connection, HID_INTR_PSM + self.connection, HID_INTERRUPT_PSM ) except ProtocolError as error: logger.error(f'L2CAP connection failed: {error}') @@ -114,10 +169,14 @@ class HIDHost(EventEmitter): # Become a sink for the L2CAP channel self.l2cap_intr_channel.sink = self.on_intr_pdu - async def interrupt_channel_disconnect(self): + async def disconnect_interrupt_channel(self) -> None: + if self.l2cap_intr_channel is None: + raise InvalidStateError('invalid state') await self.l2cap_intr_channel.disconnect() # type: ignore - async def control_channel_disconnect(self): + async def disconnect_control_channel(self) -> None: + if self.l2cap_ctrl_channel is None: + raise InvalidStateError('invalid state') await self.l2cap_ctrl_channel.disconnect() # type: ignore def on_connection(self, l2cap_channel: l2cap.Channel) -> None: @@ -125,7 +184,7 @@ class HIDHost(EventEmitter): l2cap_channel.on('open', lambda: self.on_l2cap_channel_open(l2cap_channel)) def on_l2cap_channel_open(self, l2cap_channel: l2cap.Channel) -> None: - if l2cap_channel.psm == HID_CTRL_PSM: + if l2cap_channel.psm == HID_CONTROL_PSM: self.l2cap_ctrl_channel = l2cap_channel self.l2cap_ctrl_channel.sink = self.on_ctrl_pdu else: @@ -138,21 +197,21 @@ class HIDHost(EventEmitter): # Here we will receive all kinds of packets, parse and then call respective callbacks message_type = pdu[0] >> 4 param = pdu[0] & 0x0f - if (message_type == HID_HANDSHAKE): + if message_type == HID_HANDSHAKE : logger.debug('<<< HID HANDSHAKE') self.handle_handshake(param) - self.emit('handshake', ) - elif (message_type == HID_DATA): + self.emit('handshake', pdu) + elif message_type == HID_DATA : logger.debug('<<< HID CONTROL DATA') self.emit('data', pdu) - elif (message_type == HID_CONTROL): - if (param == HID_SUSPEND): + elif message_type == HID_CONTROL : + if param == HID_SUSPEND : logger.debug('<<< HID SUSPEND') self.emit('suspend', pdu) - elif (param == HID_EXIT_SUSPEND): + elif param == HID_EXIT_SUSPEND : logger.debug('<<< HID EXIT SUSPEND') self.emit('exit_suspend', pdu) - elif (param == HID_VIRTUAL_CABLE_UNPLUG): + elif param == HID_VIRTUAL_CABLE_UNPLUG : logger.debug('<<< HID VIRTUAL CABLE UNPLUG') self.emit('virtual_cable_unplug') else: @@ -165,50 +224,41 @@ class HIDHost(EventEmitter): logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}') self.emit("data", pdu) - def register_data_cb(self, data_cb): - self.on('data', data_cb) + def get_report(self, report_type: int, report_id: int, buffer_size: int) -> None: + msg = HIDPacket(report_type = report_type , report_id = report_id , buffer_size = buffer_size) + hid_packet = msg.to_bytes_gr() + logger.debug(f'>>> HID CONTROL GET REPORT, PDU: {hid_packet.hex()}') + self.send_pdu_on_ctrl(hid_packet) # type: ignore - def register_handshake_cb(self, handshake_cb): - self.on('handshake', handshake_cb) - - def register_virtual_cable_unplug(self, virtual_cable_unplug_cb): - self.on('virtual_cable_unplug', virtual_cable_unplug_cb) - - def get_report(self, report_type, report_id, buffer_size): - if(report_type == HID_OTHER_REPORT): - param = report_type - else: - param = 0x08 | report_type - header = ((HID_GET_REPORT << 4) | param) - msg = bytes([header, report_id, (buffer_size & 0xff), ((buffer_size >> 8) & 0xff)]) - logger.debug(f'>>> HID CONTROL GET REPORT, PDU: {msg.hex()}') - self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore - - def set_report(self, report_type, data): - header = ((HID_SET_REPORT << 4) | report_type) - msg = bytearray([header]) - msg.extend(data) - logger.debug(f'>>> HID CONTROL SET REPORT, PDU:{msg.hex()}') - self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore + def set_report(self, report_type: int, data: bytes): + msg = HIDPacket(report_type= report_type,data = data) + hid_packet = msg.to_bytes_sr() + logger.debug(f'>>> HID CONTROL SET REPORT, PDU:{hid_packet.hex()}') + self.send_pdu_on_ctrl(hid_packet) # type: ignore def get_protocol(self): - header = (HID_GET_PROTOCOL << 4) - msg = bytearray([header]) - logger.debug(f'>>> HID CONTROL GET PROTOCOL, PDU: {msg.hex()}') + msg = HIDPacket() + hid_packet = msg.to_bytes_gp() + logger.debug(f'>>> HID CONTROL GET PROTOCOL, PDU: {hid_packet.hex()}') + self.send_pdu_on_ctrl(hid_packet) # type: ignore + + def set_protocol(self, protocol_mode: int): + msg = HIDPacket(protocol_mode= protocol_mode) + hid_packet = msg.to_bytes_sp() + logger.debug(f'>>> HID CONTROL SET PROTOCOL, PDU: {hid_packet.hex()}') + self.send_pdu_on_ctrl(hid_packet) # type: ignore + + def send_pdu_on_ctrl(self, msg: bytes) -> None: self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore - def set_protocol(self, protocol_mode): - header = (HID_SET_PROTOCOL << 4 | protocol_mode) - msg = bytearray([header]) - logger.debug(f'>>> HID CONTROL SET PROTOCOL, PDU: {msg.hex()}') - self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore + def send_pdu_on_intr(self, msg: bytes) -> None: + self.l2cap_intr_channel.send_pdu(msg) # type: ignore def send_data(self, data): - header = ((HID_DATA << 4) | HID_OUTPUT_REPORT) - msg = bytearray([header]) - msg.extend(data) - logger.debug(f'>>> HID INTERRUPT SEND DATA, PDU: {msg.hex()}') - self.l2cap_intr_channel.send_pdu(msg) # type: ignore + msg = HIDPacket(data= data) + hid_packet = msg.to_bytes_send_data() + logger.debug(f'>>> HID INTERRUPT SEND DATA, PDU: {hid_packet.hex()}') + self.send_pdu_on_intr(hid_packet) # type: ignore def suspend(self): header = (HID_CONTROL << 4 | HID_SUSPEND) @@ -228,19 +278,18 @@ class HIDHost(EventEmitter): logger.debug(f'>>> HID CONTROL VIRTUAL CABLE UNPLUG, PDU: {msg.hex()}') self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore - def handle_handshake(self, param): - if (param == HANDSHAKE_SUCCESSFUL): + def handle_handshake(self, param: int): + if param == HANDSHAKE_SUCCESSFUL : logger.debug(f'<<< HID HANDSHAKE: SUCCESSFUL') - elif (param == HANDSHAKE_NOT_READY): + elif param == HANDSHAKE_NOT_READY : logger.warning(f'<<< HID HANDSHAKE: NOT_READY') - elif (param == HANDSHAKE_ERR_INVALID_REPORT_ID): + elif param == HANDSHAKE_ERR_INVALID_REPORT_ID : logger.warning(f'<<< HID HANDSHAKE: ERR_INVALID_REPORT_ID') - elif (param == HANDSHAKE_ERR_UNSUPPORTED_REQUEST): + elif param == HANDSHAKE_ERR_UNSUPPORTED_REQUEST : logger.warning(f'<<< HID HANDSHAKE: ERR_UNSUPPORTED_REQUEST') - elif (param == HANDSHAKE_ERR_UNKNOWN): + elif param == HANDSHAKE_ERR_UNKNOWN : logger.warning(f'<<< HID HANDSHAKE: ERR_UNKNOWN') - elif (param == HANDSHAKE_ERR_FATAL): + elif param == HANDSHAKE_ERR_FATAL : logger.warning(f'<<< HID HANDSHAKE: ERR_FATAL') else: # 0x5-0xD = Reserved logger.warning("<<< HID HANDSHAKE: RESERVED VALUE") - diff --git a/bumble/sdp.py b/bumble/sdp.py index 06be980..389cd55 100644 --- a/bumble/sdp.py +++ b/bumble/sdp.py @@ -167,7 +167,7 @@ class DataElement: UUID: lambda x: DataElement( DataElement.UUID, core.UUID.from_bytes(bytes(reversed(x))) ), - TEXT_STRING: lambda x: DataElement(DataElement.TEXT_STRING, x), + TEXT_STRING: lambda x: DataElement(DataElement.TEXT_STRING, x.decode('latin1')), BOOLEAN: lambda x: DataElement(DataElement.BOOLEAN, x[0] == 1), SEQUENCE: lambda x: DataElement( DataElement.SEQUENCE, DataElement.list_from_bytes(x) @@ -376,7 +376,9 @@ class DataElement: raise ValueError('invalid value_size') elif self.type == DataElement.UUID: data = bytes(reversed(bytes(self.value))) - elif self.type in (DataElement.TEXT_STRING, DataElement.URL): + elif self.type == DataElement.TEXT_STRING: + data = self.value.encode('latin1') + elif self.type == DataElement.URL: data = self.value.encode('utf8') elif self.type == DataElement.BOOLEAN: data = bytes([1 if self.value else 0]) diff --git a/examples/run_hid_host.py b/examples/run_hid_host.py index be76c46..ad92b27 100644 --- a/examples/run_hid_host.py +++ b/examples/run_hid_host.py @@ -103,111 +103,111 @@ async def get_hid_device_sdp_record(device, connection): ) print(color(f'SDP attributes for HID device','magenta')) for attribute in attributes: - if (attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID): + if attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID : print(color(' Service Record Handle : ', 'cyan'), hex(attribute.value.value)) - elif (attribute.id == SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID): + elif attribute.id == SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID : print(color(' Service Class : ', 'cyan'), attribute.value.value[0].value) - elif (attribute.id == SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID): + elif attribute.id == SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID : print(color(' SDP Browse Group List : ', 'cyan'), attribute.value.value[0].value) - elif (attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID): + elif attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID : print(color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[0].value) print(color(' PSM for Bluetooth HID Control channel : ', 'cyan'), hex(attribute.value.value[0].value[1].value)) print(color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[1].value[0].value) - elif (attribute.id == SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID): + elif attribute.id == SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID : print(color(' Lanugage : ', 'cyan'), hex(attribute.value.value[0].value)) print(color(' Encoding : ', 'cyan'), hex(attribute.value.value[1].value)) print(color(' PrimaryLanguageBaseID : ', 'cyan'), hex(attribute.value.value[2].value)) - elif (attribute.id == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID): + elif attribute.id == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID : print(color(' BT_HUMAN_INTERFACE_DEVICE_SERVICE ', 'cyan'), attribute.value.value[0].value[0].value) print(color(' HID Profileversion number : ', 'cyan'), hex(attribute.value.value[0].value[1].value)) - elif (attribute.id == SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID): + elif attribute.id == SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID : print(color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[0].value[0].value) print(color(' PSM for Bluetooth HID Interrupt channel : ', 'cyan'), hex(attribute.value.value[0].value[0].value[1].value)) print(color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[1].value[0].value) - elif (attribute.id == SDP_HID_SERVICE_NAME_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_SERVICE_NAME_ATTRIBUTE_ID : print(color(' Service Name: ', 'cyan'), attribute.value.value) HID_Service_Name = attribute.value.value - elif (attribute.id == SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID : print(color(' Service Description: ', 'cyan'), attribute.value.value) HID_Service_Description = attribute.value.value - elif (attribute.id == SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID : print(color(' Provider Name: ', 'cyan'), attribute.value.value) HID_Provider_Name = attribute.value.value - elif (attribute.id == SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID : print(color(' Release Number: ', 'cyan'), hex(attribute.value.value)) HID_Device_Release_Number = attribute.value.value - elif (attribute.id == SDP_HID_PARSER_VERSION_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_PARSER_VERSION_ATTRIBUTE_ID : print(color(' HID Parser Version: ', 'cyan'), hex(attribute.value.value)) HID_Parser_Version = attribute.value.value - elif (attribute.id == SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID : print(color(' HIDDeviceSubclass: ', 'cyan'), hex(attribute.value.value)) HID_Device_Subclass = attribute.value.value - elif (attribute.id == SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID : print(color(' HIDCountryCode: ', 'cyan'), hex(attribute.value.value)) HID_Country_Code = attribute.value.value - elif (attribute.id == SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID : print(color(' HIDVirtualCable: ', 'cyan'), attribute.value.value) HID_Virtual_Cable = attribute.value.value - elif (attribute.id == SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID : print(color(' HIDReconnectInitiate: ', 'cyan'), attribute.value.value) HID_Reconnect_Initiate = attribute.value.value - elif (attribute.id == SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID : print(color(' HID Report Descriptor type: ', 'cyan'), hex(attribute.value.value[0].value[0].value)) - print(color(' HID Report DescriptorList: ', 'cyan'), attribute.value.value[0].value[1].value) + print(color(' HID Report DescriptorList: ', 'cyan'), (attribute.value.value[0].value[1].value).encode('latin-1')) HID_Descriptor_Type = attribute.value.value[0].value[0].value HID_Report_Descriptor_List = attribute.value.value[0].value[1].value - elif (attribute.id == SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID : print(color(' HID LANGID Base Language: ', 'cyan'), hex(attribute.value.value[0].value[0].value)) print(color(' HID LANGID Base Bluetooth String Offset: ', 'cyan'), hex(attribute.value.value[0].value[1].value)) HID_LANGID_Base_Language = attribute.value.value[0].value[0].value HID_LANGID_Base_Bluetooth_String_Offset = attribute.value.value[0].value[1].value - elif (attribute.id == SDP_HID_BATTERY_POWER_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_BATTERY_POWER_ATTRIBUTE_ID : print(color(' HIDBatteryPower: ', 'cyan'), attribute.value.value) HID_Battery_Power = attribute.value.value - elif (attribute.id == SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID : print(color(' HIDRemoteWake: ', 'cyan'), attribute.value.value) HID_Remote_Wake = attribute.value.value - elif (attribute.id == SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID : print(color(' HIDProfileVersion : ', 'cyan'), hex(attribute.value.value)) HID_Profile_Version = attribute.value.value - elif (attribute.id == SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID : print(color(' HIDSupervisionTimeout: ', 'cyan'), hex(attribute.value.value)) HID_Supervision_Timeout = attribute.value.value - elif (attribute.id == SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID : print(color(' HIDNormallyConnectable: ', 'cyan'), attribute.value.value) HID_Normally_Connectable = attribute.value.value - elif (attribute.id == SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID : print(color(' HIDBootDevice: ', 'cyan'), attribute.value.value) HID_Boot_Device = attribute.value.value - elif (attribute.id == SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID : print(color(' HIDSSRHostMaxLatency: ', 'cyan'), hex(attribute.value.value)) HID_SSR_Host_Max_Latency = attribute.value.value - elif (attribute.id == SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID): + elif attribute.id == SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID : print(color(' HIDSSRHostMinTimeout: ', 'cyan'), hex(attribute.value.value)) HID_SSR_Host_Min_Timeout = attribute.value.value @@ -242,6 +242,7 @@ async def main(): return report_length = len(pdu[1:]) report_id = pdu[1] + if (report_type != HID_OTHER_REPORT): print(color(f' Report type = {report_type}, Report length = {report_length}, Report id = {report_id}', 'blue', None, 'bold')) @@ -252,8 +253,8 @@ async def main(): ReportParser.parse_input_report(pdu[1:]) #type: ignore async def handle_virtual_cable_unplug(): - await hid_host.interrupt_channel_disconnect() - await hid_host.control_channel_disconnect() + await hid_host.disconnect_interrupt_channel() + await hid_host.disconnect_control_channel() await device.keystore.delete(target_address) #type: ignore await connection.disconnect() @@ -292,10 +293,10 @@ async def main(): hid_host = HIDHost(device, connection) # Register for HID data call back - hid_host.register_data_cb(on_hid_data_cb) + hid_host.on('data', on_hid_data_cb) # Register for virtual cable unplug call back - hid_host.register_virtual_cable_unplug(on_hid_virtual_cable_unplug_cb) + hid_host.on('virtual_cable_unplug', on_hid_virtual_cable_unplug_cb) async def menu(): reader = await get_stream_reader(sys.stdin) @@ -321,106 +322,109 @@ async def main(): choice = await reader.readline() choice = choice.decode('utf-8').strip() - if (choice == '1'): - await hid_host.control_channel_connect() + if choice == '1': + await hid_host.connect_control_channel() - elif (choice == '2'): - await hid_host.interrupt_channel_connect() + elif choice == '2': + await hid_host.connect_interrupt_channel() - elif (choice == '3'): - await hid_host.control_channel_disconnect() + elif choice == '3': + await hid_host.disconnect_control_channel() - elif (choice == '4'): - await hid_host.interrupt_channel_disconnect() + elif choice == '4': + await hid_host.disconnect_interrupt_channel() - elif (choice == '5'): + elif choice == '5': print(" 1. Report ID 0x02") print(" 2. Report ID 0x03") print(" 3. Report ID 0x05") choice1 = await reader.readline() choice1 = choice1.decode('utf-8').strip() - if (choice1 == '1'): + if choice1 == '1': hid_host.get_report(1, 2, 3) - elif (choice1 == '2'): + elif choice1 == '2': hid_host.get_report(2, 3, 2) - elif (choice1 == '3'): + elif choice1 == '3': hid_host.get_report(3, 5, 3) else: print('Incorrect option selected') - elif (choice == '6'): + elif choice == '6': print(" 1. Report type 1 and Report id 0x01") print(" 2. Report type 2 and Report id 0x03") print(" 3. Report type 3 and Report id 0x05") choice1 = await reader.readline() choice1 = choice1.decode('utf-8').strip() - if (choice1 == '1'): + if choice1 == '1': # data includes first octet as report id data = bytearray([0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01]) hid_host.set_report(1, data) - elif (choice1 == '2'): + elif choice1 == '2': data = bytearray([0x03, 0x01, 0x01]) hid_host.set_report(2, data) - elif (choice1 == '3'): + elif choice1 == '3': data = bytearray([0x05, 0x01, 0x01, 0x01]) hid_host.set_report(3, data) else: print('Incorrect option selected') - elif (choice == '7'): + elif choice == '7': print(" 0. Boot") print(" 1. Report") choice1 = await reader.readline() choice1 = choice1.decode('utf-8').strip() - if (choice1 == '0'): + if choice1 == '0': hid_host.set_protocol(HID_BOOT_PROTOCOL_MODE) - elif (choice1 == '1'): + elif choice1 == '1': hid_host.set_protocol(HID_REPORT_PROTOCOL_MODE) else: print('Incorrect option selected') - elif (choice == '8'): + elif choice == '8': hid_host.get_protocol() - elif (choice == '9'): + elif choice == '9': print(" 1. Report ID 0x01") print(" 2. Report ID 0x03") choice1 = await reader.readline() choice1 = choice1.decode('utf-8').strip() - if (choice1 == '1'): + if choice1 == '1': data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) hid_host.send_data(data) - elif (choice1 == '2'): + elif choice1 == '2': data = bytearray([0x03, 0x00, 0x0d, 0xfd, 0x00, 0x00]) hid_host.send_data(data) else: print('Incorrect option selected') - elif (choice == '10'): + elif choice == '10': hid_host.suspend() - elif (choice == '11'): + elif choice == '11': hid_host.exit_suspend() - elif (choice == '12'): + elif choice == '12': hid_host.virtual_cable_unplug() - await device.keystore.delete(target_address) + try: + await device.keystore.delete(target_address) + except KeyError: + print('Device not found or Device already unpaired.') - elif (choice == '13'): + elif choice == '13': peer_address = Address.from_string_for_transport(target_address, transport=BT_BR_EDR_TRANSPORT) connection = device.find_connection_by_bd_addr(peer_address, transport=BT_BR_EDR_TRANSPORT) if connection is not None: @@ -428,14 +432,14 @@ async def main(): else: print("Already disconnected from device") - elif (choice == '14'): + elif choice == '14': try: await device.keystore.delete(target_address) print("Unpair successful") except KeyError: print('Device not found or Device already unpaired.') - elif (choice == '15'): + elif choice == '15': connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) await connection.authenticate() await connection.encrypt() @@ -443,19 +447,19 @@ async def main(): else: print("Invalid option selected.") - if ((len(sys.argv) > 4) and (sys.argv[4] == 'test-mode')): + if (len(sys.argv) > 4) and (sys.argv[4] == 'test-mode'): # Enabling menu for testing await menu() else: # HID Connection # Control channel - await hid_host.control_channel_connect() + await hid_host.connect_control_channel() # Interrupt Channel - await hid_host.interrupt_channel_connect() + await hid_host.connect_interrupt_channel() await hci_source.wait_for_termination() # ----------------------------------------------------------------------------- logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) -asyncio.run(main()) \ No newline at end of file +asyncio.run(main())