Submitting the initial version of HID Profile files

Includes:
1. HID Host implementation - hid.py
2. HID application to test Host with 3rd party HID Device application - run_hid_host.py
3. HID supporting files for testing - hid_report_parser.py & hid_key_map.py

Commands to run the application:
Default application:
python run_hid_host.py classic1.json usb:0 <device bd-addr>

Menu options for testing (Get/Set):
python run_hid_host.py classic1.json usb:0 <device bd-addr> test-mode

CuttleFish:tcp-client:127.0.0.1:7300

Application used for testing as Device : Bluetooth Keyboard & Mouse-5.3.0.apk

Note: Change in sdp.py file while testing hid profile,
TEXT_STRING: lambda x: DataElement(DataElement.TEXT_STRING, x.decode('utf8')) changed to
TEXT_STRING: lambda x: DataElement(DataElement.TEXT_STRING, x)
as we were facing error "UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa1 in position 4: invalid start byte" while fetching sdp records.
This commit is contained in:
SneKarnataki
2023-09-15 11:31:15 +00:00
committed by Lucas Abel
parent 36fc966ad6
commit e02303a448
6 changed files with 1094 additions and 19 deletions

View File

@@ -1 +1,246 @@
#Implement hid profile here - TBD
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import asyncio
from pyee import EventEmitter
from typing import Optional, Tuple, Callable, Dict, Union
from . import core, l2cap # type: ignore
from .colors import color # type: ignore
from .core import BT_BR_EDR_TRANSPORT, InvalidStateError, ProtocolError # type: ignore
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
# fmt: off
HID_CTRL_PSM = 0x0011
HID_INTR_PSM = 0x0013
# HIDP message types
HID_HANDSHAKE = 0x00
HID_CONTROL = 0x01
HID_GET_REPORT = 0x04
HID_SET_REPORT = 0x05
HID_GET_PROTOCOL = 0x06
HID_SET_PROTOCOL = 0x07
HID_DATA = 0x0A
# Report types
HID_OTHER_REPORT = 0x00
HID_INPUT_REPORT = 0x01
HID_OUTPUT_REPORT = 0x02
HID_FEATURE_REPORT = 0x03
# Handshake parameters
HANDSHAKE_SUCCESSFUL = 0x00
HANDSHAKE_NOT_READY = 0x01
HANDSHAKE_ERR_INVALID_REPORT_ID = 0x02
HANDSHAKE_ERR_UNSUPPORTED_REQUEST = 0x03
HANDSHAKE_ERR_UNKNOWN = 0x0E
HANDSHAKE_ERR_FATAL = 0x0F
# Protocol modes
HID_BOOT_PROTOCOL_MODE = 0x00
HID_REPORT_PROTOCOL_MODE = 0x01
# Control Operations
HID_SUSPEND = 0x03
HID_EXIT_SUSPEND = 0x04
HID_VIRTUAL_CABLE_UNPLUG = 0x05
# -----------------------------------------------------------------------------
class HIDHost(EventEmitter):
l2cap_channel: Optional[l2cap.Channel]
def __init__(self, device, connection) -> None:
super().__init__()
self.device = device
self.connection = connection
self.l2cap_ctrl_channel= None
self.l2cap_intr_channel = None
# Register ourselves with the L2CAP channel manager
device.register_l2cap_server(HID_CTRL_PSM, self.on_connection)
device.register_l2cap_server(HID_INTR_PSM, self.on_connection)
async def control_channel_connect(self) -> None:
# Create a new L2CAP connection - control channel
try:
self.l2cap_ctrl_channel = await self.device.l2cap_channel_manager.connect(
self.connection, HID_CTRL_PSM
)
except ProtocolError as error:
logger.error(f'L2CAP connection failed: {error}')
raise
assert self.l2cap_ctrl_channel is not None
# Become a sink for the L2CAP channel
self.l2cap_ctrl_channel.sink = self.on_ctrl_pdu
async def interrupt_channel_connect(self) -> None:
# Create a new L2CAP connection - interrupt channel
try:
self.l2cap_intr_channel = await self.device.l2cap_channel_manager.connect(
self.connection, HID_INTR_PSM
)
except ProtocolError as error:
logger.error(f'L2CAP connection failed: {error}')
raise
assert self.l2cap_intr_channel is not None
# Become a sink for the L2CAP channel
self.l2cap_intr_channel.sink = self.on_intr_pdu
async def interrupt_channel_disconnect(self):
await self.l2cap_intr_channel.disconnect() # type: ignore
async def control_channel_disconnect(self):
await self.l2cap_ctrl_channel.disconnect() # type: ignore
def on_connection(self, l2cap_channel: l2cap.Channel) -> None:
logger.debug(f'+++ New L2CAP connection: {l2cap_channel}')
l2cap_channel.on('open', lambda: self.on_l2cap_channel_open(l2cap_channel))
def on_l2cap_channel_open(self, l2cap_channel: l2cap.Channel) -> None:
if l2cap_channel.psm == HID_CTRL_PSM:
self.l2cap_ctrl_channel = l2cap_channel
self.l2cap_ctrl_channel.sink = self.on_ctrl_pdu
else:
self.l2cap_intr_channel = l2cap_channel
self.l2cap_intr_channel.sink = self.on_intr_pdu
logger.debug(f'$$$ L2CAP channel open: {l2cap_channel}')
def on_ctrl_pdu(self, pdu: bytes) -> None:
logger.debug(f'<<< HID CONTROL PDU: {pdu.hex()}')
# Here we will receive all kinds of packets, parse and then call respective callbacks
message_type = pdu[0] >> 4
param = pdu[0] & 0x0f
if (message_type == HID_HANDSHAKE):
logger.debug('<<< HID HANDSHAKE')
self.handle_handshake(param)
self.emit('handshake', )
elif (message_type == HID_DATA):
logger.debug('<<< HID CONTROL DATA')
self.emit('data', pdu)
elif (message_type == HID_CONTROL):
if (param == HID_SUSPEND):
logger.debug('<<< HID SUSPEND')
self.emit('suspend', pdu)
elif (param == HID_EXIT_SUSPEND):
logger.debug('<<< HID EXIT SUSPEND')
self.emit('exit_suspend', pdu)
elif (param == HID_VIRTUAL_CABLE_UNPLUG):
logger.debug('<<< HID VIRTUAL CABLE UNPLUG')
self.emit('virtual_cable_unplug')
else:
logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED')
else:
logger.debug('<<< HID CONTROL DATA')
self.emit('data', pdu)
def on_intr_pdu(self, pdu: bytes) -> None:
logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}')
self.emit("data", pdu)
def register_data_cb(self, data_cb):
self.on('data', data_cb)
def register_handshake_cb(self, handshake_cb):
self.on('handshake', handshake_cb)
def register_virtual_cable_unplug(self, virtual_cable_unplug_cb):
self.on('virtual_cable_unplug', virtual_cable_unplug_cb)
def get_report(self, report_type, report_id, buffer_size):
if(report_type == HID_OTHER_REPORT):
param = report_type
else:
param = 0x08 | report_type
header = ((HID_GET_REPORT << 4) | param)
msg = bytes([header, report_id, (buffer_size & 0xff), ((buffer_size >> 8) & 0xff)])
logger.debug(f'>>> HID CONTROL GET REPORT, PDU: {msg.hex()}')
self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore
def set_report(self, report_type, data):
header = ((HID_SET_REPORT << 4) | report_type)
msg = bytearray([header])
msg.extend(data)
logger.debug(f'>>> HID CONTROL SET REPORT, PDU:{msg.hex()}')
self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore
def get_protocol(self):
header = (HID_GET_PROTOCOL << 4)
msg = bytearray([header])
logger.debug(f'>>> HID CONTROL GET PROTOCOL, PDU: {msg.hex()}')
self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore
def set_protocol(self, protocol_mode):
header = (HID_SET_PROTOCOL << 4 | protocol_mode)
msg = bytearray([header])
logger.debug(f'>>> HID CONTROL SET PROTOCOL, PDU: {msg.hex()}')
self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore
def send_data(self, data):
header = ((HID_DATA << 4) | HID_OUTPUT_REPORT)
msg = bytearray([header])
msg.extend(data)
logger.debug(f'>>> HID INTERRUPT SEND DATA, PDU: {msg.hex()}')
self.l2cap_intr_channel.send_pdu(msg) # type: ignore
def suspend(self):
header = (HID_CONTROL << 4 | HID_SUSPEND)
msg = bytearray([header])
logger.debug(f'>>> HID CONTROL SUSPEND, PDU:{msg.hex()}')
self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore
def exit_suspend(self):
header = (HID_CONTROL << 4 | HID_EXIT_SUSPEND)
msg = bytearray([header])
logger.debug(f'>>> HID CONTROL EXIT SUSPEND, PDU:{msg.hex()}')
self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore
def virtual_cable_unplug(self):
header = (HID_CONTROL << 4 | HID_VIRTUAL_CABLE_UNPLUG)
msg = bytearray([header])
logger.debug(f'>>> HID CONTROL VIRTUAL CABLE UNPLUG, PDU: {msg.hex()}')
self.l2cap_ctrl_channel.send_pdu(msg) # type: ignore
def handle_handshake(self, param):
if (param == HANDSHAKE_SUCCESSFUL):
logger.debug(f'<<< HID HANDSHAKE: SUCCESSFUL')
elif (param == HANDSHAKE_NOT_READY):
logger.warning(f'<<< HID HANDSHAKE: NOT_READY')
elif (param == HANDSHAKE_ERR_INVALID_REPORT_ID):
logger.warning(f'<<< HID HANDSHAKE: ERR_INVALID_REPORT_ID')
elif (param == HANDSHAKE_ERR_UNSUPPORTED_REQUEST):
logger.warning(f'<<< HID HANDSHAKE: ERR_UNSUPPORTED_REQUEST')
elif (param == HANDSHAKE_ERR_UNKNOWN):
logger.warning(f'<<< HID HANDSHAKE: ERR_UNKNOWN')
elif (param == HANDSHAKE_ERR_FATAL):
logger.warning(f'<<< HID HANDSHAKE: ERR_FATAL')
else: # 0x5-0xD = Reserved
logger.warning("<<< HID HANDSHAKE: RESERVED VALUE")

View File

@@ -1,17 +0,0 @@
"""TODO(skarnataki): DO NOT SUBMIT without one-line documentation for hid_menu.
TODO(skarnataki): DO NOT SUBMIT without a detailed description of hid_menu.
"""
from collections.abc import Sequence
from absl import app
def main(argv: Sequence[str]) -> None:
if len(argv) > 1:
raise app.UsageError("Too many command-line arguments.")
if __name__ == "__main__":
app.run(main)

248
examples/hid_key_map.py Normal file
View File

@@ -0,0 +1,248 @@
# shift map
# letters
shift_map = {
'a' : 'A',
'b' : 'B',
'c' : 'C',
'd' : 'D',
'e' : 'E',
'f' : 'F',
'g' : 'G',
'h' : 'H',
'i' : 'I',
'j' : 'J',
'k' : 'K',
'l' : 'L',
'm' : 'M',
'n' : 'N',
'o' : 'O',
'p' : 'P',
'q' : 'Q',
'r' : 'R',
's' : 'S',
't' : 'T',
'u' : 'U',
'v' : 'V',
'w' : 'W',
'x' : 'X',
'y' : 'Y',
'z' : 'Z',
# numbers
'1' : '!',
'2' : '@',
'3' : '#',
'4' : '$',
'5' : '%',
'6' : '^',
'7' : '&',
'8' : '*',
'9' : '(',
'0' : ')',
# symbols
'-' : '_',
'=' : '+',
'[' : '{',
']' : '}',
'\\' : '|',
';' : ':',
'\'' : '"',
',' : '<',
'.' : '>',
'/' : '?',
'`' : '~'
}
# hex map
# modifier keys
mod_keys = {
'00' : '',
'01' : 'left_ctrl',
'02' : 'left_shift',
'04' : 'left_alt',
'08' : 'left_meta',
'10' : 'right_ctrl',
'20' : 'right_shift',
'40' : 'right_alt',
'80' : 'right_meta'
}
# base keys
base_keys = {
# meta
'00' : '', # none
'01' : 'error_ovf',
# letters
'04' : 'a',
'05' : 'b',
'06' : 'c',
'07' : 'd',
'08' : 'e',
'09' : 'f',
'0a' : 'g',
'0b' : 'h',
'0c' : 'i',
'0d' : 'j',
'0e' : 'k',
'0f' : 'l',
'10' : 'm',
'11' : 'n',
'12' : 'o',
'13' : 'p',
'14' : 'q',
'15' : 'r',
'16' : 's',
'17' : 't',
'18' : 'u',
'19' : 'v',
'1a' : 'w',
'1b' : 'x',
'1c' : 'y',
'1d' : 'z',
# numbers
'1e' : '1',
'1f' : '2',
'20' : '3',
'21' : '4',
'22' : '5',
'23' : '6',
'24' : '7',
'25' : '8',
'26' : '9',
'27' : '0',
# misc
'28' : 'enter', #enter \n
'29' : 'esc',
'2a' : 'backspace',
'2b' : 'tab',
'2c' : 'spacebar', #space
'2d' : '-',
'2e' : '=',
'2f' : '[',
'30' : ']',
'31' : '\\',
'32' : '=',
'33' : '_SEMICOLON',
'34' : 'KEY_APOSTROPHE',
'35' : 'KEY_GRAVE',
'36' : 'KEY_COMMA',
'37' : 'KEY_DOT',
'38' : 'KEY_SLASH',
'39' : 'KEY_CAPSLOCK',
'3a' : 'KEY_F1',
'3b' : 'KEY_F2',
'3c' : 'KEY_F3',
'3d' : 'KEY_F4',
'3e' : 'KEY_F5',
'3f' : 'KEY_F6',
'40' : 'KEY_F7',
'41' : 'KEY_F8',
'42' : 'KEY_F9',
'43' : 'KEY_F10',
'44' : 'KEY_F11',
'45' : 'KEY_F12',
'46' : 'KEY_SYSRQ',
'47' : 'KEY_SCROLLLOCK',
'48' : 'KEY_PAUSE',
'49' : 'KEY_INSERT',
'4a' : 'KEY_HOME',
'4b' : 'KEY_PAGEUP',
'4c' : 'KEY_DELETE',
'4d' : 'KEY_END',
'4e' : 'KEY_PAGEDOWN',
'4f' : 'KEY_RIGHT',
'50' : 'KEY_LEFT',
'51' : 'KEY_DOWN',
'52' : 'KEY_UP',
'53' : 'KEY_NUMLOCK',
'54' : 'KEY_KPSLASH',
'55' : 'KEY_KPASTERISK',
'56' : 'KEY_KPMINUS',
'57' : 'KEY_KPPLUS',
'58' : 'KEY_KPENTER',
'59' : 'KEY_KP1',
'5a' : 'KEY_KP2',
'5b' : 'KEY_KP3',
'5c' : 'KEY_KP4',
'5d' : 'KEY_KP5',
'5e' : 'KEY_KP6',
'5f' : 'KEY_KP7',
'60' : 'KEY_KP8',
'61' : 'KEY_KP9',
'62' : 'KEY_KP0',
'63' : 'KEY_KPDOT',
'64' : 'KEY_102ND',
'65' : 'KEY_COMPOSE',
'66' : 'KEY_POWER',
'67' : 'KEY_KPEQUAL',
'68' : 'KEY_F13',
'69' : 'KEY_F14',
'6a' : 'KEY_F15',
'6b' : 'KEY_F16',
'6c' : 'KEY_F17',
'6d' : 'KEY_F18',
'6e' : 'KEY_F19',
'6f' : 'KEY_F20',
'70' : 'KEY_F21',
'71' : 'KEY_F22',
'72' : 'KEY_F23',
'73' : 'KEY_F24',
'74' : 'KEY_OPEN',
'75' : 'KEY_HELP',
'76' : 'KEY_PROPS',
'77' : 'KEY_FRONT',
'78' : 'KEY_STOP',
'79' : 'KEY_AGAIN',
'7a' : 'KEY_UNDO',
'7b' : 'KEY_CUT',
'7c' : 'KEY_COPY',
'7d' : 'KEY_PASTE',
'7e' : 'KEY_FIND',
'7f' : 'KEY_MUTE',
'80' : 'KEY_VOLUMEUP',
'81' : 'KEY_VOLUMEDOWN',
'85' : 'KEY_KPCOMMA',
'87' : 'KEY_RO',
'88' : 'KEY_KATAKANAHIRAGANA',
'89' : 'KEY_YEN',
'8a' : 'KEY_HENKAN',
'8b' : 'KEY_MUHENKAN',
'8c' : 'KEY_KPJPCOMMA',
'90' : 'KEY_HANGEUL',
'91' : 'KEY_HANJA',
'92' : 'KEY_KATAKANA',
'93' : 'KEY_HIRAGANA',
'94' : 'KEY_ZENKAKUHANKAKU',
'b6' : 'KEY_KPLEFTPAREN',
'b7' : 'KEY_KPRIGHTPAREN',
'e0' : 'KEY_LEFTCTRL',
'e1' : 'KEY_LEFTSHIFT',
'e2' : 'KEY_LEFTALT',
'e3' : 'KEY_LEFTMETA',
'e4' : 'KEY_RIGHTCTRL',
'e5' : 'KEY_RIGHTSHIFT',
'e6' : 'KEY_RIGHTALT',
'e7' : 'KEY_RIGHTMETA',
'e8' : 'KEY_MEDIA_PLAYPAUSE',
'e9' : 'KEY_MEDIA_STOPCD',
'ea' : 'KEY_MEDIA_PREVIOUSSONG',
'eb' : 'KEY_MEDIA_NEXTSONG',
'ec' : 'KEY_MEDIA_EJECTCD',
'ed' : 'KEY_MEDIA_VOLUMEUP',
'ee' : 'KEY_MEDIA_VOLUMEDOWN',
'ef' : 'KEY_MEDIA_MUTE',
'f0' : 'KEY_MEDIA_WWW',
'f1' : 'KEY_MEDIA_BACK',
'f2' : 'KEY_MEDIA_FORWARD',
'f3' : 'KEY_MEDIA_STOP',
'f4' : 'KEY_MEDIA_FIND',
'f5' : 'KEY_MEDIA_SCROLLUP',
'f6' : 'KEY_MEDIA_SCROLLDOWN',
'f7' : 'KEY_MEDIA_EDIT',
'f8' : 'KEY_MEDIA_SLEEP',
'f9' : 'KEY_MEDIA_COFFEE',
'fa' : 'KEY_MEDIA_REFRESH',
'fb' : 'KEY_MEDIA_CALC'
}

View File

@@ -0,0 +1,139 @@
from bumble.colors import color
from hid_key_map import base_keys, mod_keys, shift_map
# ------------------------------------------------------------------------------
def get_key(modifier: str, key: str) -> str:
if modifier == '22':
modifier = '02'
if modifier in mod_keys:
modifier = mod_keys[modifier]
else:
return ''
if key in base_keys:
key = base_keys[key]
else:
return ''
if (modifier == 'left_shift' or modifier == 'right_shift') and key in shift_map:
key = shift_map[key]
return key
class Keyboard:
def __init__(self): # type: ignore
self.report = [
[ # Bit array for Modifier keys
0, # Right GUI - (usually the Windows key)
0, # Right ALT
0, # Right Shift
0, # Right Control
0, # Left GUI - (usually the Windows key)
0, # Left ALT
0, # Left Shift
0, # Left Control
],
0x00, # Vendor reserved
'', # Rest is space for 6 keys
'',
'',
'',
'',
'',
]
def decode_keyboard_report(self, input_report: bytes, report_length: int) -> None:
if report_length >= 8:
modifier = input_report[1]
self.report[0] = [int(x) for x in '{0:08b}'.format(modifier)]
self.report[0].reverse() # type: ignore
modifier_key = str((modifier & 0x22).to_bytes(1, "big").hex())
keycodes = []
for k in range(3, report_length):
keycodes.append(str(input_report[k].to_bytes(1, "big").hex()))
self.report[k - 1] = get_key(modifier_key, keycodes[k - 3])
else:
print(color('Warning: Not able to parse report', 'yellow'))
def print_keyboard_report(self) -> None:
print(color('\tKeyboard Input Received', 'green', None, 'bold'))
print(color(f'Keys:', 'white', None, 'bold'))
for i in range(1, 7):
print(color(f' Key{i}{" ":>8s}= ', 'cyan', None, 'bold'), self.report[i + 1])
print(color(f'\nModifier Keys:', 'white', None, 'bold'))
print(
color(f' Left Ctrl : ', 'cyan'), f'{self.report[0][0] == 1!s:<5}', # type: ignore
color(f' Left Shift : ', 'cyan'), f'{self.report[0][1] == 1!s:<5}', # type: ignore
color(f' Left ALT : ', 'cyan'), f'{self.report[0][2] == 1!s:<5}', # type: ignore
color(f' Left GUI : ', 'cyan'), f'{self.report[0][3] == 1!s:<5}\n', # type: ignore
color(f' Right Ctrl : ', 'cyan'), f'{self.report[0][4] == 1!s:<5}', # type: ignore
color(f' Right Shift : ', 'cyan'), f'{self.report[0][5] == 1!s:<5}', # type: ignore
color(f' Right ALT : ', 'cyan'), f'{self.report[0][6] == 1!s:<5}', # type: ignore
color(f' Right GUI : ', 'cyan'), f'{self.report[0][7] == 1!s:<5}', # type: ignore
)
# ------------------------------------------------------------------------------
class Mouse:
def __init__(self): # type: ignore
self.report = [
[ # Bit array for Buttons
0, # Button 1 (primary/trigger
0, # Button 2 (secondary)
0, # Button 3 (tertiary)
0, # Button 4
0, # Button 5
0, # unused padding bits
0, # unused padding bits
0, # unused padding bits
],
0, # X
0, # Y
0, # Wheel
0, # AC Pan
]
def decode_mouse_report(self, input_report: bytes, report_length: int) -> None:
self.report[0] = [int(x) for x in '{0:08b}'.format(input_report[1])]
self.report[0].reverse() # type: ignore
self.report[1] = input_report[2]
self.report[2] = input_report[3]
if report_length in [5, 6]:
self.report[3] = input_report[4]
self.report[4] = input_report[5] if report_length == 6 else 0
def print_mouse_report(self) -> None:
print(color('\tMouse Input Received', 'green', None, 'bold'))
print(
color(f' Button 1 (primary/trigger) = ', 'cyan'), self.report[0][0] == 1, # type: ignore
color(f'\n Button 2 (secondary) = ', 'cyan'), self.report[0][1] == 1, # type: ignore
color(f'\n Button 3 (tertiary) = ', 'cyan'), self.report[0][2] == 1, # type: ignore
color(f'\n Button4 = ', 'cyan'), self.report[0][3] == 1, # type: ignore
color(f'\n Button5 = ', 'cyan'), self.report[0][4] == 1, # type: ignore
color(f'\n X (X-axis displacement) = ', 'cyan'), self.report[1],
color(f'\n Y (Y-axis displacement) = ', 'cyan'), self.report[2],
color(f'\n Wheel = ', 'cyan'), self.report[3],
color(f'\n AC PAN = ', 'cyan'), self.report[4],
)
# ------------------------------------------------------------------------------
class ReportParser:
def parse_input_report(input_report: bytes) -> None: # type: ignore
report_id = input_report[0]
report_length = len(input_report)
# Keyboard input report (report id = 1)
if report_id == 1 and report_length >= 8:
keyboard = Keyboard() # type: ignore
keyboard.decode_keyboard_report(input_report, report_length)
keyboard.print_keyboard_report()
# Mouse input report (report id = 2)
elif report_id == 2 and report_length in [4, 5, 6]:
mouse = Mouse() # type: ignore
mouse.decode_mouse_report(input_report, report_length)
mouse.print_mouse_report()
else:
print(color(f'Warning: Parse Error Report ID {report_id}', 'yellow'))

View File

@@ -1 +0,0 @@
test

461
examples/run_hid_host.py Normal file
View File

@@ -0,0 +1,461 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
import bumble.core
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.core import (
BT_L2CAP_PROTOCOL_ID,
BT_HIDP_PROTOCOL_ID,
BT_HUMAN_INTERFACE_DEVICE_SERVICE,
BT_BR_EDR_TRANSPORT,
)
from bumble.hci import Address
from bumble.hid import HIDHost, HID_INPUT_REPORT, HID_OTHER_REPORT, HID_BOOT_PROTOCOL_MODE, HID_REPORT_PROTOCOL_MODE
from bumble.sdp import (
Client as SDP_Client,
DataElement,
ServiceAttribute,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_ALL_ATTRIBUTES_RANGE,
SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID,
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
)
from hid_report_parser import ReportParser
# -----------------------------------------------------------------------------
# SDP attributes for Bluetooth HID devices
SDP_HID_SERVICE_NAME_ATTRIBUTE_ID = 0x0100
SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID = 0x0101
SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID = 0x0102
SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID = 0x0200 # [DEPRECATED]
SDP_HID_PARSER_VERSION_ATTRIBUTE_ID = 0x0201
SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID = 0x0202
SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID = 0x0203
SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID = 0x0204
SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID = 0x0205
SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0x0206
SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID = 0x0207
SDP_HID_SDP_DISABLE_ATTRIBUTE_ID = 0x0208 # [DEPRECATED]
SDP_HID_BATTERY_POWER_ATTRIBUTE_ID = 0x0209
SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID = 0x020A
SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID = 0x020B # DEPRECATED]
SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID = 0x020C
SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID = 0x020D
SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID = 0x020E
SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID = 0x020F
SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210
# -----------------------------------------------------------------------------
async def get_hid_device_sdp_record(device, connection):
# Connect to the SDP Server
sdp_client = SDP_Client(device)
await sdp_client.connect(connection)
if sdp_client:
print(color('Connected ith SDP Server', 'blue'))
else:
print(color('Failed to connect with SDP Server', 'red'))
# List BT HID Device service in the root browse group
service_record_handles = await sdp_client.search_services(
[BT_HUMAN_INTERFACE_DEVICE_SERVICE]
)
if (len(service_record_handles) < 1):
print(color('BT HID Device service not found on peer device!!!!','red'))
await sdp_client.disconnect()
return
# For BT_HUMAN_INTERFACE_DEVICE_SERVICE service, get all its attributes
for service_record_handle in service_record_handles:
attributes = await sdp_client.get_attributes(
service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
)
print(
color(f'SERVICE {service_record_handle:04X} attributes:', 'yellow')
)
print(color(f'SDP attributes for HID device','magenta'))
for attribute in attributes:
if (attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID):
print(color(' Service Record Handle : ', 'cyan'), hex(attribute.value.value))
elif (attribute.id == SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID):
print(color(' Service Class : ', 'cyan'), attribute.value.value[0].value)
elif (attribute.id == SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID):
print(color(' SDP Browse Group List : ', 'cyan'), attribute.value.value[0].value)
elif (attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID):
print(color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[0].value)
print(color(' PSM for Bluetooth HID Control channel : ', 'cyan'), hex(attribute.value.value[0].value[1].value))
print(color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[1].value[0].value)
elif (attribute.id == SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID):
print(color(' Lanugage : ', 'cyan'), hex(attribute.value.value[0].value))
print(color(' Encoding : ', 'cyan'), hex(attribute.value.value[1].value))
print(color(' PrimaryLanguageBaseID : ', 'cyan'), hex(attribute.value.value[2].value))
elif (attribute.id == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID):
print(color(' BT_HUMAN_INTERFACE_DEVICE_SERVICE ', 'cyan'), attribute.value.value[0].value[0].value)
print(color(' HID Profileversion number : ', 'cyan'), hex(attribute.value.value[0].value[1].value))
elif (attribute.id == SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID):
print(color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[0].value[0].value)
print(color(' PSM for Bluetooth HID Interrupt channel : ', 'cyan'), hex(attribute.value.value[0].value[0].value[1].value))
print(color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'), attribute.value.value[0].value[1].value[0].value)
elif (attribute.id == SDP_HID_SERVICE_NAME_ATTRIBUTE_ID):
print(color(' Service Name: ', 'cyan'), attribute.value.value)
HID_Service_Name = attribute.value.value
elif (attribute.id == SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID):
print(color(' Service Description: ', 'cyan'), attribute.value.value)
HID_Service_Description = attribute.value.value
elif (attribute.id == SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID):
print(color(' Provider Name: ', 'cyan'), attribute.value.value)
HID_Provider_Name = attribute.value.value
elif (attribute.id == SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID):
print(color(' Release Number: ', 'cyan'), hex(attribute.value.value))
HID_Device_Release_Number = attribute.value.value
elif (attribute.id == SDP_HID_PARSER_VERSION_ATTRIBUTE_ID):
print(color(' HID Parser Version: ', 'cyan'), hex(attribute.value.value))
HID_Parser_Version = attribute.value.value
elif (attribute.id == SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID):
print(color(' HIDDeviceSubclass: ', 'cyan'), hex(attribute.value.value))
HID_Device_Subclass = attribute.value.value
elif (attribute.id == SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID):
print(color(' HIDCountryCode: ', 'cyan'), hex(attribute.value.value))
HID_Country_Code = attribute.value.value
elif (attribute.id == SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID):
print(color(' HIDVirtualCable: ', 'cyan'), attribute.value.value)
HID_Virtual_Cable = attribute.value.value
elif (attribute.id == SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID):
print(color(' HIDReconnectInitiate: ', 'cyan'), attribute.value.value)
HID_Reconnect_Initiate = attribute.value.value
elif (attribute.id == SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID):
print(color(' HID Report Descriptor type: ', 'cyan'), hex(attribute.value.value[0].value[0].value))
print(color(' HID Report DescriptorList: ', 'cyan'), attribute.value.value[0].value[1].value)
HID_Descriptor_Type = attribute.value.value[0].value[0].value
HID_Report_Descriptor_List = attribute.value.value[0].value[1].value
elif (attribute.id == SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID):
print(color(' HID LANGID Base Language: ', 'cyan'), hex(attribute.value.value[0].value[0].value))
print(color(' HID LANGID Base Bluetooth String Offset: ', 'cyan'), hex(attribute.value.value[0].value[1].value))
HID_LANGID_Base_Language = attribute.value.value[0].value[0].value
HID_LANGID_Base_Bluetooth_String_Offset = attribute.value.value[0].value[1].value
elif (attribute.id == SDP_HID_BATTERY_POWER_ATTRIBUTE_ID):
print(color(' HIDBatteryPower: ', 'cyan'), attribute.value.value)
HID_Battery_Power = attribute.value.value
elif (attribute.id == SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID):
print(color(' HIDRemoteWake: ', 'cyan'), attribute.value.value)
HID_Remote_Wake = attribute.value.value
elif (attribute.id == SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID):
print(color(' HIDProfileVersion : ', 'cyan'), hex(attribute.value.value))
HID_Profile_Version = attribute.value.value
elif (attribute.id == SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID):
print(color(' HIDSupervisionTimeout: ', 'cyan'), hex(attribute.value.value))
HID_Supervision_Timeout = attribute.value.value
elif (attribute.id == SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID):
print(color(' HIDNormallyConnectable: ', 'cyan'), attribute.value.value)
HID_Normally_Connectable = attribute.value.value
elif (attribute.id == SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID):
print(color(' HIDBootDevice: ', 'cyan'), attribute.value.value)
HID_Boot_Device = attribute.value.value
elif (attribute.id == SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID):
print(color(' HIDSSRHostMaxLatency: ', 'cyan'), hex(attribute.value.value))
HID_SSR_Host_Max_Latency = attribute.value.value
elif (attribute.id == SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID):
print(color(' HIDSSRHostMinTimeout: ', 'cyan'), hex(attribute.value.value))
HID_SSR_Host_Min_Timeout = attribute.value.value
else:
print(color(f' Warning: Attribute ID: {attribute.id} match not found.\n Attribute Info: {attribute}', 'yellow'))
await sdp_client.disconnect()
# -----------------------------------------------------------------------------
async def get_stream_reader(pipe) -> asyncio.StreamReader:
loop = asyncio.get_event_loop()
reader = asyncio.StreamReader(loop=loop)
protocol = asyncio.StreamReaderProtocol(reader)
await loop.connect_read_pipe(lambda: protocol, pipe)
return reader
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) < 4:
print(
'Usage: run_hid_host.py <device-config> <transport-spec> '
'<bluetooth-address> [test-mode]'
)
print('example: run_hid_host.py classic1.json usb:0 E1:CA:72:48:C4:E8/P')
return
def on_hid_data_cb(pdu):
report_type = pdu[0] & 0x0f
if (len(pdu) == 1):
print(color(f'Warning: No report received','yellow'))
return
report_length = len(pdu[1:])
report_id = pdu[1]
if (report_type != HID_OTHER_REPORT):
print(color(f' Report type = {report_type}, Report length = {report_length}, Report id = {report_id}', 'blue', None, 'bold'))
if ((report_length <= 1) or (report_id == 0)):
return
if report_type == HID_INPUT_REPORT:
ReportParser.parse_input_report(pdu[1:]) #type: ignore
async def handle_virtual_cable_unplug():
await hid_host.interrupt_channel_disconnect()
await hid_host.control_channel_disconnect()
await device.keystore.delete(target_address) #type: ignore
await connection.disconnect()
def on_hid_virtual_cable_unplug_cb():
asyncio.create_task(handle_virtual_cable_unplug())
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
print('<<< CONNECTED')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device.classic_enabled = True
await device.power_on()
# Connect to a peer
target_address = sys.argv[3]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
print(f'=== Connected to {connection.peer_address}!')
# Request authentication
print('*** Authenticating...')
await connection.authenticate()
print('*** Authenticated...')
# Enable encryption
print('*** Enabling encryption...')
await connection.encrypt()
print('*** Encryption on')
await get_hid_device_sdp_record(device, connection)
# Create HID host and start it
print('@@@ Starting HID Host...')
hid_host = HIDHost(device, connection)
# Register for HID data call back
hid_host.register_data_cb(on_hid_data_cb)
# Register for virtual cable unplug call back
hid_host.register_virtual_cable_unplug(on_hid_virtual_cable_unplug_cb)
async def menu():
reader = await get_stream_reader(sys.stdin)
while True:
print("\n************************ HID Host Menu *****************************\n")
print(" 1. Connect Control Channel")
print(" 2. Connect Interrupt Channel")
print(" 3. Disconnect Control Channel")
print(" 4. Disconnect Interrupt Channel")
print(" 5. Get Report")
print(" 6. Set Report")
print(" 7. Set Protocol Mode")
print(" 8. Get Protocol Mode")
print(" 9. Send Report")
print("10. Suspend")
print("11. Exit Suspend")
print("12. Virtual Cable Unplug")
print("13. Disconnect device")
print("14. Delete Bonding")
print("15. Re-connect to device")
print("\nEnter your choice : \n")
choice = await reader.readline()
choice = choice.decode('utf-8').strip()
if (choice == '1'):
await hid_host.control_channel_connect()
elif (choice == '2'):
await hid_host.interrupt_channel_connect()
elif (choice == '3'):
await hid_host.control_channel_disconnect()
elif (choice == '4'):
await hid_host.interrupt_channel_disconnect()
elif (choice == '5'):
print(" 1. Report ID 0x02")
print(" 2. Report ID 0x03")
print(" 3. Report ID 0x05")
choice1 = await reader.readline()
choice1 = choice1.decode('utf-8').strip()
if (choice1 == '1'):
hid_host.get_report(1, 2, 3)
elif (choice1 == '2'):
hid_host.get_report(2, 3, 2)
elif (choice1 == '3'):
hid_host.get_report(3, 5, 3)
else:
print('Incorrect option selected')
elif (choice == '6'):
print(" 1. Report type 1 and Report id 0x01")
print(" 2. Report type 2 and Report id 0x03")
print(" 3. Report type 3 and Report id 0x05")
choice1 = await reader.readline()
choice1 = choice1.decode('utf-8').strip()
if (choice1 == '1'):
# data includes first octet as report id
data = bytearray([0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01])
hid_host.set_report(1, data)
elif (choice1 == '2'):
data = bytearray([0x03, 0x01, 0x01])
hid_host.set_report(2, data)
elif (choice1 == '3'):
data = bytearray([0x05, 0x01, 0x01, 0x01])
hid_host.set_report(3, data)
else:
print('Incorrect option selected')
elif (choice == '7'):
print(" 0. Boot")
print(" 1. Report")
choice1 = await reader.readline()
choice1 = choice1.decode('utf-8').strip()
if (choice1 == '0'):
hid_host.set_protocol(HID_BOOT_PROTOCOL_MODE)
elif (choice1 == '1'):
hid_host.set_protocol(HID_REPORT_PROTOCOL_MODE)
else:
print('Incorrect option selected')
elif (choice == '8'):
hid_host.get_protocol()
elif (choice == '9'):
print(" 1. Report ID 0x01")
print(" 2. Report ID 0x03")
choice1 = await reader.readline()
choice1 = choice1.decode('utf-8').strip()
if (choice1 == '1'):
data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
hid_host.send_data(data)
elif (choice1 == '2'):
data = bytearray([0x03, 0x00, 0x0d, 0xfd, 0x00, 0x00])
hid_host.send_data(data)
else:
print('Incorrect option selected')
elif (choice == '10'):
hid_host.suspend()
elif (choice == '11'):
hid_host.exit_suspend()
elif (choice == '12'):
hid_host.virtual_cable_unplug()
await device.keystore.delete(target_address)
elif (choice == '13'):
peer_address = Address.from_string_for_transport(target_address, transport=BT_BR_EDR_TRANSPORT)
connection = device.find_connection_by_bd_addr(peer_address, transport=BT_BR_EDR_TRANSPORT)
if connection is not None:
await connection.disconnect()
else:
print("Already disconnected from device")
elif (choice == '14'):
try:
await device.keystore.delete(target_address)
print("Unpair successful")
except KeyError:
print('Device not found or Device already unpaired.')
elif (choice == '15'):
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
await connection.authenticate()
await connection.encrypt()
else:
print("Invalid option selected.")
if ((len(sys.argv) > 4) and (sys.argv[4] == 'test-mode')):
# Enabling menu for testing
await menu()
else:
# HID Connection
# Control channel
await hid_host.control_channel_connect()
# Interrupt Channel
await hid_host.interrupt_channel_connect()
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())