diff --git a/bumble/classic3.json b/bumble/classic3.json new file mode 100644 index 0000000..b7b1409 --- /dev/null +++ b/bumble/classic3.json @@ -0,0 +1,5 @@ +{ + "name": "Bumble HID Keyboard", + "class_of_device": 9664, + "keystore": "JsonKeyStore" +} diff --git a/bumble/hid.py b/bumble/hid.py index 8712658..3205323 100644 --- a/bumble/hid.py +++ b/bumble/hid.py @@ -60,6 +60,7 @@ class Message: NOT_READY = 0x01 ERR_INVALID_REPORT_ID = 0x02 ERR_UNSUPPORTED_REQUEST = 0x03 + ERR_INVALID_PARAMETER = 0x04 ERR_UNKNOWN = 0x0E ERR_FATAL = 0x0F @@ -101,12 +102,12 @@ class GetReportMessage(Message): def __bytes__(self) -> bytes: packet_bytes = bytearray() packet_bytes.append(self.report_id) - packet_bytes.extend( - [(self.buffer_size & 0xFF), ((self.buffer_size >> 8) & 0xFF)] - ) - if self.report_type == Message.ReportType.OTHER_REPORT: + if self.buffer_size == 0: return self.header(self.report_type) + packet_bytes else: + packet_bytes.extend( + [(self.buffer_size & 0xFF), ((self.buffer_size >> 8) & 0xFF)] + ) return self.header(0x08 | self.report_type) + packet_bytes @@ -119,7 +120,17 @@ class SetReportMessage(Message): def __bytes__(self) -> bytes: return self.header(self.report_type) + self.data +@dataclass +class SendControlData(Message): + report_type: int + data: bytes + message_type = Message.MessageType.DATA + def __bytes__(self) -> bytes: + packet_bytes = bytearray() + + packet_bytes.extend(self.data) + return self.header(self.report_type) + packet_bytes @dataclass class GetProtocolMessage(Message): message_type = Message.MessageType.GET_PROTOCOL @@ -136,6 +147,15 @@ class SetProtocolMessage(Message): def __bytes__(self) -> bytes: return self.header(self.protocol_mode) +@dataclass +class GetProtocolReplyMessage(Message): + protocol_mode: int + message_type = Message.MessageType.DATA + + def __bytes__(self) -> bytes: + packet_bytes = bytearray() + packet_bytes.append(self.protocol_mode) + return self.header(Message.ReportType.OTHER_REPORT) + packet_bytes @dataclass class Suspend(Message): @@ -160,25 +180,41 @@ class VirtualCableUnplug(Message): def __bytes__(self) -> bytes: return self.header(Message.ControlCommand.VIRTUAL_CABLE_UNPLUG) - +#Device sends input report, host sends output report. @dataclass class SendData(Message): data: bytes + report_type: int message_type = Message.MessageType.DATA def __bytes__(self) -> bytes: - return self.header(Message.ReportType.OUTPUT_REPORT) + self.data + return self.header(self.report_type) + self.data + +@dataclass +class SendHandshakeMessage(Message): + result_code: int + message_type = Message.MessageType.HANDSHAKE + + def __bytes__(self) -> bytes: + return self.header(self.result_code) # ----------------------------------------------------------------------------- -class Host(EventEmitter): +class HID(EventEmitter): l2cap_ctrl_channel: Optional[l2cap.ClassicChannel] l2cap_intr_channel: Optional[l2cap.ClassicChannel] - def __init__(self, device: Device, connection: Connection) -> None: + class Role(enum.IntEnum): + HOST = 0x00 + DEVICE = 0x01 + + + def __init__(self, device: Device, role: int) -> None: super().__init__() self.device = device - self.connection = connection + self.connection = None + self.remote_device_bd_address = None + self.role = role self.l2cap_ctrl_channel = None self.l2cap_intr_channel = None @@ -187,6 +223,8 @@ class Host(EventEmitter): device.register_l2cap_server(HID_CONTROL_PSM, self.on_connection) device.register_l2cap_server(HID_INTERRUPT_PSM, self.on_connection) + device.on('connection', self.on_device_connection) + async def connect_control_channel(self) -> None: # Create a new L2CAP connection - control channel try: @@ -229,9 +267,18 @@ class Host(EventEmitter): self.l2cap_ctrl_channel = None await channel.disconnect() + def on_device_connection(self, connection: Connection) -> None: + self.connection = connection + self.remote_device_bd_address = connection.peer_address + connection.on('disconnection', self.on_disconnection) + def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None: logger.debug(f'+++ New L2CAP connection: {l2cap_channel}') l2cap_channel.on('open', lambda: self.on_l2cap_channel_open(l2cap_channel)) + l2cap_channel.on('close', lambda: self.on_l2cap_channel_close(l2cap_channel)) + + def on_disconnection(self, reason: int) -> None: + self.connection = None def on_l2cap_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None: if l2cap_channel.psm == HID_CONTROL_PSM: @@ -242,37 +289,159 @@ class Host(EventEmitter): self.l2cap_intr_channel.sink = self.on_intr_pdu logger.debug(f'$$$ L2CAP channel open: {l2cap_channel}') + def on_l2cap_channel_close(self, l2cap_channel: l2cap.ClassicChannel) -> None: + if l2cap_channel.psm == HID_CONTROL_PSM: + self.l2cap_ctrl_channel = None + else: + self.l2cap_intr_channel = None + logger.debug(f'$$$ L2CAP channel close: {l2cap_channel}') + def on_ctrl_pdu(self, pdu: bytes) -> None: logger.debug(f'<<< HID CONTROL PDU: {pdu.hex()}') # Here we will receive all kinds of packets, parse and then call respective callbacks - message_type = pdu[0] >> 4 param = pdu[0] & 0x0F + message_type = pdu[0] >> 4 if message_type == Message.MessageType.HANDSHAKE: logger.debug(f'<<< HID HANDSHAKE: {Message.Handshake(param).name}') self.emit('handshake', Message.Handshake(param)) + elif message_type == Message.MessageType.GET_REPORT: + logger.debug('<<< HID GET REPORT') + self.handle_get_report(pdu) + + elif message_type == Message.MessageType.SET_REPORT: + logger.debug('<<< HID SET REPORT') + report_type = pdu[0] & 3 + report = pdu[2:] + report_id = pdu[1] + logger.debug(report_id) + logger.debug(report_type) + #TODO: to check for size mentioned in report descriptor + self.emit('set_report', report_id, report) + elif message_type == Message.MessageType.GET_PROTOCOL: + logger.debug('<<< HID GET PROTOCOL') + self.emit('get_protocol') + elif message_type == Message.MessageType.SET_PROTOCOL: + logger.debug('<<< HID SET PROTOCOL') + self.emit('set_protocol', param) elif message_type == Message.MessageType.DATA: logger.debug('<<< HID CONTROL DATA') - self.emit('data', pdu) + self.emit('control_data', pdu) elif message_type == Message.MessageType.CONTROL: if param == Message.ControlCommand.SUSPEND: logger.debug('<<< HID SUSPEND') - self.emit('suspend', pdu) + self.emit('suspend') elif param == Message.ControlCommand.EXIT_SUSPEND: logger.debug('<<< HID EXIT SUSPEND') - self.emit('exit_suspend', pdu) + self.emit('exit_suspend') elif param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG: logger.debug('<<< HID VIRTUAL CABLE UNPLUG') self.emit('virtual_cable_unplug') else: logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED') else: - logger.debug('<<< HID CONTROL DATA') - self.emit('data', pdu) + logger.debug('<<< HID MESSAGE TYPE UNSUPPORTED') + self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) def on_intr_pdu(self, pdu: bytes) -> None: logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}') - self.emit("data", pdu) + self.emit("interrupt_data", pdu) + + def send_pdu_on_ctrl(self, msg: bytes) -> None: + self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore + + def send_pdu_on_intr(self, msg: bytes) -> None: + self.l2cap_intr_channel.send_pdu(msg) # type: ignore + + def send_data(self, data: bytes) -> None: + if self.role == HID.Role.HOST: + report_type = Message.ReportType.OUTPUT_REPORT + else: + report_type = Message.ReportType.INPUT_REPORT + msg = SendData(data, report_type) + hid_message = bytes(msg) + if self.l2cap_intr_channel is not None: + logger.debug(f'>>> HID INTERRUPT SEND DATA, PDU: {hid_message.hex()}') + self.send_pdu_on_intr(hid_message) + + def virtual_cable_unplug(self) -> None: + msg = VirtualCableUnplug() + hid_message = bytes(msg) + logger.debug(f'>>> HID CONTROL VIRTUAL CABLE UNPLUG, PDU: {hid_message.hex()}') + self.send_pdu_on_ctrl(hid_message) + + +# ----------------------------------------------------------------------------- + + + +class Device(HID): + class ReportStatus(enum.IntEnum): + FAILURE = 0x00 + REPORT_ID_NOT_FOUND = 0x01 + ERR_UNSUPPORTED_REQUEST = 0x02 + ERR_UNKNOWN = 0x03 + SUCCESS = 0xff + + + class GetReportStatus(): + def __init__(self) -> None: + self.status = 0 + self.data=None + + def __init__(self, device: Device) -> None: + super().__init__(device, HID.Role.DEVICE) + + def send_handshake_message(self, result_code: int) -> None: + msg = SendHandshakeMessage(result_code) + hid_message = bytes(msg) + logger.debug(f'>>> HID HANDSHAKE MESSAGE, PDU: {hid_message.hex()}') + self.send_pdu_on_ctrl(hid_message) + + def send_control_data(self,report_type: int, data: bytes): + msg = SendControlData(report_type= report_type, data=data) + hid_message = bytes(msg) + logger.debug(f'>>> HID CONTROL DATA: {hid_message.hex()}') + self.send_pdu_on_ctrl(hid_message) + + def handle_get_report(self, pdu: bytes): + ret = self.GetReportStatus() + report_type=pdu[0] & 0x03 + buffer_flag = (pdu[0] & 0x08) >> 3 + report_id = pdu[1] + logger.debug("buffer_flag: " + str(buffer_flag)) + if(buffer_flag == 1): + buffer_size = (pdu[3] << 8) | pdu[2] + else: + buffer_size = 0 + + if(self.get_report_cb != None): + ret = self.get_report_cb(report_id, report_type, buffer_size) + + if(ret.status == self.ReportStatus.FAILURE): + self.send_handshake_message(Message.Handshake.ERR_UNKNOWN) + elif(ret.status == self.ReportStatus.SUCCESS): + data = bytearray() + data.append(report_id) + data.extend(ret.data) + #TODO Check the data size and MTU size here and only then send out + #the message + self.send_control_data(report_type=report_type, data = data) + elif(ret.status == self.ReportStatus.REPORT_ID_NOT_FOUND): + self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID) + elif(ret.status == self.ReportStatus.ERR_UNSUPPORTED_REQUEST): + self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) + else: + logger.debug("GetReport callback not registered !!") + + + def register_get_report_cb(self,cb): + self.get_report_cb=cb + logger.debug("GetReport callback registered successfully") +# ----------------------------------------------------------------------------- +class Host(HID): + def __init__(self, device: Device) -> None: + super().__init__(device, HID.Role.HOST) def get_report(self, report_type: int, report_id: int, buffer_size: int) -> None: msg = GetReportMessage( @@ -282,52 +451,32 @@ class Host(EventEmitter): logger.debug(f'>>> HID CONTROL GET REPORT, PDU: {hid_message.hex()}') self.send_pdu_on_ctrl(hid_message) - def set_report(self, report_type: int, data: bytes): + def set_report(self, report_type: int, data: bytes) -> None: msg = SetReportMessage(report_type=report_type, data=data) hid_message = bytes(msg) logger.debug(f'>>> HID CONTROL SET REPORT, PDU:{hid_message.hex()}') self.send_pdu_on_ctrl(hid_message) - def get_protocol(self): + def get_protocol(self) -> None: msg = GetProtocolMessage() hid_message = bytes(msg) logger.debug(f'>>> HID CONTROL GET PROTOCOL, PDU: {hid_message.hex()}') self.send_pdu_on_ctrl(hid_message) - def set_protocol(self, protocol_mode: int): + def set_protocol(self, protocol_mode: int) -> None: msg = SetProtocolMessage(protocol_mode=protocol_mode) hid_message = bytes(msg) logger.debug(f'>>> HID CONTROL SET PROTOCOL, PDU: {hid_message.hex()}') self.send_pdu_on_ctrl(hid_message) - def send_pdu_on_ctrl(self, msg: bytes) -> None: - assert self.l2cap_ctrl_channel - self.l2cap_ctrl_channel.send_pdu(msg) - - def send_pdu_on_intr(self, msg: bytes) -> None: - assert self.l2cap_intr_channel - self.l2cap_intr_channel.send_pdu(msg) - - def send_data(self, data): - msg = SendData(data) - hid_message = bytes(msg) - logger.debug(f'>>> HID INTERRUPT SEND DATA, PDU: {hid_message.hex()}') - self.send_pdu_on_intr(hid_message) - - def suspend(self): + def suspend(self) -> None: msg = Suspend() hid_message = bytes(msg) logger.debug(f'>>> HID CONTROL SUSPEND, PDU:{hid_message.hex()}') - self.send_pdu_on_ctrl(msg) + self.send_pdu_on_ctrl(hid_message) - def exit_suspend(self): + def exit_suspend(self) -> None: msg = ExitSuspend() hid_message = bytes(msg) logger.debug(f'>>> HID CONTROL EXIT SUSPEND, PDU:{hid_message.hex()}') - self.send_pdu_on_ctrl(msg) - - def virtual_cable_unplug(self): - msg = VirtualCableUnplug() - hid_message = bytes(msg) - logger.debug(f'>>> HID CONTROL VIRTUAL CABLE UNPLUG, PDU: {hid_message.hex()}') - self.send_pdu_on_ctrl(msg) + self.send_pdu_on_ctrl(hid_message) diff --git a/examples/classic3.json b/examples/classic3.json new file mode 100644 index 0000000..b7b1409 --- /dev/null +++ b/examples/classic3.json @@ -0,0 +1,5 @@ +{ + "name": "Bumble HID Keyboard", + "class_of_device": 9664, + "keystore": "JsonKeyStore" +} diff --git a/examples/run_hid_device.py b/examples/run_hid_device.py new file mode 100644 index 0000000..40d58a9 --- /dev/null +++ b/examples/run_hid_device.py @@ -0,0 +1,705 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import sys +import os +import logging +import json +import websockets +from bumble.colors import color + +from bumble.device import Device +from bumble.transport import open_transport_or_link +from bumble.core import ( + BT_BR_EDR_TRANSPORT, + BT_L2CAP_PROTOCOL_ID, + BT_HUMAN_INTERFACE_DEVICE_SERVICE, + BT_HIDP_PROTOCOL_ID, + UUID, +) +from bumble.hci import Address +from bumble.hid import ( + Device as HID_Device, + HID_CONTROL_PSM, + HID_INTERRUPT_PSM, + Message, +) +from bumble.sdp import ( + Client as SDP_Client, + DataElement, + ServiceAttribute, + SDP_PUBLIC_BROWSE_ROOT, + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_ALL_ATTRIBUTES_RANGE, + SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID, + SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, +) +from bumble.utils import AsyncRunner + +# ----------------------------------------------------------------------------- +# SDP attributes for Bluetooth HID devices +SDP_HID_SERVICE_NAME_ATTRIBUTE_ID = 0x0100 +SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID = 0x0101 +SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID = 0x0102 +SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID = 0x0200 # [DEPRECATED] +SDP_HID_PARSER_VERSION_ATTRIBUTE_ID = 0x0201 +SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID = 0x0202 +SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID = 0x0203 +SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID = 0x0204 +SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID = 0x0205 +SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0x0206 +SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID = 0x0207 +SDP_HID_SDP_DISABLE_ATTRIBUTE_ID = 0x0208 # [DEPRECATED] +SDP_HID_BATTERY_POWER_ATTRIBUTE_ID = 0x0209 +SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID = 0x020A +SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID = 0x020B # DEPRECATED] +SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID = 0x020C +SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID = 0x020D +SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID = 0x020E +SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID = 0x020F +SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210 + +# HID SDP attribute values +LANGUAGE = 0x656e +ENCODING = 0x6a +PRIMARY_LANGUAGE_BASE_ID = 0x100 +VERSION_NUMBER = 0x0101 +SERVICE_NAME = b'Bumble HID' +SERVICE_DESCRIPTION = b'Bumble' +PROVIDER_NAME = b'Bumble' +HID_PARSER_VERSION = 0x0111 +HID_DEVICE_SUBCLASS = 0xC0 +HID_COUNTRY_CODE = 0x21 +HID_VIRTUAL_CABLE = True +HID_RECONNECT_INITIATE = True +REPORT_DESCRIPTOR_TYPE = 0x22 + + +HID_REPORT_MAP = bytes( + # pylint: disable=line-too-long + [ + 0x05, + 0x01, # Usage Page (Generic Desktop Ctrls) + 0x09, + 0x06, # Usage (Keyboard) + 0xA1, + 0x01, # Collection (Application) + 0x85, + 0x01, # . Report ID (1) + 0x05, + 0x07, # . Usage Page (Kbrd/Keypad) + 0x19, + 0xE0, # . Usage Minimum (0xE0) + 0x29, + 0xE7, # . Usage Maximum (0xE7) + 0x15, + 0x00, # . Logical Minimum (0) + 0x25, + 0x01, # . Logical Maximum (1) + 0x75, + 0x01, # . Report Size (1) + 0x95, + 0x08, # . Report Count (8) + 0x81, + 0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position) + 0x95, + 0x01, # . Report Count (1) + 0x75, + 0x08, # . Report Size (8) + 0x81, + 0x03, # . Input (Const,Var,Abs,No Wrap,Linear,Preferred State,No Null Position) + 0x95, + 0x05, # . Report Count (5) + 0x75, + 0x01, # . Report Size (1) + 0x05, + 0x08, # . Usage Page (LEDs) + 0x19, + 0x01, # . Usage Minimum (Num Lock) + 0x29, + 0x05, # . Usage Maximum (Kana) + 0x91, + 0x02, # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile) + 0x95, + 0x01, # . Report Count (1) + 0x75, + 0x03, # . Report Size (3) + 0x91, + 0x03, # . Output (Const,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile) + 0x95, + 0x06, # . Report Count (6) + 0x75, + 0x08, # . Report Size (8) + 0x15, + 0x00, # . Logical Minimum (0) + 0x25, + 0x65, # . Logical Maximum (101) + 0x05, + 0x07, # . Usage Page (Kbrd/Keypad) + 0x19, + 0x00, # . Usage Minimum (0x00) + 0x29, + 0x65, # . Usage Maximum (0x65) + 0x81, + 0x00, # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position) + 0xC0, # End Collection + 0x05, + 0x01, # Usage Page (Generic Desktop Ctrls) + 0x09, + 0x02, # Usage (Mouse) + 0xA1, + 0x01, # Collection (Application) + 0x85, + 0x02, # . Report ID (2) + 0x09, + 0x01, # . Usage (Pointer) + 0xA1, + 0x00, # . Collection (Physical) + 0x05, + 0x09, # . Usage Page (Button) + 0x19, + 0x01, # . Usage Minimum (0x01) + 0x29, + 0x03, # . Usage Maximum (0x03) + 0x15, + 0x00, # . Logical Minimum (0) + 0x25, + 0x01, # . Logical Maximum (1) + 0x95, + 0x03, # . Report Count (3) + 0x75, + 0x01, # . Report Size (1) + 0x81, + 0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position) + 0x95, + 0x01, # . Report Count (1) + 0x75, + 0x05, # . Report Size (5) + 0x81, + 0x03, # . Input (Const,Var,Abs,No Wrap,Linear,Preferred State,No Null Position) + 0x05, + 0x01, # . Usage Page (Generic Desktop Ctrls) + 0x09, + 0x30, # . Usage (X) + 0x09, + 0x31, # . Usage (Y) + 0x15, + 0x81, # . Logical Minimum (-127) + 0x25, + 0x7F, # . Logical Maximum (127) + 0x75, + 0x08, # . Report Size (8) + 0x95, + 0x02, # . Report Count (2) + 0x81, + 0x06, # . Input (Data,Var,Rel,No Wrap,Linear,Preferred State,No Null Position) + 0xC0, # . End Collection + 0xC0, # End Collection + ] +) +HID_LANGID_BASE_LANGUAGE = 0x0409 +HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET = 0x100 +HID_BATTERY_POWER = True +HID_REMOTE_WAKE = True +HID_SUPERVISION_TIMEOUT = 0xC80 +HID_NORMALLY_CONNECTABLE = True +HID_BOOT_DEVICE = True +HID_SSR_HOST_MAX_LATENCY = 0x640 +HID_SSR_HOST_MIN_TIMEOUT = 0xC80 + +# Default protocol mode set to report protocol +protocol_mode = Message.ProtocolMode.REPORT_PROTOCOL + +# ----------------------------------------------------------------------------- +def sdp_records(): + service_record_handle = 0x00010002 + return { + service_record_handle: [ + ServiceAttribute( + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + DataElement.unsigned_integer_32(service_record_handle), + ), + ServiceAttribute( + SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, + DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]), + ), + ServiceAttribute( + SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [DataElement.uuid(BT_HUMAN_INTERFACE_DEVICE_SERVICE)] + ), + ), + ServiceAttribute( + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(BT_L2CAP_PROTOCOL_ID), + DataElement.unsigned_integer_16(HID_CONTROL_PSM), + ] + ), + DataElement.sequence( + [ + DataElement.uuid(BT_HIDP_PROTOCOL_ID), + ] + ), + ] + ), + ), + ServiceAttribute( + SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.unsigned_integer_16(LANGUAGE), + DataElement.unsigned_integer_16(ENCODING), + DataElement.unsigned_integer_16(PRIMARY_LANGUAGE_BASE_ID), + ] + ), + ), + ServiceAttribute( + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(BT_HUMAN_INTERFACE_DEVICE_SERVICE), + DataElement.unsigned_integer_16(VERSION_NUMBER), + ] + ), + ] + ), + ), + ServiceAttribute( + SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.uuid(BT_L2CAP_PROTOCOL_ID), + DataElement.unsigned_integer_16(HID_INTERRUPT_PSM), + ] + ), + DataElement.sequence( + [ + DataElement.uuid(BT_HIDP_PROTOCOL_ID), + ] + ), + ] + ), + ] + ), + ), + ServiceAttribute( + SDP_HID_SERVICE_NAME_ATTRIBUTE_ID, + DataElement(DataElement.TEXT_STRING, SERVICE_NAME), + ), + ServiceAttribute( + SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID, + DataElement(DataElement.TEXT_STRING, SERVICE_DESCRIPTION), + ), + ServiceAttribute( + SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID, + DataElement(DataElement.TEXT_STRING, PROVIDER_NAME), + ), + ServiceAttribute( + SDP_HID_PARSER_VERSION_ATTRIBUTE_ID, + DataElement.unsigned_integer_32(HID_PARSER_VERSION), + ), + ServiceAttribute( + SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID, + DataElement.unsigned_integer_32(HID_DEVICE_SUBCLASS), + ), + ServiceAttribute( + SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID, + DataElement.unsigned_integer_32(HID_COUNTRY_CODE), + ), + ServiceAttribute( + SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID, + DataElement.boolean(HID_VIRTUAL_CABLE), + ), + ServiceAttribute( + SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID, + DataElement.boolean(HID_RECONNECT_INITIATE), + ), + ServiceAttribute( + SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.unsigned_integer_16(REPORT_DESCRIPTOR_TYPE), + DataElement(DataElement.TEXT_STRING, HID_REPORT_MAP), + ] + ), + ] + ), + ), + ServiceAttribute( + SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence( + [ + DataElement.unsigned_integer_16(HID_LANGID_BASE_LANGUAGE), + DataElement.unsigned_integer_16(HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET), + ] + ), + ] + ), + ), + ServiceAttribute( + SDP_HID_BATTERY_POWER_ATTRIBUTE_ID, + DataElement.boolean(HID_BATTERY_POWER), + ), + ServiceAttribute( + SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID, + DataElement.boolean(HID_REMOTE_WAKE), + ), + ServiceAttribute( + SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID, + DataElement.unsigned_integer_16(HID_SUPERVISION_TIMEOUT), + ), + ServiceAttribute( + SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID, + DataElement.boolean(HID_NORMALLY_CONNECTABLE), + ), + ServiceAttribute( + SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID, + DataElement.boolean(HID_BOOT_DEVICE), + ), + ServiceAttribute( + SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID, + DataElement.unsigned_integer_16(HID_SSR_HOST_MAX_LATENCY), + ), + ServiceAttribute( + SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID, + DataElement.unsigned_integer_16(HID_SSR_HOST_MIN_TIMEOUT), + ), + ] + } + + +# ----------------------------------------------------------------------------- +async def get_stream_reader(pipe) -> asyncio.StreamReader: + loop = asyncio.get_event_loop() + reader = asyncio.StreamReader(loop=loop) + protocol = asyncio.StreamReaderProtocol(reader) + await loop.connect_read_pipe(lambda: protocol, pipe) + return reader + + +keyboardData=bytearray([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) +mouseData=bytearray([0x00, 0x00, 0x00, 0x00]) + +# ----------------------------------------------------------------------------- +async def keyboard_device(hid_device, command): + if command == 'web': + # Start a Websocket server to receive events from a web page + async def serve(websocket, _path): + global keyboardData + global mouseData + while True: + try: + message = await websocket.recv() + print('Received: ', str(message)) + parsed = json.loads(message) + message_type = parsed['type'] + if message_type == 'keydown': + # Only deal with keys a to z for now + key = parsed['key'] + if len(key) == 1: + code = ord(key) + if ord('a') <= code <= ord('z'): + hid_code = 0x04 + code - ord('a') + keyboardData = bytearray([0x01, 0x00, 0x00, hid_code, 0x00, 0x00, 0x00, 0x00, 0x00]) + hid_device.send_data(keyboardData) + elif message_type == 'keyup': + keyboardData = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + hid_device.send_data(keyboardData) + elif message_type == "mousemove": + x = parsed['x'] + if x > 127: + x = 127 + elif x < -127: + x = -127 + y = parsed['y'] + if y > 127: + y = 127 + elif y < -127: + y = -127 + x_cord = x.to_bytes(signed = True) + y_cord = y.to_bytes(signed = True) + mouseData = bytearray([0x02, 0x00]) + x_cord + y_cord + hid_device.send_data(mouseData) + except websockets.exceptions.ConnectionClosedOK: + pass + + # pylint: disable-next=no-member + await websockets.serve(serve, 'localhost', 8989) + await asyncio.get_event_loop().create_future() + else: + message = bytes('hello', 'ascii') + while True: + for letter in message: + await asyncio.sleep(3.0) + + # Keypress for the letter + keycode = 0x04 + letter - 0x61 + keyboardData = bytearray([0x01, 0x00, 0x00, keycode, 0x00, 0x00, 0x00, 0x00, 0x00]) + hid_device.send_data(keyboardData) + + # Key release + keyboardData = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + hid_device.send_data(keyboardData) + + +# ----------------------------------------------------------------------------- +async def main(): + if len(sys.argv) < 3: + print( + 'Usage: python run_hid_device.py ' + ' where is one of:\n' + ' test-mode (run with menu enabled for testing)\n' + ' web (run a keyboard with keypress input from a web page, ' + 'see keyboard.html' + ) + print('example: python run_hid_device.py classic3.json usb:0 web') + print('example: python run_hid_device.py classic3.json usb:0 test-mode') + + return + + async def handle_virtual_cable_unplug(): + hid_host_bd_addr = str(hid_device.remote_device_bd_address) + await hid_device.disconnect_interrupt_channel() + await hid_device.disconnect_control_channel() + await device.keystore.delete(hid_host_bd_addr) #type: ignore + connection = hid_device.connection + if connection is not None: + await connection.disconnect() + + def on_hid_data_cb(pdu): + print(f'Received Data, PDU: {pdu.hex()}') + + def on_set_report_cb(report_id: int, report: bytes): + if (report_id > 2) or (report_id == 0): + hid_device.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID) + print("Warning: Report ID Not Supported") + else: + hid_device.send_handshake_message(Message.Handshake.SUCCESSFUL) + print('Set Report, Report ID: ', report_id) + print('Report:', report) + + def on_get_protocol_cb(): + if HID_BOOT_DEVICE: + data = protocol_mode.to_bytes() + hid_device.send_control_data(Message.ReportType.OTHER_REPORT, data) + else: + hid_device.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) + + def on_get_report_cb(report_id,report_type, buffer_size): + retValue = hid_device.GetReportStatus() + + if report_type == Message.ReportType.INPUT_REPORT: + print("GET_REPORT - inputType") + if report_id == 1: + retValue.data = keyboardData + retValue.status = hid_device.ReportStatus.SUCCESS + elif report_id == 2: + retValue.data = mouseData + retValue.status = hid_device.ReportStatus.SUCCESS + else: + retValue.status = hid_device.ReportStatus.REPORT_ID_NOT_FOUND + + if(buffer_size): + data_len = buffer_size -1 + retValue.data = retValue.data[:data_len] + elif report_type == Message.ReportType.OUTPUT_REPORT: + print("GET_REPORT - outputType") + #This sample app has nothing to do with the report received, to enable PTS + #testing, we will return single byte random data. + retValue.data = bytearray([0x11]) + retValue.status = hid_device.ReportStatus.SUCCESS + + elif report_type == Message.ReportType.FEATURE_REPORT: + #TBD - not requried for PTS testing + print("GET_REPORT - FeatureReport") + retValue.status = hid_device.ReportStatus.ERR_UNSUPPORTED_REQUEST + + else: + retValue.status = hid_device.ReportStatus.FAILURE + + return retValue + + def on_set_protocol_cb(param): + if HID_BOOT_DEVICE: + global protocol_mode + protocol_mode = Message.ProtocolMode(param) + hid_device.send_handshake_message(Message.Handshake.SUCCESSFUL) + else: + hid_device.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) + + def on_virtual_cable_unplug_cb(): + print(f'Received Virtual Cable Unplug') + asyncio.create_task(handle_virtual_cable_unplug()) + + print('<<< connecting to HCI...') + async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + print('<<< connected') + + # Create a device + device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device.classic_enabled = True + + # Create and register HID device + hid_device = HID_Device(device) + + # Register for call backs + hid_device.on('interrupt_data', on_hid_data_cb) + hid_device.on('set_report', on_set_report_cb) + hid_device.on('get_protocol', on_get_protocol_cb) + hid_device.on('set_protocol', on_set_protocol_cb) + + hid_device.register_get_report_cb(on_get_report_cb) + + # Register for virtual cable unplug call back + hid_device.on('virtual_cable_unplug', on_virtual_cable_unplug_cb) + + # Setup the SDP to advertise HID Device service + device.sdp_service_records = sdp_records() + + # Start the controller + await device.power_on() + + # Start being discoverable and connectable + await device.set_discoverable(True) + await device.set_connectable(True) + + async def menu(): + reader = await get_stream_reader(sys.stdin) + while True: + print("\n************************ HID Device Menu *****************************\n") + print(" 1. Connect Control Channel") + print(" 2. Connect Interrupt Channel") + print(" 3. Disconnect Control Channel") + print(" 4. Disconnect Interrupt Channel") + print(" 5. Send Report") + print(" 6. Virtual Cable Unplug") + print(" 7. Disconnect device") + print(" 8. Delete Bonding") + print(" 9. Re-connect to device") + print("\nEnter your choice : \n") + + choice = await reader.readline() + choice = choice.decode('utf-8').strip() + + if choice == '1': + await hid_device.connect_control_channel() + + elif choice == '2': + await hid_device.connect_interrupt_channel() + + elif choice == '3': + await hid_device.disconnect_control_channel() + + elif choice == '4': + await hid_device.disconnect_interrupt_channel() + + elif choice == '5': + print(" 1. Report ID 0x01") + print(" 2. Report ID 0x02") + + choice1 = await reader.readline() + choice1 = choice1.decode('utf-8').strip() + + if choice1 == '1': + data = bytearray([0x01, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]) + hid_device.send_data(data) + data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + hid_device.send_data(data) + + elif choice1 == '2': + data = bytearray([0x02, 0x00, 0x00, 0xf6]) + hid_device.send_data(data) + data = bytearray([0x02, 0x00, 0x00, 0x00]) + hid_device.send_data(data) + + else: + print('Incorrect option selected') + + elif choice == '6': + hid_device.virtual_cable_unplug() + try: + hid_host_bd_addr = str(hid_device.remote_device_bd_address) + await device.keystore.delete(hid_host_bd_addr) + except KeyError: + print('Device not found or Device already unpaired.') + + elif choice == '7': + connection = hid_device.connection + if connection is not None: + await connection.disconnect() + else: + print("Already disconnected from device") + + elif choice == '8': + try: + hid_host_bd_addr = str(hid_device.remote_device_bd_address) + await device.keystore.delete(hid_host_bd_addr) + except KeyError: + print('Device not found or Device already unpaired.') + + elif choice == '9': + hid_host_bd_addr = str(hid_device.remote_device_bd_address) + connection = await device.connect(hid_host_bd_addr, transport=BT_BR_EDR_TRANSPORT) + await connection.authenticate() + await connection.encrypt() + + else: + print("Invalid option selected.") + + if len(sys.argv) > 3: + command = sys.argv[3] + + if command == 'test-mode': + # Enabling menu for testing + await menu() + + elif command == 'web': + # Run as a keyboard device + await keyboard_device(hid_device, command) + + else: + print("Command incorrect. Switching to default: web") + await keyboard_device(hid_device, 'web') + + else: + await keyboard_device(hid_device, 'web') + + await hci_source.wait_for_termination() + + +# ----------------------------------------------------------------------------- +logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) +asyncio.run(main()) diff --git a/examples/run_hid_host.py b/examples/run_hid_host.py index a174444..7076bdd 100644 --- a/examples/run_hid_host.py +++ b/examples/run_hid_host.py @@ -290,7 +290,10 @@ async def main(): print('example: run_hid_host.py classic1.json usb:0 E1:CA:72:48:C4:E8/P') return - def on_hid_data_cb(pdu): + def on_hid_control_data_cb(pdu: bytes): + print(f'Received Control Data, PDU: {pdu.hex()}') + + def on_hid_interrupt_data_cb(pdu: bytes): report_type = pdu[0] & 0x0F if len(pdu) == 1: print(color(f'Warning: No report received', 'yellow')) @@ -310,15 +313,17 @@ async def main(): if (report_length <= 1) or (report_id == 0): return - - if report_type == Message.ReportType.INPUT_REPORT: + #Parse report over interrupt channel + if (report_type == Message.ReportType.INPUT_REPORT): ReportParser.parse_input_report(pdu[1:]) # type: ignore async def handle_virtual_cable_unplug(): await hid_host.disconnect_interrupt_channel() await hid_host.disconnect_control_channel() await device.keystore.delete(target_address) # type: ignore - await connection.disconnect() + connection = hid_host.connection + if connection is not None: + await connection.disconnect() def on_hid_virtual_cable_unplug_cb(): asyncio.create_task(handle_virtual_cable_unplug()) @@ -330,6 +335,18 @@ async def main(): # Create a device device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device.classic_enabled = True + + # Create HID host and start it + print('@@@ Starting HID Host...') + hid_host = Host(device) + + # Register for HID data call back + hid_host.on('interrupt_data', on_hid_interrupt_data_cb) + hid_host.on('control_data', on_hid_control_data_cb) + + # Register for virtual cable unplug call back + hid_host.on('virtual_cable_unplug', on_hid_virtual_cable_unplug_cb) + await device.power_on() # Connect to a peer @@ -350,15 +367,6 @@ async def main(): await get_hid_device_sdp_record(device, connection) - # Create HID host and start it - print('@@@ Starting HID Host...') - hid_host = Host(device, connection) - - # Register for HID data call back - hid_host.on('data', on_hid_data_cb) - - # Register for virtual cable unplug call back - hid_host.on('virtual_cable_unplug', on_hid_virtual_cable_unplug_cb) async def menu(): reader = await get_stream_reader(sys.stdin) @@ -399,24 +407,40 @@ async def main(): await hid_host.disconnect_interrupt_channel() elif choice == '5': - print(" 1. Report ID 0x02") - print(" 2. Report ID 0x03") - print(" 3. Report ID 0x05") + print(" 1. Report ID 0x02 - Input, Mouse") + print(" 2. Report ID 0x03 - Input, Keyboard") + print(" 3. Report ID 0x05 - Input, Invalid ReportId") + print(" 4. Report ID 0x02 - Output") + print(" 5. Report ID 0x05 - Feature") choice1 = await reader.readline() choice1 = choice1.decode('utf-8').strip() if choice1 == '1': - hid_host.get_report(1, 2, 3) + hid_host.get_report(1, 2, 0) elif choice1 == '2': - hid_host.get_report(2, 3, 2) - + hid_host.get_report(1, 1, 0) + elif choice1 == '3': - hid_host.get_report(3, 5, 3) + hid_host.get_report(1, 5, 0) + + elif choice1 == '4': + hid_host.get_report(2, 1, 0) + elif choice1 == '5': + hid_host.get_report(3, 5, 0) + + elif choice1 == '6': + hid_host.get_report(1, 2, 3) + + elif choice1 == '7': + hid_host.get_report(2, 3, 2) + + elif choice1 == '8': + hid_host.get_report(3, 5, 3) else: print('Incorrect option selected') - + elif choice == '6': print(" 1. Report type 1 and Report id 0x01") print(" 2. Report type 2 and Report id 0x03") @@ -489,6 +513,7 @@ async def main(): hid_host.virtual_cable_unplug() try: await device.keystore.delete(target_address) + print("Unpair successful") except KeyError: print('Device not found or Device already unpaired.')