mirror of
https://github.com/google/bumble.git
synced 2026-04-17 00:35:31 +00:00
712 lines
26 KiB
Python
712 lines
26 KiB
Python
# Copyright 2021-2022 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Imports
|
|
# -----------------------------------------------------------------------------
|
|
import asyncio
|
|
import sys
|
|
import os
|
|
import logging
|
|
import json
|
|
import websockets
|
|
from bumble.colors import color
|
|
|
|
from bumble.device import Device
|
|
from bumble.transport import open_transport_or_link
|
|
from bumble.core import (
|
|
BT_BR_EDR_TRANSPORT,
|
|
BT_L2CAP_PROTOCOL_ID,
|
|
BT_HUMAN_INTERFACE_DEVICE_SERVICE,
|
|
BT_HIDP_PROTOCOL_ID,
|
|
UUID,
|
|
)
|
|
from bumble.hci import Address
|
|
from bumble.hid import (
|
|
Device as HID_Device,
|
|
HID_CONTROL_PSM,
|
|
HID_INTERRUPT_PSM,
|
|
Message,
|
|
)
|
|
from bumble.sdp import (
|
|
Client as SDP_Client,
|
|
DataElement,
|
|
ServiceAttribute,
|
|
SDP_PUBLIC_BROWSE_ROOT,
|
|
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
|
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
|
|
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
|
SDP_ALL_ATTRIBUTES_RANGE,
|
|
SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID,
|
|
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
|
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
|
|
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
|
|
)
|
|
from bumble.utils import AsyncRunner
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# SDP attribute IDs used in the service record of a Bluetooth HID device
SDP_HID_SERVICE_NAME_ATTRIBUTE_ID = 0x0100
SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID = 0x0101
SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID = 0x0102
SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID = 0x0200  # [DEPRECATED]
SDP_HID_PARSER_VERSION_ATTRIBUTE_ID = 0x0201
SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID = 0x0202
SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID = 0x0203
SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID = 0x0204
SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID = 0x0205
SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0x0206
SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID = 0x0207
SDP_HID_SDP_DISABLE_ATTRIBUTE_ID = 0x0208  # [DEPRECATED]
SDP_HID_BATTERY_POWER_ATTRIBUTE_ID = 0x0209
SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID = 0x020A
SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID = 0x020B  # [DEPRECATED]
SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID = 0x020C
SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID = 0x020D
SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID = 0x020E
SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID = 0x020F
SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210
|
|
|
|
# HID SDP attribute values advertised by this example (consumed by
# sdp_records())
LANGUAGE = 0x656E  # the two ASCII characters 'e', 'n'
ENCODING = 0x6A  # 106 -- presumably the IANA MIBenum for UTF-8; confirm
PRIMARY_LANGUAGE_BASE_ID = 0x100
VERSION_NUMBER = 0x0101
SERVICE_NAME = b'Bumble HID'
SERVICE_DESCRIPTION = b'Bumble'
PROVIDER_NAME = b'Bumble'
HID_PARSER_VERSION = 0x0111
HID_DEVICE_SUBCLASS = 0xC0
HID_COUNTRY_CODE = 0x21
HID_VIRTUAL_CABLE = True
HID_RECONNECT_INITIATE = True
REPORT_DESCRIPTOR_TYPE = 0x22
|
|
|
|
|
|
# HID report descriptor, one descriptor item (prefix byte + data byte) per
# line. It declares two application collections: a keyboard using report ID 1
# and a mouse using report ID 2.
HID_REPORT_MAP = bytes(
    # pylint: disable=line-too-long
    [
        # ---- Keyboard ----
        0x05, 0x01,  # Usage Page (Generic Desktop Ctrls)
        0x09, 0x06,  # Usage (Keyboard)
        0xA1, 0x01,  # Collection (Application)
        0x85, 0x01,  # . Report ID (1)
        0x05, 0x07,  # . Usage Page (Kbrd/Keypad)
        0x19, 0xE0,  # . Usage Minimum (0xE0)
        0x29, 0xE7,  # . Usage Maximum (0xE7)
        0x15, 0x00,  # . Logical Minimum (0)
        0x25, 0x01,  # . Logical Maximum (1)
        0x75, 0x01,  # . Report Size (1)
        0x95, 0x08,  # . Report Count (8)
        0x81, 0x02,  # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
        0x95, 0x01,  # . Report Count (1)
        0x75, 0x08,  # . Report Size (8)
        0x81, 0x03,  # . Input (Const,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
        0x95, 0x05,  # . Report Count (5)
        0x75, 0x01,  # . Report Size (1)
        0x05, 0x08,  # . Usage Page (LEDs)
        0x19, 0x01,  # . Usage Minimum (Num Lock)
        0x29, 0x05,  # . Usage Maximum (Kana)
        0x91, 0x02,  # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
        0x95, 0x01,  # . Report Count (1)
        0x75, 0x03,  # . Report Size (3)
        0x91, 0x03,  # . Output (Const,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
        0x95, 0x06,  # . Report Count (6)
        0x75, 0x08,  # . Report Size (8)
        0x15, 0x00,  # . Logical Minimum (0)
        0x25, 0x65,  # . Logical Maximum (101)
        0x05, 0x07,  # . Usage Page (Kbrd/Keypad)
        0x19, 0x00,  # . Usage Minimum (0x00)
        0x29, 0x65,  # . Usage Maximum (0x65)
        0x81, 0x00,  # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
        0xC0,  # End Collection
        # ---- Mouse ----
        0x05, 0x01,  # Usage Page (Generic Desktop Ctrls)
        0x09, 0x02,  # Usage (Mouse)
        0xA1, 0x01,  # Collection (Application)
        0x85, 0x02,  # . Report ID (2)
        0x09, 0x01,  # . Usage (Pointer)
        0xA1, 0x00,  # . Collection (Physical)
        0x05, 0x09,  # . Usage Page (Button)
        0x19, 0x01,  # . Usage Minimum (0x01)
        0x29, 0x03,  # . Usage Maximum (0x03)
        0x15, 0x00,  # . Logical Minimum (0)
        0x25, 0x01,  # . Logical Maximum (1)
        0x95, 0x03,  # . Report Count (3)
        0x75, 0x01,  # . Report Size (1)
        0x81, 0x02,  # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
        0x95, 0x01,  # . Report Count (1)
        0x75, 0x05,  # . Report Size (5)
        0x81, 0x03,  # . Input (Const,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
        0x05, 0x01,  # . Usage Page (Generic Desktop Ctrls)
        0x09, 0x30,  # . Usage (X)
        0x09, 0x31,  # . Usage (Y)
        0x15, 0x81,  # . Logical Minimum (-127)
        0x25, 0x7F,  # . Logical Maximum (127)
        0x75, 0x08,  # . Report Size (8)
        0x95, 0x02,  # . Report Count (2)
        0x81, 0x06,  # . Input (Data,Var,Rel,No Wrap,Linear,Preferred State,No Null Position)
        0xC0,  # . End Collection
        0xC0,  # End Collection
    ]
)
|
|
# Remaining HID SDP attribute values advertised by this example (consumed by
# sdp_records(); HID_BOOT_DEVICE also gates the GET/SET_PROTOCOL handlers)
HID_LANGID_BASE_LANGUAGE = 0x0409
HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET = 0x100
HID_BATTERY_POWER = True
HID_REMOTE_WAKE = True
HID_SUPERVISION_TIMEOUT = 0xC80
HID_NORMALLY_CONNECTABLE = True
HID_BOOT_DEVICE = True
HID_SSR_HOST_MAX_LATENCY = 0x640
HID_SSR_HOST_MIN_TIMEOUT = 0xC80
|
|
|
|
# Current HID protocol mode. It starts in report protocol mode and is
# reassigned by the SET_PROTOCOL callback registered in main().
protocol_mode = Message.ProtocolMode.REPORT_PROTOCOL
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def sdp_records():
    """Build the SDP service records advertising the HID device service.

    Returns:
        A dict with a single entry, mapping the service record handle
        (0x00010002) to the list of ``ServiceAttribute`` objects that make up
        the HID service record.
    """
    service_record_handle = 0x00010002

    # Short aliases for the DataElement constructors used throughout.
    sequence = DataElement.sequence
    uuid = DataElement.uuid
    uint16 = DataElement.unsigned_integer_16
    uint32 = DataElement.unsigned_integer_32
    boolean = DataElement.boolean

    def text(value):
        """Wrap ``value`` in a TEXT_STRING data element."""
        return DataElement(DataElement.TEXT_STRING, value)

    def hidp_over_l2cap(psm):
        """Return a fresh protocol descriptor: L2CAP on ``psm``, then HIDP."""
        return sequence(
            [
                sequence([uuid(BT_L2CAP_PROTOCOL_ID), uint16(psm)]),
                sequence([uuid(BT_HIDP_PROTOCOL_ID)]),
            ]
        )

    attributes = [
        ServiceAttribute(
            SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
            uint32(service_record_handle),
        ),
        ServiceAttribute(
            SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
            sequence([uuid(SDP_PUBLIC_BROWSE_ROOT)]),
        ),
        ServiceAttribute(
            SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
            sequence([uuid(BT_HUMAN_INTERFACE_DEVICE_SERVICE)]),
        ),
        # The control channel goes in the main protocol descriptor list ...
        ServiceAttribute(
            SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
            hidp_over_l2cap(HID_CONTROL_PSM),
        ),
        ServiceAttribute(
            SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID,
            sequence(
                [
                    uint16(LANGUAGE),
                    uint16(ENCODING),
                    uint16(PRIMARY_LANGUAGE_BASE_ID),
                ]
            ),
        ),
        ServiceAttribute(
            SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
            sequence(
                [
                    sequence(
                        [
                            uuid(BT_HUMAN_INTERFACE_DEVICE_SERVICE),
                            uint16(VERSION_NUMBER),
                        ]
                    ),
                ]
            ),
        ),
        # ... and the interrupt channel in the additional list, which wraps
        # the descriptor in one more sequence level.
        ServiceAttribute(
            SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
            sequence([hidp_over_l2cap(HID_INTERRUPT_PSM)]),
        ),
        ServiceAttribute(SDP_HID_SERVICE_NAME_ATTRIBUTE_ID, text(SERVICE_NAME)),
        ServiceAttribute(
            SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID, text(SERVICE_DESCRIPTION)
        ),
        ServiceAttribute(SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID, text(PROVIDER_NAME)),
        # NOTE(review): the next three values are encoded as 32-bit integers.
        # The HID profile specification may call for narrower types
        # (uint16 / uint8); the width is left unchanged here -- confirm.
        ServiceAttribute(
            SDP_HID_PARSER_VERSION_ATTRIBUTE_ID, uint32(HID_PARSER_VERSION)
        ),
        ServiceAttribute(
            SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID, uint32(HID_DEVICE_SUBCLASS)
        ),
        ServiceAttribute(SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID, uint32(HID_COUNTRY_CODE)),
        ServiceAttribute(
            SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID, boolean(HID_VIRTUAL_CABLE)
        ),
        ServiceAttribute(
            SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID, boolean(HID_RECONNECT_INITIATE)
        ),
        # The report descriptor is carried as (descriptor type, raw bytes).
        ServiceAttribute(
            SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID,
            sequence(
                [
                    sequence(
                        [
                            uint16(REPORT_DESCRIPTOR_TYPE),
                            text(HID_REPORT_MAP),
                        ]
                    ),
                ]
            ),
        ),
        ServiceAttribute(
            SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID,
            sequence(
                [
                    sequence(
                        [
                            uint16(HID_LANGID_BASE_LANGUAGE),
                            uint16(HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET),
                        ]
                    ),
                ]
            ),
        ),
        ServiceAttribute(
            SDP_HID_BATTERY_POWER_ATTRIBUTE_ID, boolean(HID_BATTERY_POWER)
        ),
        ServiceAttribute(SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID, boolean(HID_REMOTE_WAKE)),
        ServiceAttribute(
            SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID, uint16(HID_SUPERVISION_TIMEOUT)
        ),
        ServiceAttribute(
            SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID,
            boolean(HID_NORMALLY_CONNECTABLE),
        ),
        ServiceAttribute(SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID, boolean(HID_BOOT_DEVICE)),
        ServiceAttribute(
            SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID,
            uint16(HID_SSR_HOST_MAX_LATENCY),
        ),
        ServiceAttribute(
            SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID,
            uint16(HID_SSR_HOST_MIN_TIMEOUT),
        ),
    ]

    return {service_record_handle: attributes}
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def get_stream_reader(pipe) -> asyncio.StreamReader:
    """Attach an asyncio ``StreamReader`` to a readable pipe.

    Args:
        pipe: A file-like object to read from (``main()`` passes
            ``sys.stdin``).

    Returns:
        A ``StreamReader`` fed with whatever becomes readable on ``pipe``.
    """
    # This is a coroutine, so a loop is necessarily running; asking for the
    # running loop avoids the policy-dependent behavior of get_event_loop().
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader(loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, pipe)
    return reader
|
|
|
|
|
|
# Most recent keyboard (9 bytes) and mouse (4 bytes) input reports. They start
# zero-filled, are replaced whenever a report is sent from keyboard_device(),
# and are handed back by the GET_REPORT callback in main().
keyboardData = bytearray(9)
mouseData = bytearray(4)
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def _keyboard_report(key_code):
    """Return a 9-byte keyboard input report (report ID 1) for one key code."""
    return bytearray([0x01, 0x00, 0x00, key_code, 0x00, 0x00, 0x00, 0x00, 0x00])


def _mouse_report(x, y):
    """Return a 4-byte mouse input report (report ID 2) with clamped motion.

    ``x`` and ``y`` are converted to ``int`` and clamped to [-127, 127], the
    logical range declared for X/Y in HID_REPORT_MAP.
    """
    # JSON numbers may arrive as floats, which have no to_bytes().
    x = max(-127, min(127, int(x)))
    y = max(-127, min(127, int(y)))
    # Length and byte order are spelled out: they only became optional in
    # Python 3.11.
    return (
        bytearray([0x02, 0x00])
        + x.to_bytes(1, 'big', signed=True)
        + y.to_bytes(1, 'big', signed=True)
    )


async def keyboard_device(hid_device, command):
    """Send keyboard and mouse input reports through ``hid_device``.

    Args:
        hid_device: Object whose ``send_data()`` method transmits a report.
        command: ``'web'`` starts a WebSocket server on localhost:8989 and
            turns the JSON events it receives (``keydown``, ``keyup``,
            ``mousemove``) into reports. Any other value types the letters
            of ``hello`` in a loop, one key press every 3 seconds.

    This coroutine does not return: in web mode it waits forever once the
    server is started, otherwise it loops forever.
    """
    # Keep the module-level report up to date in both modes, so that the
    # GET_REPORT callback always sees the last report that was sent.
    global keyboardData

    if command == 'web':
        # Start a Websocket server to receive events from a web page
        async def serve(websocket, _path=None):
            # `_path` is optional so that the handler works both with
            # websockets versions that pass a path and with those that only
            # pass the connection.
            global keyboardData
            global mouseData
            while True:
                try:
                    message = await websocket.recv()
                except websockets.exceptions.ConnectionClosedOK:
                    # The page closed the socket cleanly. Stop here: calling
                    # recv() again would only raise the same exception, in a
                    # tight loop, forever.
                    return

                print('Received: ', str(message))
                parsed = json.loads(message)
                message_type = parsed['type']

                if message_type == 'keydown':
                    # Only deal with keys a to z for now
                    key = parsed['key']
                    if len(key) == 1 and 'a' <= key <= 'z':
                        # 'a'..'z' map to consecutive key codes from 0x04
                        keyboardData = _keyboard_report(0x04 + ord(key) - ord('a'))
                        hid_device.send_data(keyboardData)
                elif message_type == 'keyup':
                    keyboardData = _keyboard_report(0x00)
                    hid_device.send_data(keyboardData)
                elif message_type == "mousemove":
                    mouseData = _mouse_report(parsed['x'], parsed['y'])
                    hid_device.send_data(mouseData)

        # pylint: disable-next=no-member
        await websockets.serve(serve, 'localhost', 8989)
        # Wait on a future that is never resolved, to keep the server alive.
        await asyncio.get_running_loop().create_future()
    else:
        message = bytes('hello', 'ascii')
        while True:
            for letter in message:
                await asyncio.sleep(3.0)

                # Keypress for the letter
                keyboardData = _keyboard_report(0x04 + letter - 0x61)
                hid_device.send_data(keyboardData)

                # Key release
                keyboardData = _keyboard_report(0x00)
                hid_device.send_data(keyboardData)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def main():
    """Run the example Bluetooth HID device.

    Command line arguments (``sys.argv``):
        1. device configuration file
        2. transport specification
        3. optional command: ``test-mode`` runs an interactive menu on stdin;
           ``web`` (the default, also used for unknown commands) runs the
           web-page driven keyboard/mouse from ``keyboard_device()``.

    With fewer than two arguments, the usage text is printed and the
    function returns without doing anything else.
    """
    if len(sys.argv) < 3:
        print(
            'Usage: python run_hid_device.py <device-config> <transport-spec> <command>\n'
            ' where <command> is one of:\n'
            ' test-mode (run with menu enabled for testing)\n'
            ' web (run a keyboard with keypress input from a web page, '
            'see keyboard.html)'
        )
        print('example: python run_hid_device.py classic3.json usb:0 web')
        print('example: python run_hid_device.py classic3.json usb:0 test-mode')
        return

    # Strong references to fire-and-forget tasks: the event loop itself only
    # keeps weak references, so an unreferenced task may be collected before
    # it has finished.
    background_tasks = set()

    async def delete_bonding(hid_host_bd_addr):
        """Delete the keys stored for the host, tolerating their absence."""
        try:
            await device.keystore.delete(hid_host_bd_addr)  # type: ignore
        except KeyError:
            print('Device not found or Device already unpaired.')

    async def handle_virtual_cable_unplug():
        """Tear down the channels, the bonding and the link with the host."""
        # Read the address first, before any channel is torn down.
        hid_host_bd_addr = str(hid_device.remote_device_bd_address)
        await hid_device.disconnect_interrupt_channel()
        await hid_device.disconnect_control_channel()
        # A missing key must not prevent the link from being disconnected.
        await delete_bonding(hid_host_bd_addr)
        connection = hid_device.connection
        if connection is not None:
            await connection.disconnect()

    def on_hid_data_cb(pdu):
        print(f'Received Data, PDU: {pdu.hex()}')

    def on_get_protocol_cb():
        if HID_BOOT_DEVICE:
            # Explicit length/byte order: the argument-less form of
            # to_bytes() requires Python 3.11 or later.
            data = protocol_mode.to_bytes(1, 'big')
            hid_device.send_control_data(Message.ReportType.OTHER_REPORT, data)
        else:
            hid_device.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)

    def on_get_report_cb(report_id, report_type, buffer_size):
        """Answer GET_REPORT with the last report sent for ``report_id``."""
        ret = hid_device.GetReportStatus()
        print(
            f'GET_REPORT report_id: {report_id} report_type: {report_type} '
            f'buffer_size: {buffer_size}'
        )

        if report_type == Message.ReportType.INPUT_REPORT:
            if report_id == 1:
                ret.data = keyboardData
                ret.status = hid_device.ReportStatus.SUCCESS
            elif report_id == 2:
                ret.data = mouseData
                ret.status = hid_device.ReportStatus.SUCCESS
            else:
                ret.status = hid_device.ReportStatus.REPORT_ID_NOT_FOUND

            if buffer_size:
                # Keep at most buffer_size - 1 bytes of report data
                # (presumably one byte is reserved for a header -- confirm).
                ret.data = ret.data[: buffer_size - 1]
        elif report_type == Message.ReportType.OUTPUT_REPORT:
            # This sample app has nothing to do with the report received, to
            # enable PTS testing, we will return single byte random data.
            ret.data = bytearray([0x11])
            ret.status = hid_device.ReportStatus.SUCCESS
        elif report_type == Message.ReportType.FEATURE_REPORT:
            # TBD - not required for PTS testing
            ret.status = hid_device.ReportStatus.ERR_UNSUPPORTED_REQUEST
        else:
            ret.status = hid_device.ReportStatus.FAILURE

        return ret

    def on_set_report_cb(report_id, report_type, data):
        """Accept any SET_REPORT: log it and report success."""
        ret = hid_device.GetReportStatus()
        print(
            f'SET_REPORT report_id: {report_id} report_type: {report_type} '
            f'data: {data}'
        )
        ret.status = hid_device.ReportStatus.SUCCESS
        return ret

    def on_set_protocol_cb(param):
        if HID_BOOT_DEVICE:
            global protocol_mode
            protocol_mode = Message.ProtocolMode(param)
            hid_device.send_handshake_message(Message.Handshake.SUCCESSFUL)
        else:
            hid_device.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)

    def on_virtual_cable_unplug_cb():
        print('Received Virtual Cable Unplug')
        task = asyncio.create_task(handle_virtual_cable_unplug())
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    print('<<< connecting to HCI...')
    async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
        print('<<< connected')

        # Create a device
        device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
        device.classic_enabled = True

        # Create and register HID device
        hid_device = HID_Device(device)

        # Register for call backs
        hid_device.on('interrupt_data', on_hid_data_cb)
        hid_device.on('get_protocol', on_get_protocol_cb)
        hid_device.on('set_protocol', on_set_protocol_cb)

        hid_device.register_get_report_cb(on_get_report_cb)
        hid_device.register_set_report_cb(on_set_report_cb)

        # Register for virtual cable unplug call back
        hid_device.on('virtual_cable_unplug', on_virtual_cable_unplug_cb)

        # Setup the SDP to advertise HID Device service
        device.sdp_service_records = sdp_records()

        # Start the controller
        await device.power_on()

        # Start being discoverable and connectable
        await device.set_discoverable(True)
        await device.set_connectable(True)

        async def menu():
            """Interactive test menu, driven by lines read from stdin."""
            reader = await get_stream_reader(sys.stdin)
            while True:
                print("\n************************ HID Device Menu *****************************\n")
                print(" 1. Connect Control Channel")
                print(" 2. Connect Interrupt Channel")
                print(" 3. Disconnect Control Channel")
                print(" 4. Disconnect Interrupt Channel")
                print(" 5. Send Report")
                print(" 6. Virtual Cable Unplug")
                print(" 7. Disconnect device")
                print(" 8. Delete Bonding")
                print(" 9. Re-connect to device")
                print("\nEnter your choice : \n")

                line = await reader.readline()
                if not line:
                    # readline() returns an empty result at end of input;
                    # without this check the menu would redraw endlessly.
                    break
                choice = line.decode('utf-8').strip()

                if choice == '1':
                    await hid_device.connect_control_channel()

                elif choice == '2':
                    await hid_device.connect_interrupt_channel()

                elif choice == '3':
                    await hid_device.disconnect_control_channel()

                elif choice == '4':
                    await hid_device.disconnect_interrupt_channel()

                elif choice == '5':
                    print(" 1. Report ID 0x01")
                    print(" 2. Report ID 0x02")

                    choice1 = (await reader.readline()).decode('utf-8').strip()

                    if choice1 == '1':
                        # Key press (key code 0x04) followed by key release
                        data = bytearray([0x01, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00])
                        hid_device.send_data(data)
                        data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
                        hid_device.send_data(data)

                    elif choice1 == '2':
                        # Mouse report with a non-zero last byte, then all zeros
                        data = bytearray([0x02, 0x00, 0x00, 0xF6])
                        hid_device.send_data(data)
                        data = bytearray([0x02, 0x00, 0x00, 0x00])
                        hid_device.send_data(data)

                    else:
                        print('Incorrect option selected')

                elif choice == '6':
                    hid_device.virtual_cable_unplug()
                    await delete_bonding(str(hid_device.remote_device_bd_address))

                elif choice == '7':
                    connection = hid_device.connection
                    if connection is not None:
                        await connection.disconnect()
                    else:
                        print("Already disconnected from device")

                elif choice == '8':
                    await delete_bonding(str(hid_device.remote_device_bd_address))

                elif choice == '9':
                    hid_host_bd_addr = str(hid_device.remote_device_bd_address)
                    connection = await device.connect(
                        hid_host_bd_addr, transport=BT_BR_EDR_TRANSPORT
                    )
                    await connection.authenticate()
                    await connection.encrypt()

                else:
                    print("Invalid option selected.")

        command = sys.argv[3] if len(sys.argv) > 3 else 'web'

        if command == 'test-mode':
            # Enabling menu for testing
            await menu()
        else:
            if command != 'web':
                print("Command incorrect. Switching to default: web")
            # Run as a keyboard device
            await keyboard_device(hid_device, 'web')

        await hci_source.wait_for_termination()
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Script entry: the log level is taken from the BUMBLE_LOGLEVEL environment
# variable (DEBUG when it is not set), then the example runs to completion.
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
|