Review comment Fix

This commit is contained in:
skarnataki
2023-09-27 10:57:51 +00:00
committed by Lucas Abel
parent 16d33199eb
commit 5ce353bcde
3 changed files with 186 additions and 131 deletions

View File

@@ -15,11 +15,13 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations
import logging import logging
import asyncio import asyncio
from pyee import EventEmitter from pyee import EventEmitter
from typing import Optional, Tuple, Callable, Dict, Union from typing import Optional, Tuple, Callable, Dict, Union
from .device import Device, Connection
from . import core, l2cap # type: ignore from . import core, l2cap # type: ignore
from .colors import color # type: ignore from .colors import color # type: ignore
@@ -36,8 +38,8 @@ logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# fmt: off # fmt: off
HID_CTRL_PSM = 0x0011 HID_CONTROL_PSM = 0x0011
HID_INTR_PSM = 0x0013 HID_INTERRUPT_PSM = 0x0013
# HIDP message types # HIDP message types
HID_HANDSHAKE = 0x00 HID_HANDSHAKE = 0x00
@@ -71,11 +73,64 @@ HID_SUSPEND = 0x03
HID_EXIT_SUSPEND = 0x04 HID_EXIT_SUSPEND = 0x04
HID_VIRTUAL_CABLE_UNPLUG = 0x05 HID_VIRTUAL_CABLE_UNPLUG = 0x05
class HIDPacket():
def __init__(self,
report_type: Optional[int] = None,
report_id: Optional[int] = None,
buffer_size: Optional[int] = None,
protocol_mode: Optional[int] = None,
data: Optional[bytes] = None) -> None:
self.report_type = report_type
self.report_id = report_id
self.buffer_size = buffer_size
self.protocol_mode = protocol_mode
self.data = data
def to_bytes_gr(self) -> bytes:
if(self.report_type == HID_OTHER_REPORT):
param = self.report_type
else:
param = 0x08 | self.report_type
header = ((HID_GET_REPORT << 4) | param)
packet_bytes = bytearray()
packet_bytes.append(header)
packet_bytes.append(self.report_id)
packet_bytes.extend([(self.buffer_size & 0xff), ((self.buffer_size >> 8) & 0xff)])
return bytes(packet_bytes)
def to_bytes_sr(self) -> bytes:
header = ((HID_SET_REPORT << 4) | self.report_type)
packet_bytes = bytearray()
packet_bytes.append(header)
packet_bytes.extend(self.data)
return bytes(packet_bytes)
def to_bytes_gp(self) -> bytes:
header = (HID_GET_PROTOCOL << 4)
packet_bytes = bytearray()
packet_bytes.append(header)
return bytes(packet_bytes)
def to_bytes_sp(self) -> bytes:
header = (HID_SET_PROTOCOL << 4 | self.protocol_mode)
packet_bytes = bytearray()
packet_bytes.append(header)
packet_bytes.append(self.protocol_mode)
return bytes(packet_bytes)
def to_bytes_send_data(self) -> bytes:
header = ((HID_DATA << 4) | HID_OUTPUT_REPORT)
packet_bytes = bytearray()
packet_bytes.append(header)
packet_bytes.extend(self.data)
return bytes(packet_bytes)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class HIDHost(EventEmitter): class HIDHost(EventEmitter):
l2cap_channel: Optional[l2cap.Channel] l2cap_channel: Optional[l2cap.Channel]
def __init__(self, device, connection) -> None: def __init__(self, device: Device, connection: Connection) -> None:
super().__init__() super().__init__()
self.device = device self.device = device
self.connection = connection self.connection = connection
@@ -83,14 +138,14 @@ class HIDHost(EventEmitter):
self.l2cap_intr_channel = None self.l2cap_intr_channel = None
# Register ourselves with the L2CAP channel manager # Register ourselves with the L2CAP channel manager
device.register_l2cap_server(HID_CTRL_PSM, self.on_connection) device.register_l2cap_server(HID_CONTROL_PSM, self.on_connection)
device.register_l2cap_server(HID_INTR_PSM, self.on_connection) device.register_l2cap_server(HID_INTERRUPT_PSM, self.on_connection)
async def control_channel_connect(self) -> None: async def connect_control_channel(self) -> None:
# Create a new L2CAP connection - control channel # Create a new L2CAP connection - control channel
try: try:
self.l2cap_ctrl_channel = await self.device.l2cap_channel_manager.connect( self.l2cap_ctrl_channel = await self.device.l2cap_channel_manager.connect(
self.connection, HID_CTRL_PSM self.connection, HID_CONTROL_PSM
) )
except ProtocolError as error: except ProtocolError as error:
logger.error(f'L2CAP connection failed: {error}') logger.error(f'L2CAP connection failed: {error}')
@@ -100,11 +155,11 @@ class HIDHost(EventEmitter):
# Become a sink for the L2CAP channel # Become a sink for the L2CAP channel
self.l2cap_ctrl_channel.sink = self.on_ctrl_pdu self.l2cap_ctrl_channel.sink = self.on_ctrl_pdu
async def interrupt_channel_connect(self) -> None: async def connect_interrupt_channel(self) -> None:
# Create a new L2CAP connection - interrupt channel # Create a new L2CAP connection - interrupt channel
try: try:
self.l2cap_intr_channel = await self.device.l2cap_channel_manager.connect( self.l2cap_intr_channel = await self.device.l2cap_channel_manager.connect(
self.connection, HID_INTR_PSM self.connection, HID_INTERRUPT_PSM
) )
except ProtocolError as error: except ProtocolError as error:
logger.error(f'L2CAP connection failed: {error}') logger.error(f'L2CAP connection failed: {error}')
@@ -114,10 +169,14 @@ class HIDHost(EventEmitter):
# Become a sink for the L2CAP channel # Become a sink for the L2CAP channel
self.l2cap_intr_channel.sink = self.on_intr_pdu self.l2cap_intr_channel.sink = self.on_intr_pdu
async def interrupt_channel_disconnect(self): async def disconnect_interrupt_channel(self) -> None:
if self.l2cap_intr_channel is None:
raise InvalidStateError('invalid state')
await self.l2cap_intr_channel.disconnect() # type: ignore await self.l2cap_intr_channel.disconnect() # type: ignore
async def control_channel_disconnect(self): async def disconnect_control_channel(self) -> None:
if self.l2cap_ctrl_channel is None:
raise InvalidStateError('invalid state')
await self.l2cap_ctrl_channel.disconnect() # type: ignore await self.l2cap_ctrl_channel.disconnect() # type: ignore
def on_connection(self, l2cap_channel: l2cap.Channel) -> None: def on_connection(self, l2cap_channel: l2cap.Channel) -> None:
@@ -125,7 +184,7 @@ class HIDHost(EventEmitter):
l2cap_channel.on('open', lambda: self.on_l2cap_channel_open(l2cap_channel)) l2cap_channel.on('open', lambda: self.on_l2cap_channel_open(l2cap_channel))
def on_l2cap_channel_open(self, l2cap_channel: l2cap.Channel) -> None: def on_l2cap_channel_open(self, l2cap_channel: l2cap.Channel) -> None:
if l2cap_channel.psm == HID_CTRL_PSM: if l2cap_channel.psm == HID_CONTROL_PSM:
self.l2cap_ctrl_channel = l2cap_channel self.l2cap_ctrl_channel = l2cap_channel
self.l2cap_ctrl_channel.sink = self.on_ctrl_pdu self.l2cap_ctrl_channel.sink = self.on_ctrl_pdu
else: else:
@@ -138,21 +197,21 @@ class HIDHost(EventEmitter):
# Here we will receive all kinds of packets, parse and then call respective callbacks # Here we will receive all kinds of packets, parse and then call respective callbacks
message_type = pdu[0] >> 4 message_type = pdu[0] >> 4
param = pdu[0] & 0x0f param = pdu[0] & 0x0f
if (message_type == HID_HANDSHAKE): if message_type == HID_HANDSHAKE :
logger.debug('<<< HID HANDSHAKE') logger.debug('<<< HID HANDSHAKE')
self.handle_handshake(param) self.handle_handshake(param)
self.emit('handshake', ) self.emit('handshake', pdu)
elif (message_type == HID_DATA): elif message_type == HID_DATA :
logger.debug('<<< HID CONTROL DATA') logger.debug('<<< HID CONTROL DATA')
self.emit('data', pdu) self.emit('data', pdu)
elif (message_type == HID_CONTROL): elif message_type == HID_CONTROL :
if (param == HID_SUSPEND): if param == HID_SUSPEND :
logger.debug('<<< HID SUSPEND') logger.debug('<<< HID SUSPEND')
self.emit('suspend', pdu) self.emit('suspend', pdu)
elif (param == HID_EXIT_SUSPEND): elif param == HID_EXIT_SUSPEND :
logger.debug('<<< HID EXIT SUSPEND') logger.debug('<<< HID EXIT SUSPEND')
self.emit('exit_suspend', pdu) self.emit('exit_suspend', pdu)
elif (param == HID_VIRTUAL_CABLE_UNPLUG): elif param == HID_VIRTUAL_CABLE_UNPLUG :
logger.debug('<<< HID VIRTUAL CABLE UNPLUG') logger.debug('<<< HID VIRTUAL CABLE UNPLUG')
self.emit('virtual_cable_unplug') self.emit('virtual_cable_unplug')
else: else:
@@ -165,50 +224,41 @@ class HIDHost(EventEmitter):
logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}') logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}')
self.emit("data", pdu) self.emit("data", pdu)
def register_data_cb(self, data_cb): def get_report(self, report_type: int, report_id: int, buffer_size: int) -> None:
self.on('data', data_cb) msg = HIDPacket(report_type = report_type , report_id = report_id , buffer_size = buffer_size)
hid_packet = msg.to_bytes_gr()
logger.debug(f'>>> HID CONTROL GET REPORT, PDU: {hid_packet.hex()}')
self.send_pdu_on_ctrl(hid_packet) # type: ignore
def register_handshake_cb(self, handshake_cb): def set_report(self, report_type: int, data: bytes):
self.on('handshake', handshake_cb) msg = HIDPacket(report_type= report_type,data = data)
hid_packet = msg.to_bytes_sr()
def register_virtual_cable_unplug(self, virtual_cable_unplug_cb): logger.debug(f'>>> HID CONTROL SET REPORT, PDU:{hid_packet.hex()}')
self.on('virtual_cable_unplug', virtual_cable_unplug_cb) self.send_pdu_on_ctrl(hid_packet) # type: ignore
def get_report(self, report_type, report_id, buffer_size):
if(report_type == HID_OTHER_REPORT):
param = report_type
else:
param = 0x08 | report_type
header = ((HID_GET_REPORT << 4) | param)
msg = bytes([header, report_id, (buffer_size & 0xff), ((buffer_size >> 8) & 0xff)])
logger.debug(f'>>> HID CONTROL GET REPORT, PDU: {msg.hex()}')
self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore
def set_report(self, report_type, data):
header = ((HID_SET_REPORT << 4) | report_type)
msg = bytearray([header])
msg.extend(data)
logger.debug(f'>>> HID CONTROL SET REPORT, PDU:{msg.hex()}')
self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore
def get_protocol(self): def get_protocol(self):
header = (HID_GET_PROTOCOL << 4) msg = HIDPacket()
msg = bytearray([header]) hid_packet = msg.to_bytes_gp()
logger.debug(f'>>> HID CONTROL GET PROTOCOL, PDU: {msg.hex()}') logger.debug(f'>>> HID CONTROL GET PROTOCOL, PDU: {hid_packet.hex()}')
self.send_pdu_on_ctrl(hid_packet) # type: ignore
def set_protocol(self, protocol_mode: int):
msg = HIDPacket(protocol_mode= protocol_mode)
hid_packet = msg.to_bytes_sp()
logger.debug(f'>>> HID CONTROL SET PROTOCOL, PDU: {hid_packet.hex()}')
self.send_pdu_on_ctrl(hid_packet) # type: ignore
def send_pdu_on_ctrl(self, msg: bytes) -> None:
self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore
def set_protocol(self, protocol_mode): def send_pdu_on_intr(self, msg: bytes) -> None:
header = (HID_SET_PROTOCOL << 4 | protocol_mode) self.l2cap_intr_channel.send_pdu(msg) # type: ignore
msg = bytearray([header])
logger.debug(f'>>> HID CONTROL SET PROTOCOL, PDU: {msg.hex()}')
self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore
def send_data(self, data): def send_data(self, data):
header = ((HID_DATA << 4) | HID_OUTPUT_REPORT) msg = HIDPacket(data= data)
msg = bytearray([header]) hid_packet = msg.to_bytes_send_data()
msg.extend(data) logger.debug(f'>>> HID INTERRUPT SEND DATA, PDU: {hid_packet.hex()}')
logger.debug(f'>>> HID INTERRUPT SEND DATA, PDU: {msg.hex()}') self.send_pdu_on_intr(hid_packet) # type: ignore
self.l2cap_intr_channel.send_pdu(msg) # type: ignore
def suspend(self): def suspend(self):
header = (HID_CONTROL << 4 | HID_SUSPEND) header = (HID_CONTROL << 4 | HID_SUSPEND)
@@ -228,19 +278,18 @@ class HIDHost(EventEmitter):
logger.debug(f'>>> HID CONTROL VIRTUAL CABLE UNPLUG, PDU: {msg.hex()}') logger.debug(f'>>> HID CONTROL VIRTUAL CABLE UNPLUG, PDU: {msg.hex()}')
self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore
def handle_handshake(self, param): def handle_handshake(self, param: int):
if (param == HANDSHAKE_SUCCESSFUL): if param == HANDSHAKE_SUCCESSFUL :
logger.debug(f'<<< HID HANDSHAKE: SUCCESSFUL') logger.debug(f'<<< HID HANDSHAKE: SUCCESSFUL')
elif (param == HANDSHAKE_NOT_READY): elif param == HANDSHAKE_NOT_READY :
logger.warning(f'<<< HID HANDSHAKE: NOT_READY') logger.warning(f'<<< HID HANDSHAKE: NOT_READY')
elif (param == HANDSHAKE_ERR_INVALID_REPORT_ID): elif param == HANDSHAKE_ERR_INVALID_REPORT_ID :
logger.warning(f'<<< HID HANDSHAKE: ERR_INVALID_REPORT_ID') logger.warning(f'<<< HID HANDSHAKE: ERR_INVALID_REPORT_ID')
elif (param == HANDSHAKE_ERR_UNSUPPORTED_REQUEST): elif param == HANDSHAKE_ERR_UNSUPPORTED_REQUEST :
logger.warning(f'<<< HID HANDSHAKE: ERR_UNSUPPORTED_REQUEST') logger.warning(f'<<< HID HANDSHAKE: ERR_UNSUPPORTED_REQUEST')
elif (param == HANDSHAKE_ERR_UNKNOWN): elif param == HANDSHAKE_ERR_UNKNOWN :
logger.warning(f'<<< HID HANDSHAKE: ERR_UNKNOWN') logger.warning(f'<<< HID HANDSHAKE: ERR_UNKNOWN')
elif (param == HANDSHAKE_ERR_FATAL): elif param == HANDSHAKE_ERR_FATAL :
logger.warning(f'<<< HID HANDSHAKE: ERR_FATAL') logger.warning(f'<<< HID HANDSHAKE: ERR_FATAL')
else: # 0x5-0xD = Reserved else: # 0x5-0xD = Reserved
logger.warning("<<< HID HANDSHAKE: RESERVED VALUE") logger.warning("<<< HID HANDSHAKE: RESERVED VALUE")

View File

@@ -167,7 +167,7 @@ class DataElement:
UUID: lambda x: DataElement( UUID: lambda x: DataElement(
DataElement.UUID, core.UUID.from_bytes(bytes(reversed(x))) DataElement.UUID, core.UUID.from_bytes(bytes(reversed(x)))
), ),
TEXT_STRING: lambda x: DataElement(DataElement.TEXT_STRING, x), TEXT_STRING: lambda x: DataElement(DataElement.TEXT_STRING, x.decode('latin1')),
BOOLEAN: lambda x: DataElement(DataElement.BOOLEAN, x[0] == 1), BOOLEAN: lambda x: DataElement(DataElement.BOOLEAN, x[0] == 1),
SEQUENCE: lambda x: DataElement( SEQUENCE: lambda x: DataElement(
DataElement.SEQUENCE, DataElement.list_from_bytes(x) DataElement.SEQUENCE, DataElement.list_from_bytes(x)
@@ -376,7 +376,9 @@ class DataElement:
raise ValueError('invalid value_size') raise ValueError('invalid value_size')
elif self.type == DataElement.UUID: elif self.type == DataElement.UUID:
data = bytes(reversed(bytes(self.value))) data = bytes(reversed(bytes(self.value)))
elif self.type in (DataElement.TEXT_STRING, DataElement.URL): elif self.type == DataElement.TEXT_STRING:
data = self.value.encode('latin1')
elif self.type == DataElement.URL:
data = self.value.encode('utf8') data = self.value.encode('utf8')
elif self.type == DataElement.BOOLEAN: elif self.type == DataElement.BOOLEAN:
data = bytes([1 if self.value else 0]) data = bytes([1 if self.value else 0])

View File

@@ -103,111 +103,111 @@ async def get_hid_device_sdp_record(device, connection):
) )
print(color(f'SDP attributes for HID device','magenta')) print(color(f'SDP attributes for HID device','magenta'))
for attribute in attributes: for attribute in attributes:
if (attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID): if attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID :
print(color(' Service Record Handle : ', 'cyan'), hex(attribute.value.value)) print(color(' Service Record Handle : ', 'cyan'), hex(attribute.value.value))
elif (attribute.id == SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID): elif attribute.id == SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID :
print(color(' Service Class : ', 'cyan'), attribute.value.value[0].value) print(color(' Service Class : ', 'cyan'), attribute.value.value[0].value)
elif (attribute.id == SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID): elif attribute.id == SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID :
print(color(' SDP Browse Group List : ', 'cyan'), attribute.value.value[0].value) print(color(' SDP Browse Group List : ', 'cyan'), attribute.value.value[0].value)
elif (attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID): elif attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID :
print(color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[0].value) print(color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[0].value)
print(color(' PSM for Bluetooth HID Control channel : ', 'cyan'), hex(attribute.value.value[0].value[1].value)) print(color(' PSM for Bluetooth HID Control channel : ', 'cyan'), hex(attribute.value.value[0].value[1].value))
print(color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[1].value[0].value) print(color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[1].value[0].value)
elif (attribute.id == SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID): elif attribute.id == SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID :
print(color(' Lanugage : ', 'cyan'), hex(attribute.value.value[0].value)) print(color(' Lanugage : ', 'cyan'), hex(attribute.value.value[0].value))
print(color(' Encoding : ', 'cyan'), hex(attribute.value.value[1].value)) print(color(' Encoding : ', 'cyan'), hex(attribute.value.value[1].value))
print(color(' PrimaryLanguageBaseID : ', 'cyan'), hex(attribute.value.value[2].value)) print(color(' PrimaryLanguageBaseID : ', 'cyan'), hex(attribute.value.value[2].value))
elif (attribute.id == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID): elif attribute.id == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID :
print(color(' BT_HUMAN_INTERFACE_DEVICE_SERVICE ', 'cyan'), attribute.value.value[0].value[0].value) print(color(' BT_HUMAN_INTERFACE_DEVICE_SERVICE ', 'cyan'), attribute.value.value[0].value[0].value)
print(color(' HID Profileversion number : ', 'cyan'), hex(attribute.value.value[0].value[1].value)) print(color(' HID Profileversion number : ', 'cyan'), hex(attribute.value.value[0].value[1].value))
elif (attribute.id == SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID): elif attribute.id == SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID :
print(color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[0].value[0].value) print(color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[0].value[0].value)
print(color(' PSM for Bluetooth HID Interrupt channel : ', 'cyan'), hex(attribute.value.value[0].value[0].value[1].value)) print(color(' PSM for Bluetooth HID Interrupt channel : ', 'cyan'), hex(attribute.value.value[0].value[0].value[1].value))
print(color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[1].value[0].value) print(color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[1].value[0].value)
elif (attribute.id == SDP_HID_SERVICE_NAME_ATTRIBUTE_ID): elif attribute.id == SDP_HID_SERVICE_NAME_ATTRIBUTE_ID :
print(color(' Service Name: ', 'cyan'), attribute.value.value) print(color(' Service Name: ', 'cyan'), attribute.value.value)
HID_Service_Name = attribute.value.value HID_Service_Name = attribute.value.value
elif (attribute.id == SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID): elif attribute.id == SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID :
print(color(' Service Description: ', 'cyan'), attribute.value.value) print(color(' Service Description: ', 'cyan'), attribute.value.value)
HID_Service_Description = attribute.value.value HID_Service_Description = attribute.value.value
elif (attribute.id == SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID): elif attribute.id == SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID :
print(color(' Provider Name: ', 'cyan'), attribute.value.value) print(color(' Provider Name: ', 'cyan'), attribute.value.value)
HID_Provider_Name = attribute.value.value HID_Provider_Name = attribute.value.value
elif (attribute.id == SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID): elif attribute.id == SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID :
print(color(' Release Number: ', 'cyan'), hex(attribute.value.value)) print(color(' Release Number: ', 'cyan'), hex(attribute.value.value))
HID_Device_Release_Number = attribute.value.value HID_Device_Release_Number = attribute.value.value
elif (attribute.id == SDP_HID_PARSER_VERSION_ATTRIBUTE_ID): elif attribute.id == SDP_HID_PARSER_VERSION_ATTRIBUTE_ID :
print(color(' HID Parser Version: ', 'cyan'), hex(attribute.value.value)) print(color(' HID Parser Version: ', 'cyan'), hex(attribute.value.value))
HID_Parser_Version = attribute.value.value HID_Parser_Version = attribute.value.value
elif (attribute.id == SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID): elif attribute.id == SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID :
print(color(' HIDDeviceSubclass: ', 'cyan'), hex(attribute.value.value)) print(color(' HIDDeviceSubclass: ', 'cyan'), hex(attribute.value.value))
HID_Device_Subclass = attribute.value.value HID_Device_Subclass = attribute.value.value
elif (attribute.id == SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID): elif attribute.id == SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID :
print(color(' HIDCountryCode: ', 'cyan'), hex(attribute.value.value)) print(color(' HIDCountryCode: ', 'cyan'), hex(attribute.value.value))
HID_Country_Code = attribute.value.value HID_Country_Code = attribute.value.value
elif (attribute.id == SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID): elif attribute.id == SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID :
print(color(' HIDVirtualCable: ', 'cyan'), attribute.value.value) print(color(' HIDVirtualCable: ', 'cyan'), attribute.value.value)
HID_Virtual_Cable = attribute.value.value HID_Virtual_Cable = attribute.value.value
elif (attribute.id == SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID): elif attribute.id == SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID :
print(color(' HIDReconnectInitiate: ', 'cyan'), attribute.value.value) print(color(' HIDReconnectInitiate: ', 'cyan'), attribute.value.value)
HID_Reconnect_Initiate = attribute.value.value HID_Reconnect_Initiate = attribute.value.value
elif (attribute.id == SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID): elif attribute.id == SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID :
print(color(' HID Report Descriptor type: ', 'cyan'), hex(attribute.value.value[0].value[0].value)) print(color(' HID Report Descriptor type: ', 'cyan'), hex(attribute.value.value[0].value[0].value))
print(color(' HID Report DescriptorList: ', 'cyan'), attribute.value.value[0].value[1].value) print(color(' HID Report DescriptorList: ', 'cyan'), (attribute.value.value[0].value[1].value).encode('latin-1'))
HID_Descriptor_Type = attribute.value.value[0].value[0].value HID_Descriptor_Type = attribute.value.value[0].value[0].value
HID_Report_Descriptor_List = attribute.value.value[0].value[1].value HID_Report_Descriptor_List = attribute.value.value[0].value[1].value
elif (attribute.id == SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID): elif attribute.id == SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID :
print(color(' HID LANGID Base Language: ', 'cyan'), hex(attribute.value.value[0].value[0].value)) print(color(' HID LANGID Base Language: ', 'cyan'), hex(attribute.value.value[0].value[0].value))
print(color(' HID LANGID Base Bluetooth String Offset: ', 'cyan'), hex(attribute.value.value[0].value[1].value)) print(color(' HID LANGID Base Bluetooth String Offset: ', 'cyan'), hex(attribute.value.value[0].value[1].value))
HID_LANGID_Base_Language = attribute.value.value[0].value[0].value HID_LANGID_Base_Language = attribute.value.value[0].value[0].value
HID_LANGID_Base_Bluetooth_String_Offset = attribute.value.value[0].value[1].value HID_LANGID_Base_Bluetooth_String_Offset = attribute.value.value[0].value[1].value
elif (attribute.id == SDP_HID_BATTERY_POWER_ATTRIBUTE_ID): elif attribute.id == SDP_HID_BATTERY_POWER_ATTRIBUTE_ID :
print(color(' HIDBatteryPower: ', 'cyan'), attribute.value.value) print(color(' HIDBatteryPower: ', 'cyan'), attribute.value.value)
HID_Battery_Power = attribute.value.value HID_Battery_Power = attribute.value.value
elif (attribute.id == SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID): elif attribute.id == SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID :
print(color(' HIDRemoteWake: ', 'cyan'), attribute.value.value) print(color(' HIDRemoteWake: ', 'cyan'), attribute.value.value)
HID_Remote_Wake = attribute.value.value HID_Remote_Wake = attribute.value.value
elif (attribute.id == SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID): elif attribute.id == SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID :
print(color(' HIDProfileVersion : ', 'cyan'), hex(attribute.value.value)) print(color(' HIDProfileVersion : ', 'cyan'), hex(attribute.value.value))
HID_Profile_Version = attribute.value.value HID_Profile_Version = attribute.value.value
elif (attribute.id == SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID): elif attribute.id == SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID :
print(color(' HIDSupervisionTimeout: ', 'cyan'), hex(attribute.value.value)) print(color(' HIDSupervisionTimeout: ', 'cyan'), hex(attribute.value.value))
HID_Supervision_Timeout = attribute.value.value HID_Supervision_Timeout = attribute.value.value
elif (attribute.id == SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID): elif attribute.id == SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID :
print(color(' HIDNormallyConnectable: ', 'cyan'), attribute.value.value) print(color(' HIDNormallyConnectable: ', 'cyan'), attribute.value.value)
HID_Normally_Connectable = attribute.value.value HID_Normally_Connectable = attribute.value.value
elif (attribute.id == SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID): elif attribute.id == SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID :
print(color(' HIDBootDevice: ', 'cyan'), attribute.value.value) print(color(' HIDBootDevice: ', 'cyan'), attribute.value.value)
HID_Boot_Device = attribute.value.value HID_Boot_Device = attribute.value.value
elif (attribute.id == SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID): elif attribute.id == SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID :
print(color(' HIDSSRHostMaxLatency: ', 'cyan'), hex(attribute.value.value)) print(color(' HIDSSRHostMaxLatency: ', 'cyan'), hex(attribute.value.value))
HID_SSR_Host_Max_Latency = attribute.value.value HID_SSR_Host_Max_Latency = attribute.value.value
elif (attribute.id == SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID): elif attribute.id == SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID :
print(color(' HIDSSRHostMinTimeout: ', 'cyan'), hex(attribute.value.value)) print(color(' HIDSSRHostMinTimeout: ', 'cyan'), hex(attribute.value.value))
HID_SSR_Host_Min_Timeout = attribute.value.value HID_SSR_Host_Min_Timeout = attribute.value.value
@@ -242,6 +242,7 @@ async def main():
return return
report_length = len(pdu[1:]) report_length = len(pdu[1:])
report_id = pdu[1] report_id = pdu[1]
if (report_type != HID_OTHER_REPORT): if (report_type != HID_OTHER_REPORT):
print(color(f' Report type = {report_type}, Report length = {report_length}, Report id = {report_id}', 'blue', None, 'bold')) print(color(f' Report type = {report_type}, Report length = {report_length}, Report id = {report_id}', 'blue', None, 'bold'))
@@ -252,8 +253,8 @@ async def main():
ReportParser.parse_input_report(pdu[1:]) #type: ignore ReportParser.parse_input_report(pdu[1:]) #type: ignore
async def handle_virtual_cable_unplug(): async def handle_virtual_cable_unplug():
await hid_host.interrupt_channel_disconnect() await hid_host.disconnect_interrupt_channel()
await hid_host.control_channel_disconnect() await hid_host.disconnect_control_channel()
await device.keystore.delete(target_address) #type: ignore await device.keystore.delete(target_address) #type: ignore
await connection.disconnect() await connection.disconnect()
@@ -292,10 +293,10 @@ async def main():
hid_host = HIDHost(device, connection) hid_host = HIDHost(device, connection)
# Register for HID data call back # Register for HID data call back
hid_host.register_data_cb(on_hid_data_cb) hid_host.on('data', on_hid_data_cb)
# Register for virtual cable unplug call back # Register for virtual cable unplug call back
hid_host.register_virtual_cable_unplug(on_hid_virtual_cable_unplug_cb) hid_host.on('virtual_cable_unplug', on_hid_virtual_cable_unplug_cb)
async def menu(): async def menu():
reader = await get_stream_reader(sys.stdin) reader = await get_stream_reader(sys.stdin)
@@ -321,106 +322,109 @@ async def main():
choice = await reader.readline() choice = await reader.readline()
choice = choice.decode('utf-8').strip() choice = choice.decode('utf-8').strip()
if (choice == '1'): if choice == '1':
await hid_host.control_channel_connect() await hid_host.connect_control_channel()
elif (choice == '2'): elif choice == '2':
await hid_host.interrupt_channel_connect() await hid_host.connect_interrupt_channel()
elif (choice == '3'): elif choice == '3':
await hid_host.control_channel_disconnect() await hid_host.disconnect_control_channel()
elif (choice == '4'): elif choice == '4':
await hid_host.interrupt_channel_disconnect() await hid_host.disconnect_interrupt_channel()
elif (choice == '5'): elif choice == '5':
print(" 1. Report ID 0x02") print(" 1. Report ID 0x02")
print(" 2. Report ID 0x03") print(" 2. Report ID 0x03")
print(" 3. Report ID 0x05") print(" 3. Report ID 0x05")
choice1 = await reader.readline() choice1 = await reader.readline()
choice1 = choice1.decode('utf-8').strip() choice1 = choice1.decode('utf-8').strip()
if (choice1 == '1'): if choice1 == '1':
hid_host.get_report(1, 2, 3) hid_host.get_report(1, 2, 3)
elif (choice1 == '2'): elif choice1 == '2':
hid_host.get_report(2, 3, 2) hid_host.get_report(2, 3, 2)
elif (choice1 == '3'): elif choice1 == '3':
hid_host.get_report(3, 5, 3) hid_host.get_report(3, 5, 3)
else: else:
print('Incorrect option selected') print('Incorrect option selected')
elif (choice == '6'): elif choice == '6':
print(" 1. Report type 1 and Report id 0x01") print(" 1. Report type 1 and Report id 0x01")
print(" 2. Report type 2 and Report id 0x03") print(" 2. Report type 2 and Report id 0x03")
print(" 3. Report type 3 and Report id 0x05") print(" 3. Report type 3 and Report id 0x05")
choice1 = await reader.readline() choice1 = await reader.readline()
choice1 = choice1.decode('utf-8').strip() choice1 = choice1.decode('utf-8').strip()
if (choice1 == '1'): if choice1 == '1':
# data includes first octet as report id # data includes first octet as report id
data = bytearray([0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01]) data = bytearray([0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01])
hid_host.set_report(1, data) hid_host.set_report(1, data)
elif (choice1 == '2'): elif choice1 == '2':
data = bytearray([0x03, 0x01, 0x01]) data = bytearray([0x03, 0x01, 0x01])
hid_host.set_report(2, data) hid_host.set_report(2, data)
elif (choice1 == '3'): elif choice1 == '3':
data = bytearray([0x05, 0x01, 0x01, 0x01]) data = bytearray([0x05, 0x01, 0x01, 0x01])
hid_host.set_report(3, data) hid_host.set_report(3, data)
else: else:
print('Incorrect option selected') print('Incorrect option selected')
elif (choice == '7'): elif choice == '7':
print(" 0. Boot") print(" 0. Boot")
print(" 1. Report") print(" 1. Report")
choice1 = await reader.readline() choice1 = await reader.readline()
choice1 = choice1.decode('utf-8').strip() choice1 = choice1.decode('utf-8').strip()
if (choice1 == '0'): if choice1 == '0':
hid_host.set_protocol(HID_BOOT_PROTOCOL_MODE) hid_host.set_protocol(HID_BOOT_PROTOCOL_MODE)
elif (choice1 == '1'): elif choice1 == '1':
hid_host.set_protocol(HID_REPORT_PROTOCOL_MODE) hid_host.set_protocol(HID_REPORT_PROTOCOL_MODE)
else: else:
print('Incorrect option selected') print('Incorrect option selected')
elif (choice == '8'): elif choice == '8':
hid_host.get_protocol() hid_host.get_protocol()
elif (choice == '9'): elif choice == '9':
print(" 1. Report ID 0x01") print(" 1. Report ID 0x01")
print(" 2. Report ID 0x03") print(" 2. Report ID 0x03")
choice1 = await reader.readline() choice1 = await reader.readline()
choice1 = choice1.decode('utf-8').strip() choice1 = choice1.decode('utf-8').strip()
if (choice1 == '1'): if choice1 == '1':
data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
hid_host.send_data(data) hid_host.send_data(data)
elif (choice1 == '2'): elif choice1 == '2':
data = bytearray([0x03, 0x00, 0x0d, 0xfd, 0x00, 0x00]) data = bytearray([0x03, 0x00, 0x0d, 0xfd, 0x00, 0x00])
hid_host.send_data(data) hid_host.send_data(data)
else: else:
print('Incorrect option selected') print('Incorrect option selected')
elif (choice == '10'): elif choice == '10':
hid_host.suspend() hid_host.suspend()
elif (choice == '11'): elif choice == '11':
hid_host.exit_suspend() hid_host.exit_suspend()
elif (choice == '12'): elif choice == '12':
hid_host.virtual_cable_unplug() hid_host.virtual_cable_unplug()
try:
await device.keystore.delete(target_address) await device.keystore.delete(target_address)
except KeyError:
print('Device not found or Device already unpaired.')
elif (choice == '13'): elif choice == '13':
peer_address = Address.from_string_for_transport(target_address, transport=BT_BR_EDR_TRANSPORT) peer_address = Address.from_string_for_transport(target_address, transport=BT_BR_EDR_TRANSPORT)
connection = device.find_connection_by_bd_addr(peer_address, transport=BT_BR_EDR_TRANSPORT) connection = device.find_connection_by_bd_addr(peer_address, transport=BT_BR_EDR_TRANSPORT)
if connection is not None: if connection is not None:
@@ -428,14 +432,14 @@ async def main():
else: else:
print("Already disconnected from device") print("Already disconnected from device")
elif (choice == '14'): elif choice == '14':
try: try:
await device.keystore.delete(target_address) await device.keystore.delete(target_address)
print("Unpair successful") print("Unpair successful")
except KeyError: except KeyError:
print('Device not found or Device already unpaired.') print('Device not found or Device already unpaired.')
elif (choice == '15'): elif choice == '15':
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
await connection.authenticate() await connection.authenticate()
await connection.encrypt() await connection.encrypt()
@@ -443,15 +447,15 @@ async def main():
else: else:
print("Invalid option selected.") print("Invalid option selected.")
if ((len(sys.argv) > 4) and (sys.argv[4] == 'test-mode')): if (len(sys.argv) > 4) and (sys.argv[4] == 'test-mode'):
# Enabling menu for testing # Enabling menu for testing
await menu() await menu()
else: else:
# HID Connection # HID Connection
# Control channel # Control channel
await hid_host.control_channel_connect() await hid_host.connect_control_channel()
# Interrupt Channel # Interrupt Channel
await hid_host.interrupt_channel_connect() await hid_host.connect_interrupt_channel()
await hci_source.wait_for_termination() await hci_source.wait_for_termination()