From e02303a44828c613e58b842afe850891954770a2 Mon Sep 17 00:00:00 2001 From: SneKarnataki Date: Fri, 15 Sep 2023 11:31:15 +0000 Subject: [PATCH] Submitting the initial version of HID Profile files Includes: 1. HID Host implementation - hid.py 2. HID application to test Host with 3rd party HID Device application - run_hid_host.py 3. HID supporting files for testing - hid_report_parser.py & hid_key_map.py Commands to run the application: Default application: python run_hid_host.py classic1.json usb:0 Menu options for testing (Get/Set): python run_hid_host.py classic1.json usb:0 test-mode CuttleFish:tcp-client:127.0.0.1:7300 Application used for testing as Device : Bluetooth Keyboard & Mouse-5.3.0.apk Note: Change in sdp.py file while testing hid profile, TEXT_STRING: lambda x: DataElement(DataElement.TEXT_STRING, x.decode('utf8')) changed to TEXT_STRING: lambda x: DataElement(DataElement.TEXT_STRING, x) as we were facing error "UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa1 in position 4: invalid start byte" while fetching sdp records. --- bumble/hid.py | 247 +++++++++++++++++- bumble/hid_menu.py | 17 -- examples/hid_key_map.py | 248 ++++++++++++++++++ examples/hid_report_parser.py | 139 ++++++++++ examples/run_hid_device.py | 1 - examples/run_hid_host.py | 461 ++++++++++++++++++++++++++++++++++ 6 files changed, 1094 insertions(+), 19 deletions(-) delete mode 100644 bumble/hid_menu.py create mode 100644 examples/hid_key_map.py create mode 100644 examples/hid_report_parser.py delete mode 100644 examples/run_hid_device.py create mode 100644 examples/run_hid_host.py diff --git a/bumble/hid.py b/bumble/hid.py index 71b20a07..19c7773d 100644 --- a/bumble/hid.py +++ b/bumble/hid.py @@ -1 +1,246 @@ -#Implement hid profile here - TBD +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import logging +import asyncio + +from pyee import EventEmitter +from typing import Optional, Tuple, Callable, Dict, Union + +from . import core, l2cap # type: ignore +from .colors import color # type: ignore +from .core import BT_BR_EDR_TRANSPORT, InvalidStateError, ProtocolError # type: ignore + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +# fmt: off + +HID_CTRL_PSM = 0x0011 +HID_INTR_PSM = 0x0013 + +# HIDP message types +HID_HANDSHAKE = 0x00 +HID_CONTROL = 0x01 +HID_GET_REPORT = 0x04 +HID_SET_REPORT = 0x05 +HID_GET_PROTOCOL = 0x06 +HID_SET_PROTOCOL = 0x07 +HID_DATA = 0x0A + +# Report types +HID_OTHER_REPORT = 0x00 +HID_INPUT_REPORT = 0x01 +HID_OUTPUT_REPORT = 0x02 +HID_FEATURE_REPORT = 0x03 + +# Handshake parameters +HANDSHAKE_SUCCESSFUL = 0x00 +HANDSHAKE_NOT_READY = 0x01 +HANDSHAKE_ERR_INVALID_REPORT_ID = 0x02 +HANDSHAKE_ERR_UNSUPPORTED_REQUEST = 0x03 +HANDSHAKE_ERR_UNKNOWN = 0x0E +HANDSHAKE_ERR_FATAL = 0x0F + +# Protocol modes +HID_BOOT_PROTOCOL_MODE = 0x00 +HID_REPORT_PROTOCOL_MODE = 0x01 + +# Control Operations +HID_SUSPEND = 0x03 +HID_EXIT_SUSPEND = 0x04 +HID_VIRTUAL_CABLE_UNPLUG = 0x05 + +# ----------------------------------------------------------------------------- +class HIDHost(EventEmitter): + l2cap_channel: Optional[l2cap.Channel] + + def __init__(self, device, connection) -> None: + super().__init__() + self.device = device + self.connection = connection + self.l2cap_ctrl_channel= None + self.l2cap_intr_channel = None + + # Register ourselves with the L2CAP channel manager + device.register_l2cap_server(HID_CTRL_PSM, self.on_connection) + device.register_l2cap_server(HID_INTR_PSM, self.on_connection) + + async def control_channel_connect(self) -> None: + # Create a new L2CAP connection - control channel + try: + self.l2cap_ctrl_channel = await self.device.l2cap_channel_manager.connect( + self.connection, HID_CTRL_PSM + ) + except ProtocolError as error: + logger.error(f'L2CAP connection failed: {error}') + raise + + assert self.l2cap_ctrl_channel is not None + # Become a sink for the L2CAP channel + self.l2cap_ctrl_channel.sink = self.on_ctrl_pdu + + async def interrupt_channel_connect(self) -> None: + # Create a new L2CAP connection - interrupt channel + try: + self.l2cap_intr_channel = await self.device.l2cap_channel_manager.connect( + self.connection, HID_INTR_PSM + ) + except ProtocolError as error: + logger.error(f'L2CAP connection failed: {error}') + raise + + assert self.l2cap_intr_channel is not None + # Become a sink for the L2CAP channel + self.l2cap_intr_channel.sink = self.on_intr_pdu + + async def interrupt_channel_disconnect(self): + await self.l2cap_intr_channel.disconnect() # type: ignore + + async def control_channel_disconnect(self): + await self.l2cap_ctrl_channel.disconnect() # type: ignore + + def on_connection(self, l2cap_channel: l2cap.Channel) -> None: + logger.debug(f'+++ New L2CAP connection: {l2cap_channel}') + l2cap_channel.on('open', lambda: self.on_l2cap_channel_open(l2cap_channel)) + + def on_l2cap_channel_open(self, l2cap_channel: l2cap.Channel) -> None: + if l2cap_channel.psm == HID_CTRL_PSM: + self.l2cap_ctrl_channel = l2cap_channel + self.l2cap_ctrl_channel.sink = self.on_ctrl_pdu + else: + self.l2cap_intr_channel = l2cap_channel + self.l2cap_intr_channel.sink = self.on_intr_pdu + logger.debug(f'$$$ L2CAP channel open: {l2cap_channel}') + + def on_ctrl_pdu(self, pdu: bytes) -> None: + logger.debug(f'<<< HID CONTROL PDU: {pdu.hex()}') + # Here we will receive all kinds of packets, parse and then call respective callbacks + message_type = pdu[0] >> 4 + param = pdu[0] & 0x0f + if (message_type == HID_HANDSHAKE): + logger.debug('<<< HID HANDSHAKE') + self.handle_handshake(param) + self.emit('handshake', ) + elif (message_type == HID_DATA): + logger.debug('<<< HID CONTROL DATA') + self.emit('data', pdu) + elif (message_type == HID_CONTROL): + if (param == HID_SUSPEND): + logger.debug('<<< HID SUSPEND') + self.emit('suspend', pdu) + elif (param == HID_EXIT_SUSPEND): + logger.debug('<<< HID EXIT SUSPEND') + self.emit('exit_suspend', pdu) + elif (param == HID_VIRTUAL_CABLE_UNPLUG): + logger.debug('<<< HID VIRTUAL CABLE UNPLUG') + self.emit('virtual_cable_unplug') + else: + logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED') + else: + logger.debug('<<< HID CONTROL DATA') + self.emit('data', pdu) + + def on_intr_pdu(self, pdu: bytes) -> None: + logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}') + self.emit("data", pdu) + + def register_data_cb(self, data_cb): + self.on('data', data_cb) + + def register_handshake_cb(self, handshake_cb): + self.on('handshake', handshake_cb) + + def register_virtual_cable_unplug(self, virtual_cable_unplug_cb): + self.on('virtual_cable_unplug', virtual_cable_unplug_cb) + + def get_report(self, report_type, report_id, buffer_size): + if(report_type == HID_OTHER_REPORT): + param = report_type + else: + param = 0x08 | report_type + header = ((HID_GET_REPORT << 4) | param) + msg = bytes([header, report_id, (buffer_size & 0xff), ((buffer_size >> 8) & 0xff)]) + logger.debug(f'>>> HID CONTROL GET REPORT, PDU: {msg.hex()}') + self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore + + def set_report(self, report_type, data): + header = ((HID_SET_REPORT << 4) | report_type) + msg = bytearray([header]) + msg.extend(data) + logger.debug(f'>>> HID CONTROL SET REPORT, PDU:{msg.hex()}') + self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore + + def get_protocol(self): + header = (HID_GET_PROTOCOL << 4) + msg = bytearray([header]) + logger.debug(f'>>> HID CONTROL GET PROTOCOL, PDU: {msg.hex()}') + self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore + + def set_protocol(self, protocol_mode): + header = (HID_SET_PROTOCOL << 4 | protocol_mode) + msg = bytearray([header]) + logger.debug(f'>>> HID CONTROL SET PROTOCOL, PDU: {msg.hex()}') + self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore + + def send_data(self, data): + header = ((HID_DATA << 4) | HID_OUTPUT_REPORT) + msg = bytearray([header]) + msg.extend(data) + logger.debug(f'>>> HID INTERRUPT SEND DATA, PDU: {msg.hex()}') + self.l2cap_intr_channel.send_pdu(msg) # type: ignore + + def suspend(self): + header = (HID_CONTROL << 4 | HID_SUSPEND) + msg = bytearray([header]) + logger.debug(f'>>> HID CONTROL SUSPEND, PDU:{msg.hex()}') + self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore + + def exit_suspend(self): + header = (HID_CONTROL << 4 | HID_EXIT_SUSPEND) + msg = bytearray([header]) + logger.debug(f'>>> HID CONTROL EXIT SUSPEND, PDU:{msg.hex()}') + self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore + + def virtual_cable_unplug(self): + header = (HID_CONTROL << 4 | HID_VIRTUAL_CABLE_UNPLUG) + msg = bytearray([header]) + logger.debug(f'>>> HID CONTROL VIRTUAL CABLE UNPLUG, PDU: {msg.hex()}') + self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore + + def handle_handshake(self, param): + if (param == HANDSHAKE_SUCCESSFUL): + logger.debug(f'<<< HID HANDSHAKE: SUCCESSFUL') + elif (param == HANDSHAKE_NOT_READY): + logger.warning(f'<<< HID HANDSHAKE: NOT_READY') + elif (param == HANDSHAKE_ERR_INVALID_REPORT_ID): + logger.warning(f'<<< HID HANDSHAKE: ERR_INVALID_REPORT_ID') + elif (param == HANDSHAKE_ERR_UNSUPPORTED_REQUEST): + logger.warning(f'<<< HID HANDSHAKE: ERR_UNSUPPORTED_REQUEST') + elif (param == HANDSHAKE_ERR_UNKNOWN): + logger.warning(f'<<< HID HANDSHAKE: ERR_UNKNOWN') + elif (param == HANDSHAKE_ERR_FATAL): + logger.warning(f'<<< HID HANDSHAKE: ERR_FATAL') + else: # 0x5-0xD = Reserved + logger.warning("<<< HID HANDSHAKE: RESERVED VALUE") + diff --git a/bumble/hid_menu.py b/bumble/hid_menu.py deleted file mode 100644 index 8b732575..00000000 --- a/bumble/hid_menu.py +++ /dev/null @@ -1,17 +0,0 @@ -"""TODO(skarnataki): DO NOT SUBMIT without one-line documentation for hid_menu. - -TODO(skarnataki): DO NOT SUBMIT without a detailed description of hid_menu. -""" - -from collections.abc import Sequence - -from absl import app - - -def main(argv: Sequence[str]) -> None: - if len(argv) > 1: - raise app.UsageError("Too many command-line arguments.") - - -if __name__ == "__main__": - app.run(main) diff --git a/examples/hid_key_map.py b/examples/hid_key_map.py new file mode 100644 index 00000000..aa919fd2 --- /dev/null +++ b/examples/hid_key_map.py @@ -0,0 +1,248 @@ +# shift map + +# letters +shift_map = { + 'a' : 'A', + 'b' : 'B', + 'c' : 'C', + 'd' : 'D', + 'e' : 'E', + 'f' : 'F', + 'g' : 'G', + 'h' : 'H', + 'i' : 'I', + 'j' : 'J', + 'k' : 'K', + 'l' : 'L', + 'm' : 'M', + 'n' : 'N', + 'o' : 'O', + 'p' : 'P', + 'q' : 'Q', + 'r' : 'R', + 's' : 'S', + 't' : 'T', + 'u' : 'U', + 'v' : 'V', + 'w' : 'W', + 'x' : 'X', + 'y' : 'Y', + 'z' : 'Z', + # numbers + '1' : '!', + '2' : '@', + '3' : '#', + '4' : '$', + '5' : '%', + '6' : '^', + '7' : '&', + '8' : '*', + '9' : '(', + '0' : ')', + # symbols + '-' : '_', + '=' : '+', + '[' : '{', + ']' : '}', + '\\' : '|', + ';' : ':', + '\'' : '"', + ',' : '<', + '.' : '>', + '/' : '?', + '`' : '~' +} + +# hex map + +# modifier keys +mod_keys = { + '00' : '', + '01' : 'left_ctrl', + '02' : 'left_shift', + '04' : 'left_alt', + '08' : 'left_meta', + '10' : 'right_ctrl', + '20' : 'right_shift', + '40' : 'right_alt', + '80' : 'right_meta' +} + +# base keys + +base_keys = { + # meta + '00' : '', # none + '01' : 'error_ovf', + # letters + '04' : 'a', + '05' : 'b', + '06' : 'c', + '07' : 'd', + '08' : 'e', + '09' : 'f', + '0a' : 'g', + '0b' : 'h', + '0c' : 'i', + '0d' : 'j', + '0e' : 'k', + '0f' : 'l', + '10' : 'm', + '11' : 'n', + '12' : 'o', + '13' : 'p', + '14' : 'q', + '15' : 'r', + '16' : 's', + '17' : 't', + '18' : 'u', + '19' : 'v', + '1a' : 'w', + '1b' : 'x', + '1c' : 'y', + '1d' : 'z', + # numbers + '1e' : '1', + '1f' : '2', + '20' : '3', + '21' : '4', + '22' : '5', + '23' : '6', + '24' : '7', + '25' : '8', + '26' : '9', + '27' : '0', + # misc + '28' : 'enter', #enter \n + '29' : 'esc', + '2a' : 'backspace', + '2b' : 'tab', + '2c' : 'spacebar', #space + '2d' : '-', + '2e' : '=', + '2f' : '[', + '30' : ']', + '31' : '\\', + '32' : '=', + '33' : '_SEMICOLON', + '34' : 'KEY_APOSTROPHE', + '35' : 'KEY_GRAVE', + '36' : 'KEY_COMMA', + '37' : 'KEY_DOT', + '38' : 'KEY_SLASH', + '39' : 'KEY_CAPSLOCK', + '3a' : 'KEY_F1', + '3b' : 'KEY_F2', + '3c' : 'KEY_F3', + '3d' : 'KEY_F4', + '3e' : 'KEY_F5', + '3f' : 'KEY_F6', + '40' : 'KEY_F7', + '41' : 'KEY_F8', + '42' : 'KEY_F9', + '43' : 'KEY_F10', + '44' : 'KEY_F11', + '45' : 'KEY_F12', + '46' : 'KEY_SYSRQ', + '47' : 'KEY_SCROLLLOCK', + '48' : 'KEY_PAUSE', + '49' : 'KEY_INSERT', + '4a' : 'KEY_HOME', + '4b' : 'KEY_PAGEUP', + '4c' : 'KEY_DELETE', + '4d' : 'KEY_END', + '4e' : 'KEY_PAGEDOWN', + '4f' : 'KEY_RIGHT', + '50' : 'KEY_LEFT', + '51' : 'KEY_DOWN', + '52' : 'KEY_UP', + '53' : 'KEY_NUMLOCK', + '54' : 'KEY_KPSLASH', + '55' : 'KEY_KPASTERISK', + '56' : 'KEY_KPMINUS', + '57' : 'KEY_KPPLUS', + '58' : 'KEY_KPENTER', + '59' : 'KEY_KP1', + '5a' : 'KEY_KP2', + '5b' : 'KEY_KP3', + '5c' : 'KEY_KP4', + '5d' : 'KEY_KP5', + '5e' : 'KEY_KP6', + '5f' : 'KEY_KP7', + '60' : 'KEY_KP8', + '61' : 'KEY_KP9', + '62' : 'KEY_KP0', + '63' : 'KEY_KPDOT', + '64' : 'KEY_102ND', + '65' : 'KEY_COMPOSE', + '66' : 'KEY_POWER', + '67' : 'KEY_KPEQUAL', + '68' : 'KEY_F13', + '69' : 'KEY_F14', + '6a' : 'KEY_F15', + '6b' : 'KEY_F16', + '6c' : 'KEY_F17', + '6d' : 'KEY_F18', + '6e' : 'KEY_F19', + '6f' : 'KEY_F20', + '70' : 'KEY_F21', + '71' : 'KEY_F22', + '72' : 'KEY_F23', + '73' : 'KEY_F24', + '74' : 'KEY_OPEN', + '75' : 'KEY_HELP', + '76' : 'KEY_PROPS', + '77' : 'KEY_FRONT', + '78' : 'KEY_STOP', + '79' : 'KEY_AGAIN', + '7a' : 'KEY_UNDO', + '7b' : 'KEY_CUT', + '7c' : 'KEY_COPY', + '7d' : 'KEY_PASTE', + '7e' : 'KEY_FIND', + '7f' : 'KEY_MUTE', + '80' : 'KEY_VOLUMEUP', + '81' : 'KEY_VOLUMEDOWN', + '85' : 'KEY_KPCOMMA', + '87' : 'KEY_RO', + '88' : 'KEY_KATAKANAHIRAGANA', + '89' : 'KEY_YEN', + '8a' : 'KEY_HENKAN', + '8b' : 'KEY_MUHENKAN', + '8c' : 'KEY_KPJPCOMMA', + '90' : 'KEY_HANGEUL', + '91' : 'KEY_HANJA', + '92' : 'KEY_KATAKANA', + '93' : 'KEY_HIRAGANA', + '94' : 'KEY_ZENKAKUHANKAKU', + 'b6' : 'KEY_KPLEFTPAREN', + 'b7' : 'KEY_KPRIGHTPAREN', + 'e0' : 'KEY_LEFTCTRL', + 'e1' : 'KEY_LEFTSHIFT', + 'e2' : 'KEY_LEFTALT', + 'e3' : 'KEY_LEFTMETA', + 'e4' : 'KEY_RIGHTCTRL', + 'e5' : 'KEY_RIGHTSHIFT', + 'e6' : 'KEY_RIGHTALT', + 'e7' : 'KEY_RIGHTMETA', + 'e8' : 'KEY_MEDIA_PLAYPAUSE', + 'e9' : 'KEY_MEDIA_STOPCD', + 'ea' : 'KEY_MEDIA_PREVIOUSSONG', + 'eb' : 'KEY_MEDIA_NEXTSONG', + 'ec' : 'KEY_MEDIA_EJECTCD', + 'ed' : 'KEY_MEDIA_VOLUMEUP', + 'ee' : 'KEY_MEDIA_VOLUMEDOWN', + 'ef' : 'KEY_MEDIA_MUTE', + 'f0' : 'KEY_MEDIA_WWW', + 'f1' : 'KEY_MEDIA_BACK', + 'f2' : 'KEY_MEDIA_FORWARD', + 'f3' : 'KEY_MEDIA_STOP', + 'f4' : 'KEY_MEDIA_FIND', + 'f5' : 'KEY_MEDIA_SCROLLUP', + 'f6' : 'KEY_MEDIA_SCROLLDOWN', + 'f7' : 'KEY_MEDIA_EDIT', + 'f8' : 'KEY_MEDIA_SLEEP', + 'f9' : 'KEY_MEDIA_COFFEE', + 'fa' : 'KEY_MEDIA_REFRESH', + 'fb' : 'KEY_MEDIA_CALC' +} diff --git a/examples/hid_report_parser.py b/examples/hid_report_parser.py new file mode 100644 index 00000000..f53288fc --- /dev/null +++ b/examples/hid_report_parser.py @@ -0,0 +1,139 @@ +from bumble.colors import color +from hid_key_map import base_keys, mod_keys, shift_map + + +# ------------------------------------------------------------------------------ +def get_key(modifier: str, key: str) -> str: + if modifier == '22': + modifier = '02' + if modifier in mod_keys: + modifier = mod_keys[modifier] + else: + return '' + if key in base_keys: + key = base_keys[key] + else: + return '' + if (modifier == 'left_shift' or modifier == 'right_shift') and key in shift_map: + key = shift_map[key] + + return key + + +class Keyboard: + def __init__(self): # type: ignore + self.report = [ + [ # Bit array for Modifier keys + 0, # Right GUI - (usually the Windows key) + 0, # Right ALT + 0, # Right Shift + 0, # Right Control + 0, # Left GUI - (usually the Windows key) + 0, # Left ALT + 0, # Left Shift + 0, # Left Control + ], + 0x00, # Vendor reserved + '', # Rest is space for 6 keys + '', + '', + '', + '', + '', + ] + + def decode_keyboard_report(self, input_report: bytes, report_length: int) -> None: + if report_length >= 8: + modifier = input_report[1] + self.report[0] = [int(x) for x in '{0:08b}'.format(modifier)] + self.report[0].reverse() # type: ignore + + modifier_key = str((modifier & 0x22).to_bytes(1, "big").hex()) + keycodes = [] + for k in range(3, report_length): + keycodes.append(str(input_report[k].to_bytes(1, "big").hex())) + self.report[k - 1] = get_key(modifier_key, keycodes[k - 3]) + else: + print(color('Warning: Not able to parse report', 'yellow')) + + def print_keyboard_report(self) -> None: + print(color('\tKeyboard Input Received', 'green', None, 'bold')) + print(color(f'Keys:', 'white', None, 'bold')) + for i in range(1, 7): + print(color(f' Key{i}{" ":>8s}= ', 'cyan', None, 'bold'), self.report[i + 1]) + print(color(f'\nModifier Keys:', 'white', None, 'bold')) + print( + color(f' Left Ctrl : ', 'cyan'), f'{self.report[0][0] == 1!s:<5}', # type: ignore + color(f' Left Shift : ', 'cyan'), f'{self.report[0][1] == 1!s:<5}', # type: ignore + color(f' Left ALT : ', 'cyan'), f'{self.report[0][2] == 1!s:<5}', # type: ignore + color(f' Left GUI : ', 'cyan'), f'{self.report[0][3] == 1!s:<5}\n', # type: ignore + color(f' Right Ctrl : ', 'cyan'), f'{self.report[0][4] == 1!s:<5}', # type: ignore + color(f' Right Shift : ', 'cyan'), f'{self.report[0][5] == 1!s:<5}', # type: ignore + color(f' Right ALT : ', 'cyan'), f'{self.report[0][6] == 1!s:<5}', # type: ignore + color(f' Right GUI : ', 'cyan'), f'{self.report[0][7] == 1!s:<5}', # type: ignore + ) + + +# ------------------------------------------------------------------------------ +class Mouse: + def __init__(self): # type: ignore + self.report = [ + [ # Bit array for Buttons + 0, # Button 1 (primary/trigger + 0, # Button 2 (secondary) + 0, # Button 3 (tertiary) + 0, # Button 4 + 0, # Button 5 + 0, # unused padding bits + 0, # unused padding bits + 0, # unused padding bits + ], + 0, # X + 0, # Y + 0, # Wheel + 0, # AC Pan + ] + + def decode_mouse_report(self, input_report: bytes, report_length: int) -> None: + self.report[0] = [int(x) for x in '{0:08b}'.format(input_report[1])] + self.report[0].reverse() # type: ignore + self.report[1] = input_report[2] + self.report[2] = input_report[3] + if report_length in [5, 6]: + self.report[3] = input_report[4] + self.report[4] = input_report[5] if report_length == 6 else 0 + + def print_mouse_report(self) -> None: + print(color('\tMouse Input Received', 'green', None, 'bold')) + print( + color(f' Button 1 (primary/trigger) = ', 'cyan'), self.report[0][0] == 1, # type: ignore + color(f'\n Button 2 (secondary) = ', 'cyan'), self.report[0][1] == 1, # type: ignore + color(f'\n Button 3 (tertiary) = ', 'cyan'), self.report[0][2] == 1, # type: ignore + color(f'\n Button4 = ', 'cyan'), self.report[0][3] == 1, # type: ignore + color(f'\n Button5 = ', 'cyan'), self.report[0][4] == 1, # type: ignore + color(f'\n X (X-axis displacement) = ', 'cyan'), self.report[1], + color(f'\n Y (Y-axis displacement) = ', 'cyan'), self.report[2], + color(f'\n Wheel = ', 'cyan'), self.report[3], + color(f'\n AC PAN = ', 'cyan'), self.report[4], + ) + + +# ------------------------------------------------------------------------------ +class ReportParser: + def parse_input_report(input_report: bytes) -> None: # type: ignore + + report_id = input_report[0] + report_length = len(input_report) + + # Keyboard input report (report id = 1) + if report_id == 1 and report_length >= 8: + keyboard = Keyboard() # type: ignore + keyboard.decode_keyboard_report(input_report, report_length) + keyboard.print_keyboard_report() + # Mouse input report (report id = 2) + elif report_id == 2 and report_length in [4, 5, 6]: + mouse = Mouse() # type: ignore + mouse.decode_mouse_report(input_report, report_length) + mouse.print_mouse_report() + else: + print(color(f'Warning: Parse Error Report ID {report_id}', 'yellow')) diff --git a/examples/run_hid_device.py b/examples/run_hid_device.py deleted file mode 100644 index 9daeafb9..00000000 --- a/examples/run_hid_device.py +++ /dev/null @@ -1 +0,0 @@ -test diff --git a/examples/run_hid_host.py b/examples/run_hid_host.py new file mode 100644 index 00000000..be76c461 --- /dev/null +++ b/examples/run_hid_host.py @@ -0,0 +1,461 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import sys +import os +import logging + + +from bumble.colors import color + +import bumble.core +from bumble.device import Device +from bumble.transport import open_transport_or_link +from bumble.core import ( + BT_L2CAP_PROTOCOL_ID, + BT_HIDP_PROTOCOL_ID, + BT_HUMAN_INTERFACE_DEVICE_SERVICE, + BT_BR_EDR_TRANSPORT, +) +from bumble.hci import Address +from bumble.hid import HIDHost, HID_INPUT_REPORT, HID_OTHER_REPORT, HID_BOOT_PROTOCOL_MODE, HID_REPORT_PROTOCOL_MODE +from bumble.sdp import ( + Client as SDP_Client, + DataElement, + ServiceAttribute, + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_ALL_ATTRIBUTES_RANGE, + SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID, + SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, +) +from hid_report_parser import ReportParser + +# ----------------------------------------------------------------------------- +# SDP attributes for Bluetooth HID devices +SDP_HID_SERVICE_NAME_ATTRIBUTE_ID = 0x0100 +SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID = 0x0101 +SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID = 0x0102 +SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID = 0x0200 # [DEPRECATED] +SDP_HID_PARSER_VERSION_ATTRIBUTE_ID = 0x0201 +SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID = 0x0202 +SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID = 0x0203 +SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID = 0x0204 +SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID = 0x0205 +SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0x0206 +SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID = 0x0207 +SDP_HID_SDP_DISABLE_ATTRIBUTE_ID = 0x0208 # [DEPRECATED] +SDP_HID_BATTERY_POWER_ATTRIBUTE_ID = 0x0209 +SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID = 0x020A +SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID = 0x020B # DEPRECATED] +SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID = 0x020C +SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID = 0x020D +SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID = 0x020E +SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID = 0x020F +SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210 + +# ----------------------------------------------------------------------------- +async def get_hid_device_sdp_record(device, connection): + + # Connect to the SDP Server + sdp_client = SDP_Client(device) + await sdp_client.connect(connection) + if sdp_client: + print(color('Connected ith SDP Server', 'blue')) + else: + print(color('Failed to connect with SDP Server', 'red')) + + # List BT HID Device service in the root browse group + service_record_handles = await sdp_client.search_services( + [BT_HUMAN_INTERFACE_DEVICE_SERVICE] + ) + + if (len(service_record_handles) < 1): + print(color('BT HID Device service not found on peer device!!!!','red')) + await sdp_client.disconnect() + return + + # For BT_HUMAN_INTERFACE_DEVICE_SERVICE service, get all its attributes + for service_record_handle in service_record_handles: + attributes = await sdp_client.get_attributes( + service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE] + ) + print( + color(f'SERVICE {service_record_handle:04X} attributes:', 'yellow') + ) + print(color(f'SDP attributes for HID device','magenta')) + for attribute in attributes: + if (attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID): + print(color(' Service Record Handle : ', 'cyan'), hex(attribute.value.value)) + + elif (attribute.id == SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID): + print(color(' Service Class : ', 'cyan'), attribute.value.value[0].value) + + elif (attribute.id == SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID): + print(color(' SDP Browse Group List : ', 'cyan'), attribute.value.value[0].value) + + elif (attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID): + print(color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[0].value) + print(color(' PSM for Bluetooth HID Control channel : ', 'cyan'), hex(attribute.value.value[0].value[1].value)) + print(color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[1].value[0].value) + + elif (attribute.id == SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID): + print(color(' Lanugage : ', 'cyan'), hex(attribute.value.value[0].value)) + print(color(' Encoding : ', 'cyan'), hex(attribute.value.value[1].value)) + print(color(' PrimaryLanguageBaseID : ', 'cyan'), hex(attribute.value.value[2].value)) + + elif (attribute.id == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID): + print(color(' BT_HUMAN_INTERFACE_DEVICE_SERVICE ', 'cyan'), attribute.value.value[0].value[0].value) + print(color(' HID Profileversion number : ', 'cyan'), hex(attribute.value.value[0].value[1].value)) + + elif (attribute.id == SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID): + print(color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[0].value[0].value) + print(color(' PSM for Bluetooth HID Interrupt channel : ', 'cyan'), hex(attribute.value.value[0].value[0].value[1].value)) + print(color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[1].value[0].value) + + elif (attribute.id == SDP_HID_SERVICE_NAME_ATTRIBUTE_ID): + print(color(' Service Name: ', 'cyan'), attribute.value.value) + HID_Service_Name = attribute.value.value + + elif (attribute.id == SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID): + print(color(' Service Description: ', 'cyan'), attribute.value.value) + HID_Service_Description = attribute.value.value + + elif (attribute.id == SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID): + print(color(' Provider Name: ', 'cyan'), attribute.value.value) + HID_Provider_Name = attribute.value.value + + elif (attribute.id == SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID): + print(color(' Release Number: ', 'cyan'), hex(attribute.value.value)) + HID_Device_Release_Number = attribute.value.value + + elif (attribute.id == SDP_HID_PARSER_VERSION_ATTRIBUTE_ID): + print(color(' HID Parser Version: ', 'cyan'), hex(attribute.value.value)) + HID_Parser_Version = attribute.value.value + + elif (attribute.id == SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID): + print(color(' HIDDeviceSubclass: ', 'cyan'), hex(attribute.value.value)) + HID_Device_Subclass = attribute.value.value + + elif (attribute.id == SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID): + print(color(' HIDCountryCode: ', 'cyan'), hex(attribute.value.value)) + HID_Country_Code = attribute.value.value + + elif (attribute.id == SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID): + print(color(' HIDVirtualCable: ', 'cyan'), attribute.value.value) + HID_Virtual_Cable = attribute.value.value + + elif (attribute.id == SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID): + print(color(' HIDReconnectInitiate: ', 'cyan'), attribute.value.value) + HID_Reconnect_Initiate = attribute.value.value + + elif (attribute.id == SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID): + print(color(' HID Report Descriptor type: ', 'cyan'), hex(attribute.value.value[0].value[0].value)) + print(color(' HID Report DescriptorList: ', 'cyan'), attribute.value.value[0].value[1].value) + HID_Descriptor_Type = attribute.value.value[0].value[0].value + HID_Report_Descriptor_List = attribute.value.value[0].value[1].value + + elif (attribute.id == SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID): + print(color(' HID LANGID Base Language: ', 'cyan'), hex(attribute.value.value[0].value[0].value)) + print(color(' HID LANGID Base Bluetooth String Offset: ', 'cyan'), hex(attribute.value.value[0].value[1].value)) + HID_LANGID_Base_Language = attribute.value.value[0].value[0].value + HID_LANGID_Base_Bluetooth_String_Offset = attribute.value.value[0].value[1].value + + elif (attribute.id == SDP_HID_BATTERY_POWER_ATTRIBUTE_ID): + print(color(' HIDBatteryPower: ', 'cyan'), attribute.value.value) + HID_Battery_Power = attribute.value.value + + elif (attribute.id == SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID): + print(color(' HIDRemoteWake: ', 'cyan'), attribute.value.value) + HID_Remote_Wake = attribute.value.value + + elif (attribute.id == SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID): + print(color(' HIDProfileVersion : ', 'cyan'), hex(attribute.value.value)) + HID_Profile_Version = attribute.value.value + + elif (attribute.id == SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID): + print(color(' HIDSupervisionTimeout: ', 'cyan'), hex(attribute.value.value)) + HID_Supervision_Timeout = attribute.value.value + + elif (attribute.id == SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID): + print(color(' HIDNormallyConnectable: ', 'cyan'), attribute.value.value) + HID_Normally_Connectable = attribute.value.value + + elif (attribute.id == SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID): + print(color(' HIDBootDevice: ', 'cyan'), attribute.value.value) + HID_Boot_Device = attribute.value.value + + elif (attribute.id == SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID): + print(color(' HIDSSRHostMaxLatency: ', 'cyan'), hex(attribute.value.value)) + HID_SSR_Host_Max_Latency = attribute.value.value + + elif (attribute.id == SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID): + print(color(' HIDSSRHostMinTimeout: ', 'cyan'), hex(attribute.value.value)) + HID_SSR_Host_Min_Timeout = attribute.value.value + + else: + print(color(f' Warning: Attribute ID: {attribute.id} match not found.\n Attribute Info: {attribute}', 'yellow')) + + await sdp_client.disconnect() + +# ----------------------------------------------------------------------------- +async def get_stream_reader(pipe) -> asyncio.StreamReader: + loop = asyncio.get_event_loop() + reader = asyncio.StreamReader(loop=loop) + protocol = asyncio.StreamReaderProtocol(reader) + await loop.connect_read_pipe(lambda: protocol, pipe) + return reader + +# ----------------------------------------------------------------------------- +async def main(): + if len(sys.argv) < 4: + print( + 'Usage: run_hid_host.py ' + ' [test-mode]' + ) + + print('example: run_hid_host.py classic1.json usb:0 E1:CA:72:48:C4:E8/P') + return + + def on_hid_data_cb(pdu): + report_type = pdu[0] & 0x0f + if (len(pdu) == 1): + print(color(f'Warning: No report received','yellow')) + return + report_length = len(pdu[1:]) + report_id = pdu[1] + if (report_type != HID_OTHER_REPORT): + print(color(f' Report type = {report_type}, Report length = {report_length}, Report id = {report_id}', 'blue', None, 'bold')) + + if ((report_length <= 1) or (report_id == 0)): + return + + if report_type == HID_INPUT_REPORT: + ReportParser.parse_input_report(pdu[1:]) #type: ignore + + async def handle_virtual_cable_unplug(): + await hid_host.interrupt_channel_disconnect() + await hid_host.control_channel_disconnect() + await device.keystore.delete(target_address) #type: ignore + await connection.disconnect() + + def on_hid_virtual_cable_unplug_cb(): + asyncio.create_task(handle_virtual_cable_unplug()) + + print('<<< connecting to HCI...') + async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + print('<<< CONNECTED') + + # Create a device + device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device.classic_enabled = True + await device.power_on() + + # Connect to a peer + target_address = sys.argv[3] + print(f'=== Connecting to {target_address}...') + connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) + print(f'=== Connected to {connection.peer_address}!') + + # Request authentication + print('*** Authenticating...') + await connection.authenticate() + print('*** Authenticated...') + + # Enable encryption + print('*** Enabling encryption...') + await connection.encrypt() + print('*** Encryption on') + + await get_hid_device_sdp_record(device, connection) + + # Create HID host and start it + print('@@@ Starting HID Host...') + hid_host = HIDHost(device, connection) + + # Register for HID data call back + hid_host.register_data_cb(on_hid_data_cb) + + # Register for virtual cable unplug call back + hid_host.register_virtual_cable_unplug(on_hid_virtual_cable_unplug_cb) + + async def menu(): + reader = await get_stream_reader(sys.stdin) + while True: + print("\n************************ HID Host Menu *****************************\n") + print(" 1. Connect Control Channel") + print(" 2. Connect Interrupt Channel") + print(" 3. Disconnect Control Channel") + print(" 4. Disconnect Interrupt Channel") + print(" 5. Get Report") + print(" 6. Set Report") + print(" 7. Set Protocol Mode") + print(" 8. Get Protocol Mode") + print(" 9. Send Report") + print("10. Suspend") + print("11. Exit Suspend") + print("12. Virtual Cable Unplug") + print("13. Disconnect device") + print("14. Delete Bonding") + print("15. Re-connect to device") + print("\nEnter your choice : \n") + + choice = await reader.readline() + choice = choice.decode('utf-8').strip() + + if (choice == '1'): + await hid_host.control_channel_connect() + + elif (choice == '2'): + await hid_host.interrupt_channel_connect() + + elif (choice == '3'): + await hid_host.control_channel_disconnect() + + elif (choice == '4'): + await hid_host.interrupt_channel_disconnect() + + elif (choice == '5'): + print(" 1. Report ID 0x02") + print(" 2. Report ID 0x03") + print(" 3. Report ID 0x05") + choice1 = await reader.readline() + choice1 = choice1.decode('utf-8').strip() + + if (choice1 == '1'): + hid_host.get_report(1, 2, 3) + + elif (choice1 == '2'): + hid_host.get_report(2, 3, 2) + + elif (choice1 == '3'): + hid_host.get_report(3, 5, 3) + + else: + print('Incorrect option selected') + + elif (choice == '6'): + print(" 1. Report type 1 and Report id 0x01") + print(" 2. Report type 2 and Report id 0x03") + print(" 3. Report type 3 and Report id 0x05") + choice1 = await reader.readline() + choice1 = choice1.decode('utf-8').strip() + + if (choice1 == '1'): + # data includes first octet as report id + data = bytearray([0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01]) + hid_host.set_report(1, data) + + elif (choice1 == '2'): + data = bytearray([0x03, 0x01, 0x01]) + hid_host.set_report(2, data) + + elif (choice1 == '3'): + data = bytearray([0x05, 0x01, 0x01, 0x01]) + hid_host.set_report(3, data) + + else: + print('Incorrect option selected') + + elif (choice == '7'): + print(" 0. Boot") + print(" 1. Report") + choice1 = await reader.readline() + choice1 = choice1.decode('utf-8').strip() + + if (choice1 == '0'): + hid_host.set_protocol(HID_BOOT_PROTOCOL_MODE) + + elif (choice1 == '1'): + hid_host.set_protocol(HID_REPORT_PROTOCOL_MODE) + + else: + print('Incorrect option selected') + + elif (choice == '8'): + hid_host.get_protocol() + + elif (choice == '9'): + print(" 1. Report ID 0x01") + print(" 2. Report ID 0x03") + choice1 = await reader.readline() + choice1 = choice1.decode('utf-8').strip() + + if (choice1 == '1'): + data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + hid_host.send_data(data) + + elif (choice1 == '2'): + data = bytearray([0x03, 0x00, 0x0d, 0xfd, 0x00, 0x00]) + hid_host.send_data(data) + + else: + print('Incorrect option selected') + + elif (choice == '10'): + hid_host.suspend() + + elif (choice == '11'): + hid_host.exit_suspend() + + elif (choice == '12'): + hid_host.virtual_cable_unplug() + await device.keystore.delete(target_address) + + elif (choice == '13'): + peer_address = Address.from_string_for_transport(target_address, transport=BT_BR_EDR_TRANSPORT) + connection = device.find_connection_by_bd_addr(peer_address, transport=BT_BR_EDR_TRANSPORT) + if connection is not None: + await connection.disconnect() + else: + print("Already disconnected from device") + + elif (choice == '14'): + try: + await device.keystore.delete(target_address) + print("Unpair successful") + except KeyError: + print('Device not found or Device already unpaired.') + + elif (choice == '15'): + connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) + await connection.authenticate() + await connection.encrypt() + + else: + print("Invalid option selected.") + + if ((len(sys.argv) > 4) and (sys.argv[4] == 'test-mode')): + # Enabling menu for testing + await menu() + else: + # HID Connection + # Control channel + await hid_host.control_channel_connect() + # Interrupt Channel + await hid_host.interrupt_channel_connect() + + await hci_source.wait_for_termination() + + +# ----------------------------------------------------------------------------- +logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) +asyncio.run(main()) \ No newline at end of file