mirror of
https://github.com/google/bumble.git
synced 2026-05-10 04:18:03 +00:00
Submitting review comment fix: header function and extra lines.
Executed formatter on file.
This commit is contained in:
@@ -208,11 +208,11 @@ class Host(EventEmitter):
|
|||||||
|
|
||||||
def on_l2cap_channel_open(self, l2cap_channel: l2cap.Channel) -> None:
|
def on_l2cap_channel_open(self, l2cap_channel: l2cap.Channel) -> None:
|
||||||
if l2cap_channel.psm == HID_CONTROL_PSM:
|
if l2cap_channel.psm == HID_CONTROL_PSM:
|
||||||
self.l2cap_ctrl_channel = l2cap_channel
|
self.l2cap_ctrl_channel = l2cap_channel # type: ignore
|
||||||
self.l2cap_ctrl_channel.sink = self.on_ctrl_pdu
|
self.l2cap_ctrl_channel.sink = self.on_ctrl_pdu # type: ignore
|
||||||
else:
|
else:
|
||||||
self.l2cap_intr_channel = l2cap_channel
|
self.l2cap_intr_channel = l2cap_channel # type: ignore
|
||||||
self.l2cap_intr_channel.sink = self.on_intr_pdu
|
self.l2cap_intr_channel.sink = self.on_intr_pdu # type: ignore
|
||||||
logger.debug(f'$$$ L2CAP channel open: {l2cap_channel}')
|
logger.debug(f'$$$ L2CAP channel open: {l2cap_channel}')
|
||||||
|
|
||||||
def on_ctrl_pdu(self, pdu: bytes) -> None:
|
def on_ctrl_pdu(self, pdu: bytes) -> None:
|
||||||
@@ -300,4 +300,3 @@ class Host(EventEmitter):
|
|||||||
msg = bytearray([header])
|
msg = bytearray([header])
|
||||||
logger.debug(f'>>> HID CONTROL VIRTUAL CABLE UNPLUG, PDU: {msg.hex()}')
|
logger.debug(f'>>> HID CONTROL VIRTUAL CABLE UNPLUG, PDU: {msg.hex()}')
|
||||||
self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore
|
self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore
|
||||||
|
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ shift_map = {
|
|||||||
',': '<',
|
',': '<',
|
||||||
'.': '>',
|
'.': '>',
|
||||||
'/': '?',
|
'/': '?',
|
||||||
'`' : '~'
|
'`': '~',
|
||||||
}
|
}
|
||||||
|
|
||||||
# hex map
|
# hex map
|
||||||
@@ -65,7 +65,7 @@ mod_keys = {
|
|||||||
'10': 'right_ctrl',
|
'10': 'right_ctrl',
|
||||||
'20': 'right_shift',
|
'20': 'right_shift',
|
||||||
'40': 'right_alt',
|
'40': 'right_alt',
|
||||||
'80' : 'right_meta'
|
'80': 'right_meta',
|
||||||
}
|
}
|
||||||
|
|
||||||
# base keys
|
# base keys
|
||||||
@@ -244,5 +244,5 @@ base_keys = {
|
|||||||
'f8': 'KEY_MEDIA_SLEEP',
|
'f8': 'KEY_MEDIA_SLEEP',
|
||||||
'f9': 'KEY_MEDIA_COFFEE',
|
'f9': 'KEY_MEDIA_COFFEE',
|
||||||
'fa': 'KEY_MEDIA_REFRESH',
|
'fa': 'KEY_MEDIA_REFRESH',
|
||||||
'fb' : 'KEY_MEDIA_CALC'
|
'fb': 'KEY_MEDIA_CALC',
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -60,17 +60,27 @@ class Keyboard:
|
|||||||
print(color('\tKeyboard Input Received', 'green', None, 'bold'))
|
print(color('\tKeyboard Input Received', 'green', None, 'bold'))
|
||||||
print(color(f'Keys:', 'white', None, 'bold'))
|
print(color(f'Keys:', 'white', None, 'bold'))
|
||||||
for i in range(1, 7):
|
for i in range(1, 7):
|
||||||
print(color(f' Key{i}{" ":>8s}= ', 'cyan', None, 'bold'), self.report[i + 1])
|
print(
|
||||||
|
color(f' Key{i}{" ":>8s}= ', 'cyan', None, 'bold'), self.report[i + 1]
|
||||||
|
)
|
||||||
print(color(f'\nModifier Keys:', 'white', None, 'bold'))
|
print(color(f'\nModifier Keys:', 'white', None, 'bold'))
|
||||||
print(
|
print(
|
||||||
color(f' Left Ctrl : ', 'cyan'), f'{self.report[0][0] == 1!s:<5}', # type: ignore
|
color(f' Left Ctrl : ', 'cyan'),
|
||||||
color(f' Left Shift : ', 'cyan'), f'{self.report[0][1] == 1!s:<5}', # type: ignore
|
f'{self.report[0][0] == 1!s:<5}', # type: ignore
|
||||||
color(f' Left ALT : ', 'cyan'), f'{self.report[0][2] == 1!s:<5}', # type: ignore
|
color(f' Left Shift : ', 'cyan'),
|
||||||
color(f' Left GUI : ', 'cyan'), f'{self.report[0][3] == 1!s:<5}\n', # type: ignore
|
f'{self.report[0][1] == 1!s:<5}', # type: ignore
|
||||||
color(f' Right Ctrl : ', 'cyan'), f'{self.report[0][4] == 1!s:<5}', # type: ignore
|
color(f' Left ALT : ', 'cyan'),
|
||||||
color(f' Right Shift : ', 'cyan'), f'{self.report[0][5] == 1!s:<5}', # type: ignore
|
f'{self.report[0][2] == 1!s:<5}', # type: ignore
|
||||||
color(f' Right ALT : ', 'cyan'), f'{self.report[0][6] == 1!s:<5}', # type: ignore
|
color(f' Left GUI : ', 'cyan'),
|
||||||
color(f' Right GUI : ', 'cyan'), f'{self.report[0][7] == 1!s:<5}', # type: ignore
|
f'{self.report[0][3] == 1!s:<5}\n', # type: ignore
|
||||||
|
color(f' Right Ctrl : ', 'cyan'),
|
||||||
|
f'{self.report[0][4] == 1!s:<5}', # type: ignore
|
||||||
|
color(f' Right Shift : ', 'cyan'),
|
||||||
|
f'{self.report[0][5] == 1!s:<5}', # type: ignore
|
||||||
|
color(f' Right ALT : ', 'cyan'),
|
||||||
|
f'{self.report[0][6] == 1!s:<5}', # type: ignore
|
||||||
|
color(f' Right GUI : ', 'cyan'),
|
||||||
|
f'{self.report[0][7] == 1!s:<5}', # type: ignore
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -106,23 +116,32 @@ class Mouse:
|
|||||||
def print_mouse_report(self) -> None:
|
def print_mouse_report(self) -> None:
|
||||||
print(color('\tMouse Input Received', 'green', None, 'bold'))
|
print(color('\tMouse Input Received', 'green', None, 'bold'))
|
||||||
print(
|
print(
|
||||||
color(f' Button 1 (primary/trigger) = ', 'cyan'), self.report[0][0] == 1, # type: ignore
|
color(f' Button 1 (primary/trigger) = ', 'cyan'),
|
||||||
color(f'\n Button 2 (secondary) = ', 'cyan'), self.report[0][1] == 1, # type: ignore
|
self.report[0][0] == 1, # type: ignore
|
||||||
color(f'\n Button 3 (tertiary) = ', 'cyan'), self.report[0][2] == 1, # type: ignore
|
color(f'\n Button 2 (secondary) = ', 'cyan'),
|
||||||
color(f'\n Button4 = ', 'cyan'), self.report[0][3] == 1, # type: ignore
|
self.report[0][1] == 1, # type: ignore
|
||||||
color(f'\n Button5 = ', 'cyan'), self.report[0][4] == 1, # type: ignore
|
color(f'\n Button 3 (tertiary) = ', 'cyan'),
|
||||||
color(f'\n X (X-axis displacement) = ', 'cyan'), self.report[1],
|
self.report[0][2] == 1, # type: ignore
|
||||||
color(f'\n Y (Y-axis displacement) = ', 'cyan'), self.report[2],
|
color(f'\n Button4 = ', 'cyan'),
|
||||||
color(f'\n Wheel = ', 'cyan'), self.report[3],
|
self.report[0][3] == 1, # type: ignore
|
||||||
color(f'\n AC PAN = ', 'cyan'), self.report[4],
|
color(f'\n Button5 = ', 'cyan'),
|
||||||
|
self.report[0][4] == 1, # type: ignore
|
||||||
|
color(f'\n X (X-axis displacement) = ', 'cyan'),
|
||||||
|
self.report[1],
|
||||||
|
color(f'\n Y (Y-axis displacement) = ', 'cyan'),
|
||||||
|
self.report[2],
|
||||||
|
color(f'\n Wheel = ', 'cyan'),
|
||||||
|
self.report[3],
|
||||||
|
color(f'\n AC PAN = ', 'cyan'),
|
||||||
|
self.report[4],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
class ReportParser:
|
class ReportParser:
|
||||||
def parse_input_report(input_report: bytes) -> None: # type: ignore
|
def parse_input_report(self, input_report: bytes) -> None: # type: ignore
|
||||||
|
|
||||||
report_id = input_report[0]
|
report_id = input_report[0] # pylint: disable=unsubscriptable-object
|
||||||
report_length = len(input_report)
|
report_length = len(input_report)
|
||||||
|
|
||||||
# Keyboard input report (report id = 1)
|
# Keyboard input report (report id = 1)
|
||||||
|
|||||||
@@ -74,6 +74,7 @@ SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210
|
|||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
async def get_hid_device_sdp_record(device, connection):
|
async def get_hid_device_sdp_record(device, connection):
|
||||||
|
|
||||||
# Connect to the SDP Server
|
# Connect to the SDP Server
|
||||||
@@ -89,47 +90,86 @@ async def get_hid_device_sdp_record(device, connection):
|
|||||||
[BT_HUMAN_INTERFACE_DEVICE_SERVICE]
|
[BT_HUMAN_INTERFACE_DEVICE_SERVICE]
|
||||||
)
|
)
|
||||||
|
|
||||||
if (len(service_record_handles) < 1):
|
if len(service_record_handles) < 1:
|
||||||
await sdp_client.disconnect()
|
await sdp_client.disconnect()
|
||||||
raise Exception(color(f'BT HID Device service not found on peer device!!!!','red'))
|
raise Exception(
|
||||||
|
color(f'BT HID Device service not found on peer device!!!!', 'red')
|
||||||
|
)
|
||||||
|
|
||||||
# For BT_HUMAN_INTERFACE_DEVICE_SERVICE service, get all its attributes
|
# For BT_HUMAN_INTERFACE_DEVICE_SERVICE service, get all its attributes
|
||||||
for service_record_handle in service_record_handles:
|
for service_record_handle in service_record_handles:
|
||||||
attributes = await sdp_client.get_attributes(
|
attributes = await sdp_client.get_attributes(
|
||||||
service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
|
service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
|
||||||
)
|
)
|
||||||
print(
|
print(color(f'SERVICE {service_record_handle:04X} attributes:', 'yellow'))
|
||||||
color(f'SERVICE {service_record_handle:04X} attributes:', 'yellow')
|
|
||||||
)
|
|
||||||
print(color(f'SDP attributes for HID device', 'magenta'))
|
print(color(f'SDP attributes for HID device', 'magenta'))
|
||||||
for attribute in attributes:
|
for attribute in attributes:
|
||||||
if attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID:
|
if attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID:
|
||||||
print(color(' Service Record Handle : ', 'cyan'), hex(attribute.value.value))
|
print(
|
||||||
|
color(' Service Record Handle : ', 'cyan'),
|
||||||
|
hex(attribute.value.value),
|
||||||
|
)
|
||||||
|
|
||||||
elif attribute.id == SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID:
|
elif attribute.id == SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID:
|
||||||
print(color(' Service Class : ', 'cyan'), attribute.value.value[0].value)
|
print(
|
||||||
|
color(' Service Class : ', 'cyan'), attribute.value.value[0].value
|
||||||
|
)
|
||||||
|
|
||||||
elif attribute.id == SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID:
|
elif attribute.id == SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID:
|
||||||
print(color(' SDP Browse Group List : ', 'cyan'), attribute.value.value[0].value)
|
print(
|
||||||
|
color(' SDP Browse Group List : ', 'cyan'),
|
||||||
|
attribute.value.value[0].value,
|
||||||
|
)
|
||||||
|
|
||||||
elif attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
|
elif attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
|
||||||
print(color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[0].value)
|
print(
|
||||||
print(color(' PSM for Bluetooth HID Control channel : ', 'cyan'), hex(attribute.value.value[0].value[1].value))
|
color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'),
|
||||||
print(color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[1].value[0].value)
|
attribute.value.value[0].value[0].value,
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
color(' PSM for Bluetooth HID Control channel : ', 'cyan'),
|
||||||
|
hex(attribute.value.value[0].value[1].value),
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'),
|
||||||
|
attribute.value.value[1].value[0].value,
|
||||||
|
)
|
||||||
|
|
||||||
elif attribute.id == SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID:
|
elif attribute.id == SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID:
|
||||||
print(color(' Lanugage : ', 'cyan'), hex(attribute.value.value[0].value))
|
print(
|
||||||
print(color(' Encoding : ', 'cyan'), hex(attribute.value.value[1].value))
|
color(' Lanugage : ', 'cyan'), hex(attribute.value.value[0].value)
|
||||||
print(color(' PrimaryLanguageBaseID : ', 'cyan'), hex(attribute.value.value[2].value))
|
)
|
||||||
|
print(
|
||||||
|
color(' Encoding : ', 'cyan'), hex(attribute.value.value[1].value)
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
color(' PrimaryLanguageBaseID : ', 'cyan'),
|
||||||
|
hex(attribute.value.value[2].value),
|
||||||
|
)
|
||||||
|
|
||||||
elif attribute.id == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID:
|
elif attribute.id == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID:
|
||||||
print(color(' BT_HUMAN_INTERFACE_DEVICE_SERVICE ', 'cyan'), attribute.value.value[0].value[0].value)
|
print(
|
||||||
print(color(' HID Profileversion number : ', 'cyan'), hex(attribute.value.value[0].value[1].value))
|
color(' BT_HUMAN_INTERFACE_DEVICE_SERVICE ', 'cyan'),
|
||||||
|
attribute.value.value[0].value[0].value,
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
color(' HID Profileversion number : ', 'cyan'),
|
||||||
|
hex(attribute.value.value[0].value[1].value),
|
||||||
|
)
|
||||||
|
|
||||||
elif attribute.id == SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
|
elif attribute.id == SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
|
||||||
print(color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[0].value[0].value)
|
print(
|
||||||
print(color(' PSM for Bluetooth HID Interrupt channel : ', 'cyan'), hex(attribute.value.value[0].value[0].value[1].value))
|
color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'),
|
||||||
print(color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[1].value[0].value)
|
attribute.value.value[0].value[0].value[0].value,
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
color(' PSM for Bluetooth HID Interrupt channel : ', 'cyan'),
|
||||||
|
hex(attribute.value.value[0].value[0].value[1].value),
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'),
|
||||||
|
attribute.value.value[0].value[1].value[0].value,
|
||||||
|
)
|
||||||
|
|
||||||
elif attribute.id == SDP_HID_SERVICE_NAME_ATTRIBUTE_ID:
|
elif attribute.id == SDP_HID_SERVICE_NAME_ATTRIBUTE_ID:
|
||||||
print(color(' Service Name: ', 'cyan'), attribute.value.value)
|
print(color(' Service Name: ', 'cyan'), attribute.value.value)
|
||||||
@@ -144,10 +184,14 @@ async def get_hid_device_sdp_record(device, connection):
|
|||||||
print(color(' Release Number: ', 'cyan'), hex(attribute.value.value))
|
print(color(' Release Number: ', 'cyan'), hex(attribute.value.value))
|
||||||
|
|
||||||
elif attribute.id == SDP_HID_PARSER_VERSION_ATTRIBUTE_ID:
|
elif attribute.id == SDP_HID_PARSER_VERSION_ATTRIBUTE_ID:
|
||||||
print(color(' HID Parser Version: ', 'cyan'), hex(attribute.value.value))
|
print(
|
||||||
|
color(' HID Parser Version: ', 'cyan'), hex(attribute.value.value)
|
||||||
|
)
|
||||||
|
|
||||||
elif attribute.id == SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID:
|
elif attribute.id == SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID:
|
||||||
print(color(' HIDDeviceSubclass: ', 'cyan'), hex(attribute.value.value))
|
print(
|
||||||
|
color(' HIDDeviceSubclass: ', 'cyan'), hex(attribute.value.value)
|
||||||
|
)
|
||||||
|
|
||||||
elif attribute.id == SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID:
|
elif attribute.id == SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID:
|
||||||
print(color(' HIDCountryCode: ', 'cyan'), hex(attribute.value.value))
|
print(color(' HIDCountryCode: ', 'cyan'), hex(attribute.value.value))
|
||||||
@@ -159,12 +203,24 @@ async def get_hid_device_sdp_record(device, connection):
|
|||||||
print(color(' HIDReconnectInitiate: ', 'cyan'), attribute.value.value)
|
print(color(' HIDReconnectInitiate: ', 'cyan'), attribute.value.value)
|
||||||
|
|
||||||
elif attribute.id == SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID:
|
elif attribute.id == SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID:
|
||||||
print(color(' HID Report Descriptor type: ', 'cyan'), hex(attribute.value.value[0].value[0].value))
|
print(
|
||||||
print(color(' HID Report DescriptorList: ', 'cyan'), attribute.value.value[0].value[1].value)
|
color(' HID Report Descriptor type: ', 'cyan'),
|
||||||
|
hex(attribute.value.value[0].value[0].value),
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
color(' HID Report DescriptorList: ', 'cyan'),
|
||||||
|
attribute.value.value[0].value[1].value,
|
||||||
|
)
|
||||||
|
|
||||||
elif attribute.id == SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID:
|
elif attribute.id == SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID:
|
||||||
print(color(' HID LANGID Base Language: ', 'cyan'), hex(attribute.value.value[0].value[0].value))
|
print(
|
||||||
print(color(' HID LANGID Base Bluetooth String Offset: ', 'cyan'), hex(attribute.value.value[0].value[1].value))
|
color(' HID LANGID Base Language: ', 'cyan'),
|
||||||
|
hex(attribute.value.value[0].value[0].value),
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
color(' HID LANGID Base Bluetooth String Offset: ', 'cyan'),
|
||||||
|
hex(attribute.value.value[0].value[1].value),
|
||||||
|
)
|
||||||
|
|
||||||
elif attribute.id == SDP_HID_BATTERY_POWER_ATTRIBUTE_ID:
|
elif attribute.id == SDP_HID_BATTERY_POWER_ATTRIBUTE_ID:
|
||||||
print(color(' HIDBatteryPower: ', 'cyan'), attribute.value.value)
|
print(color(' HIDBatteryPower: ', 'cyan'), attribute.value.value)
|
||||||
@@ -173,25 +229,43 @@ async def get_hid_device_sdp_record(device, connection):
|
|||||||
print(color(' HIDRemoteWake: ', 'cyan'), attribute.value.value)
|
print(color(' HIDRemoteWake: ', 'cyan'), attribute.value.value)
|
||||||
|
|
||||||
elif attribute.id == SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID:
|
elif attribute.id == SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID:
|
||||||
print(color(' HIDProfileVersion : ', 'cyan'), hex(attribute.value.value))
|
print(
|
||||||
|
color(' HIDProfileVersion : ', 'cyan'), hex(attribute.value.value)
|
||||||
|
)
|
||||||
|
|
||||||
elif attribute.id == SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID:
|
elif attribute.id == SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID:
|
||||||
print(color(' HIDSupervisionTimeout: ', 'cyan'), hex(attribute.value.value))
|
print(
|
||||||
|
color(' HIDSupervisionTimeout: ', 'cyan'),
|
||||||
|
hex(attribute.value.value),
|
||||||
|
)
|
||||||
|
|
||||||
elif attribute.id == SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID:
|
elif attribute.id == SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID:
|
||||||
print(color(' HIDNormallyConnectable: ', 'cyan'), attribute.value.value)
|
print(
|
||||||
|
color(' HIDNormallyConnectable: ', 'cyan'), attribute.value.value
|
||||||
|
)
|
||||||
|
|
||||||
elif attribute.id == SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID:
|
elif attribute.id == SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID:
|
||||||
print(color(' HIDBootDevice: ', 'cyan'), attribute.value.value)
|
print(color(' HIDBootDevice: ', 'cyan'), attribute.value.value)
|
||||||
|
|
||||||
elif attribute.id == SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID:
|
elif attribute.id == SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID:
|
||||||
print(color(' HIDSSRHostMaxLatency: ', 'cyan'), hex(attribute.value.value))
|
print(
|
||||||
|
color(' HIDSSRHostMaxLatency: ', 'cyan'),
|
||||||
|
hex(attribute.value.value),
|
||||||
|
)
|
||||||
|
|
||||||
elif attribute.id == SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID:
|
elif attribute.id == SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID:
|
||||||
print(color(' HIDSSRHostMinTimeout: ', 'cyan'), hex(attribute.value.value))
|
print(
|
||||||
|
color(' HIDSSRHostMinTimeout: ', 'cyan'),
|
||||||
|
hex(attribute.value.value),
|
||||||
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
print(color(f' Warning: Attribute ID: {attribute.id} match not found.\n Attribute Info: {attribute}', 'yellow'))
|
print(
|
||||||
|
color(
|
||||||
|
f' Warning: Attribute ID: {attribute.id} match not found.\n Attribute Info: {attribute}',
|
||||||
|
'yellow',
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
await sdp_client.disconnect()
|
await sdp_client.disconnect()
|
||||||
|
|
||||||
@@ -217,17 +291,24 @@ async def main():
|
|||||||
return
|
return
|
||||||
|
|
||||||
def on_hid_data_cb(pdu):
|
def on_hid_data_cb(pdu):
|
||||||
report_type = pdu[0] & 0x0f
|
report_type = pdu[0] & 0x0F
|
||||||
if (len(pdu) == 1):
|
if len(pdu) == 1:
|
||||||
print(color(f'Warning: No report received', 'yellow'))
|
print(color(f'Warning: No report received', 'yellow'))
|
||||||
return
|
return
|
||||||
report_length = len(pdu[1:])
|
report_length = len(pdu[1:])
|
||||||
report_id = pdu[1]
|
report_id = pdu[1]
|
||||||
|
|
||||||
if (report_type != Message.ReportType.OTHER_REPORT):
|
if report_type != Message.ReportType.OTHER_REPORT:
|
||||||
print(color(f' Report type = {report_type}, Report length = {report_length}, Report id = {report_id}', 'blue', None, 'bold'))
|
print(
|
||||||
|
color(
|
||||||
|
f' Report type = {report_type}, Report length = {report_length}, Report id = {report_id}',
|
||||||
|
'blue',
|
||||||
|
None,
|
||||||
|
'bold',
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
if ((report_length <= 1) or (report_id == 0)):
|
if (report_length <= 1) or (report_id == 0):
|
||||||
return
|
return
|
||||||
|
|
||||||
if report_type == Message.ReportType.INPUT_REPORT:
|
if report_type == Message.ReportType.INPUT_REPORT:
|
||||||
@@ -282,7 +363,9 @@ async def main():
|
|||||||
async def menu():
|
async def menu():
|
||||||
reader = await get_stream_reader(sys.stdin)
|
reader = await get_stream_reader(sys.stdin)
|
||||||
while True:
|
while True:
|
||||||
print("\n************************ HID Host Menu *****************************\n")
|
print(
|
||||||
|
"\n************************ HID Host Menu *****************************\n"
|
||||||
|
)
|
||||||
print(" 1. Connect Control Channel")
|
print(" 1. Connect Control Channel")
|
||||||
print(" 2. Connect Interrupt Channel")
|
print(" 2. Connect Interrupt Channel")
|
||||||
print(" 3. Disconnect Control Channel")
|
print(" 3. Disconnect Control Channel")
|
||||||
@@ -343,7 +426,9 @@ async def main():
|
|||||||
|
|
||||||
if choice1 == '1':
|
if choice1 == '1':
|
||||||
# data includes first octet as report id
|
# data includes first octet as report id
|
||||||
data = bytearray([0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01])
|
data = bytearray(
|
||||||
|
[0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01]
|
||||||
|
)
|
||||||
hid_host.set_report(1, data)
|
hid_host.set_report(1, data)
|
||||||
|
|
||||||
elif choice1 == '2':
|
elif choice1 == '2':
|
||||||
@@ -382,11 +467,13 @@ async def main():
|
|||||||
choice1 = choice1.decode('utf-8').strip()
|
choice1 = choice1.decode('utf-8').strip()
|
||||||
|
|
||||||
if choice1 == '1':
|
if choice1 == '1':
|
||||||
data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
|
data = bytearray(
|
||||||
|
[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
|
||||||
|
)
|
||||||
hid_host.send_data(data)
|
hid_host.send_data(data)
|
||||||
|
|
||||||
elif choice1 == '2':
|
elif choice1 == '2':
|
||||||
data = bytearray([0x03, 0x00, 0x0d, 0xfd, 0x00, 0x00])
|
data = bytearray([0x03, 0x00, 0x0D, 0xFD, 0x00, 0x00])
|
||||||
hid_host.send_data(data)
|
hid_host.send_data(data)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
@@ -406,8 +493,12 @@ async def main():
|
|||||||
print('Device not found or Device already unpaired.')
|
print('Device not found or Device already unpaired.')
|
||||||
|
|
||||||
elif choice == '13':
|
elif choice == '13':
|
||||||
peer_address = Address.from_string_for_transport(target_address, transport=BT_BR_EDR_TRANSPORT)
|
peer_address = Address.from_string_for_transport(
|
||||||
connection = device.find_connection_by_bd_addr(peer_address, transport=BT_BR_EDR_TRANSPORT)
|
target_address, transport=BT_BR_EDR_TRANSPORT
|
||||||
|
)
|
||||||
|
connection = device.find_connection_by_bd_addr(
|
||||||
|
peer_address, transport=BT_BR_EDR_TRANSPORT
|
||||||
|
)
|
||||||
if connection is not None:
|
if connection is not None:
|
||||||
await connection.disconnect()
|
await connection.disconnect()
|
||||||
else:
|
else:
|
||||||
@@ -421,7 +512,9 @@ async def main():
|
|||||||
print('Device not found or Device already unpaired.')
|
print('Device not found or Device already unpaired.')
|
||||||
|
|
||||||
elif choice == '15':
|
elif choice == '15':
|
||||||
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
|
connection = await device.connect(
|
||||||
|
target_address, transport=BT_BR_EDR_TRANSPORT
|
||||||
|
)
|
||||||
await connection.authenticate()
|
await connection.authenticate()
|
||||||
await connection.encrypt()
|
await connection.encrypt()
|
||||||
|
|
||||||
@@ -445,4 +538,3 @@ async def main():
|
|||||||
|
|
||||||
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
|
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user