mirror of
https://github.com/google/bumble.git
synced 2026-04-16 00:25:31 +00:00
Merge pull request #345 from rdhavan/bumble_hid_device
Bumble hid device implementation - Application and hid profile
This commit is contained in:
407
bumble/hid.py
407
bumble/hid.py
@@ -19,16 +19,17 @@ from __future__ import annotations
|
||||
from dataclasses import dataclass
|
||||
import logging
|
||||
import enum
|
||||
import struct
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from pyee import EventEmitter
|
||||
from typing import Optional, TYPE_CHECKING
|
||||
from typing import Optional, Callable, TYPE_CHECKING
|
||||
from typing_extensions import override
|
||||
|
||||
from bumble import l2cap
|
||||
from bumble import l2cap, device
|
||||
from bumble.colors import color
|
||||
from bumble.core import InvalidStateError, ProtocolError
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bumble.device import Device, Connection
|
||||
from .hci import Address
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -60,6 +61,7 @@ class Message:
|
||||
NOT_READY = 0x01
|
||||
ERR_INVALID_REPORT_ID = 0x02
|
||||
ERR_UNSUPPORTED_REQUEST = 0x03
|
||||
ERR_INVALID_PARAMETER = 0x04
|
||||
ERR_UNKNOWN = 0x0E
|
||||
ERR_FATAL = 0x0F
|
||||
|
||||
@@ -101,13 +103,14 @@ class GetReportMessage(Message):
|
||||
def __bytes__(self) -> bytes:
|
||||
packet_bytes = bytearray()
|
||||
packet_bytes.append(self.report_id)
|
||||
packet_bytes.extend(
|
||||
[(self.buffer_size & 0xFF), ((self.buffer_size >> 8) & 0xFF)]
|
||||
)
|
||||
if self.report_type == Message.ReportType.OTHER_REPORT:
|
||||
if self.buffer_size == 0:
|
||||
return self.header(self.report_type) + packet_bytes
|
||||
else:
|
||||
return self.header(0x08 | self.report_type) + packet_bytes
|
||||
return (
|
||||
self.header(0x08 | self.report_type)
|
||||
+ packet_bytes
|
||||
+ struct.pack("<H", self.buffer_size)
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -120,6 +123,16 @@ class SetReportMessage(Message):
|
||||
return self.header(self.report_type) + self.data
|
||||
|
||||
|
||||
@dataclass
|
||||
class SendControlData(Message):
|
||||
report_type: int
|
||||
data: bytes
|
||||
message_type = Message.MessageType.DATA
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.header(self.report_type) + self.data
|
||||
|
||||
|
||||
@dataclass
|
||||
class GetProtocolMessage(Message):
|
||||
message_type = Message.MessageType.GET_PROTOCOL
|
||||
@@ -161,31 +174,47 @@ class VirtualCableUnplug(Message):
|
||||
return self.header(Message.ControlCommand.VIRTUAL_CABLE_UNPLUG)
|
||||
|
||||
|
||||
# Device sends input report, host sends output report.
|
||||
@dataclass
|
||||
class SendData(Message):
|
||||
data: bytes
|
||||
report_type: int
|
||||
message_type = Message.MessageType.DATA
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.header(Message.ReportType.OUTPUT_REPORT) + self.data
|
||||
return self.header(self.report_type) + self.data
|
||||
|
||||
|
||||
@dataclass
|
||||
class SendHandshakeMessage(Message):
|
||||
result_code: int
|
||||
message_type = Message.MessageType.HANDSHAKE
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.header(self.result_code)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class Host(EventEmitter):
|
||||
l2cap_ctrl_channel: Optional[l2cap.ClassicChannel]
|
||||
l2cap_intr_channel: Optional[l2cap.ClassicChannel]
|
||||
class HID(ABC, EventEmitter):
|
||||
l2cap_ctrl_channel: Optional[l2cap.ClassicChannel] = None
|
||||
l2cap_intr_channel: Optional[l2cap.ClassicChannel] = None
|
||||
connection: Optional[device.Connection] = None
|
||||
|
||||
def __init__(self, device: Device, connection: Connection) -> None:
|
||||
class Role(enum.IntEnum):
|
||||
HOST = 0x00
|
||||
DEVICE = 0x01
|
||||
|
||||
def __init__(self, device: device.Device, role: Role) -> None:
|
||||
super().__init__()
|
||||
self.remote_device_bd_address: Optional[Address] = None
|
||||
self.device = device
|
||||
self.connection = connection
|
||||
|
||||
self.l2cap_ctrl_channel = None
|
||||
self.l2cap_intr_channel = None
|
||||
self.role = role
|
||||
|
||||
# Register ourselves with the L2CAP channel manager
|
||||
device.register_l2cap_server(HID_CONTROL_PSM, self.on_connection)
|
||||
device.register_l2cap_server(HID_INTERRUPT_PSM, self.on_connection)
|
||||
device.register_l2cap_server(HID_CONTROL_PSM, self.on_l2cap_connection)
|
||||
device.register_l2cap_server(HID_INTERRUPT_PSM, self.on_l2cap_connection)
|
||||
|
||||
device.on('connection', self.on_device_connection)
|
||||
|
||||
async def connect_control_channel(self) -> None:
|
||||
# Create a new L2CAP connection - control channel
|
||||
@@ -229,9 +258,18 @@ class Host(EventEmitter):
|
||||
self.l2cap_ctrl_channel = None
|
||||
await channel.disconnect()
|
||||
|
||||
def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None:
|
||||
def on_device_connection(self, connection: device.Connection) -> None:
|
||||
self.connection = connection
|
||||
self.remote_device_bd_address = connection.peer_address
|
||||
connection.on('disconnection', self.on_device_disconnection)
|
||||
|
||||
def on_device_disconnection(self, reason: int) -> None:
|
||||
self.connection = None
|
||||
|
||||
def on_l2cap_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None:
|
||||
logger.debug(f'+++ New L2CAP connection: {l2cap_channel}')
|
||||
l2cap_channel.on('open', lambda: self.on_l2cap_channel_open(l2cap_channel))
|
||||
l2cap_channel.on('close', lambda: self.on_l2cap_channel_close(l2cap_channel))
|
||||
|
||||
def on_l2cap_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None:
|
||||
if l2cap_channel.psm == HID_CONTROL_PSM:
|
||||
@@ -242,63 +280,20 @@ class Host(EventEmitter):
|
||||
self.l2cap_intr_channel.sink = self.on_intr_pdu
|
||||
logger.debug(f'$$$ L2CAP channel open: {l2cap_channel}')
|
||||
|
||||
def on_ctrl_pdu(self, pdu: bytes) -> None:
|
||||
logger.debug(f'<<< HID CONTROL PDU: {pdu.hex()}')
|
||||
# Here we will receive all kinds of packets, parse and then call respective callbacks
|
||||
message_type = pdu[0] >> 4
|
||||
param = pdu[0] & 0x0F
|
||||
|
||||
if message_type == Message.MessageType.HANDSHAKE:
|
||||
logger.debug(f'<<< HID HANDSHAKE: {Message.Handshake(param).name}')
|
||||
self.emit('handshake', Message.Handshake(param))
|
||||
elif message_type == Message.MessageType.DATA:
|
||||
logger.debug('<<< HID CONTROL DATA')
|
||||
self.emit('data', pdu)
|
||||
elif message_type == Message.MessageType.CONTROL:
|
||||
if param == Message.ControlCommand.SUSPEND:
|
||||
logger.debug('<<< HID SUSPEND')
|
||||
self.emit('suspend', pdu)
|
||||
elif param == Message.ControlCommand.EXIT_SUSPEND:
|
||||
logger.debug('<<< HID EXIT SUSPEND')
|
||||
self.emit('exit_suspend', pdu)
|
||||
elif param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG:
|
||||
logger.debug('<<< HID VIRTUAL CABLE UNPLUG')
|
||||
self.emit('virtual_cable_unplug')
|
||||
else:
|
||||
logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED')
|
||||
def on_l2cap_channel_close(self, l2cap_channel: l2cap.ClassicChannel) -> None:
|
||||
if l2cap_channel.psm == HID_CONTROL_PSM:
|
||||
self.l2cap_ctrl_channel = None
|
||||
else:
|
||||
logger.debug('<<< HID CONTROL DATA')
|
||||
self.emit('data', pdu)
|
||||
self.l2cap_intr_channel = None
|
||||
logger.debug(f'$$$ L2CAP channel close: {l2cap_channel}')
|
||||
|
||||
@abstractmethod
|
||||
def on_ctrl_pdu(self, pdu: bytes) -> None:
|
||||
pass
|
||||
|
||||
def on_intr_pdu(self, pdu: bytes) -> None:
|
||||
logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}')
|
||||
self.emit("data", pdu)
|
||||
|
||||
def get_report(self, report_type: int, report_id: int, buffer_size: int) -> None:
|
||||
msg = GetReportMessage(
|
||||
report_type=report_type, report_id=report_id, buffer_size=buffer_size
|
||||
)
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID CONTROL GET REPORT, PDU: {hid_message.hex()}')
|
||||
self.send_pdu_on_ctrl(hid_message)
|
||||
|
||||
def set_report(self, report_type: int, data: bytes):
|
||||
msg = SetReportMessage(report_type=report_type, data=data)
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID CONTROL SET REPORT, PDU:{hid_message.hex()}')
|
||||
self.send_pdu_on_ctrl(hid_message)
|
||||
|
||||
def get_protocol(self):
|
||||
msg = GetProtocolMessage()
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID CONTROL GET PROTOCOL, PDU: {hid_message.hex()}')
|
||||
self.send_pdu_on_ctrl(hid_message)
|
||||
|
||||
def set_protocol(self, protocol_mode: int):
|
||||
msg = SetProtocolMessage(protocol_mode=protocol_mode)
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID CONTROL SET PROTOCOL, PDU: {hid_message.hex()}')
|
||||
self.send_pdu_on_ctrl(hid_message)
|
||||
self.emit("interrupt_data", pdu)
|
||||
|
||||
def send_pdu_on_ctrl(self, msg: bytes) -> None:
|
||||
assert self.l2cap_ctrl_channel
|
||||
@@ -308,26 +303,252 @@ class Host(EventEmitter):
|
||||
assert self.l2cap_intr_channel
|
||||
self.l2cap_intr_channel.send_pdu(msg)
|
||||
|
||||
def send_data(self, data):
|
||||
msg = SendData(data)
|
||||
def send_data(self, data: bytes) -> None:
|
||||
if self.role == HID.Role.HOST:
|
||||
report_type = Message.ReportType.OUTPUT_REPORT
|
||||
else:
|
||||
report_type = Message.ReportType.INPUT_REPORT
|
||||
msg = SendData(data, report_type)
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID INTERRUPT SEND DATA, PDU: {hid_message.hex()}')
|
||||
self.send_pdu_on_intr(hid_message)
|
||||
if self.l2cap_intr_channel is not None:
|
||||
logger.debug(f'>>> HID INTERRUPT SEND DATA, PDU: {hid_message.hex()}')
|
||||
self.send_pdu_on_intr(hid_message)
|
||||
|
||||
def suspend(self):
|
||||
msg = Suspend()
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID CONTROL SUSPEND, PDU:{hid_message.hex()}')
|
||||
self.send_pdu_on_ctrl(msg)
|
||||
|
||||
def exit_suspend(self):
|
||||
msg = ExitSuspend()
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID CONTROL EXIT SUSPEND, PDU:{hid_message.hex()}')
|
||||
self.send_pdu_on_ctrl(msg)
|
||||
|
||||
def virtual_cable_unplug(self):
|
||||
def virtual_cable_unplug(self) -> None:
|
||||
msg = VirtualCableUnplug()
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID CONTROL VIRTUAL CABLE UNPLUG, PDU: {hid_message.hex()}')
|
||||
self.send_pdu_on_ctrl(msg)
|
||||
self.send_pdu_on_ctrl(hid_message)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
class Device(HID):
|
||||
class GetSetReturn(enum.IntEnum):
|
||||
FAILURE = 0x00
|
||||
REPORT_ID_NOT_FOUND = 0x01
|
||||
ERR_UNSUPPORTED_REQUEST = 0x02
|
||||
ERR_UNKNOWN = 0x03
|
||||
ERR_INVALID_PARAMETER = 0x04
|
||||
SUCCESS = 0xFF
|
||||
|
||||
class GetSetStatus:
|
||||
def __init__(self) -> None:
|
||||
self.data = bytearray()
|
||||
self.status = 0
|
||||
|
||||
def __init__(self, device: device.Device) -> None:
|
||||
super().__init__(device, HID.Role.DEVICE)
|
||||
get_report_cb: Optional[Callable[[int, int, int], None]] = None
|
||||
set_report_cb: Optional[Callable[[int, int, int, bytes], None]] = None
|
||||
get_protocol_cb: Optional[Callable[[], None]] = None
|
||||
set_protocol_cb: Optional[Callable[[int], None]] = None
|
||||
|
||||
@override
|
||||
def on_ctrl_pdu(self, pdu: bytes) -> None:
|
||||
logger.debug(f'<<< HID CONTROL PDU: {pdu.hex()}')
|
||||
param = pdu[0] & 0x0F
|
||||
message_type = pdu[0] >> 4
|
||||
|
||||
if message_type == Message.MessageType.GET_REPORT:
|
||||
logger.debug('<<< HID GET REPORT')
|
||||
self.handle_get_report(pdu)
|
||||
elif message_type == Message.MessageType.SET_REPORT:
|
||||
logger.debug('<<< HID SET REPORT')
|
||||
self.handle_set_report(pdu)
|
||||
elif message_type == Message.MessageType.GET_PROTOCOL:
|
||||
logger.debug('<<< HID GET PROTOCOL')
|
||||
self.handle_get_protocol(pdu)
|
||||
elif message_type == Message.MessageType.SET_PROTOCOL:
|
||||
logger.debug('<<< HID SET PROTOCOL')
|
||||
self.handle_set_protocol(pdu)
|
||||
elif message_type == Message.MessageType.DATA:
|
||||
logger.debug('<<< HID CONTROL DATA')
|
||||
self.emit('control_data', pdu)
|
||||
elif message_type == Message.MessageType.CONTROL:
|
||||
if param == Message.ControlCommand.SUSPEND:
|
||||
logger.debug('<<< HID SUSPEND')
|
||||
self.emit('suspend')
|
||||
elif param == Message.ControlCommand.EXIT_SUSPEND:
|
||||
logger.debug('<<< HID EXIT SUSPEND')
|
||||
self.emit('exit_suspend')
|
||||
elif param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG:
|
||||
logger.debug('<<< HID VIRTUAL CABLE UNPLUG')
|
||||
self.emit('virtual_cable_unplug')
|
||||
else:
|
||||
logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED')
|
||||
else:
|
||||
logger.debug('<<< HID MESSAGE TYPE UNSUPPORTED')
|
||||
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
|
||||
|
||||
def send_handshake_message(self, result_code: int) -> None:
|
||||
msg = SendHandshakeMessage(result_code)
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID HANDSHAKE MESSAGE, PDU: {hid_message.hex()}')
|
||||
self.send_pdu_on_ctrl(hid_message)
|
||||
|
||||
def send_control_data(self, report_type: int, data: bytes):
|
||||
msg = SendControlData(report_type=report_type, data=data)
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID CONTROL DATA: {hid_message.hex()}')
|
||||
self.send_pdu_on_ctrl(hid_message)
|
||||
|
||||
def handle_get_report(self, pdu: bytes):
|
||||
if self.get_report_cb is None:
|
||||
logger.debug("GetReport callback not registered !!")
|
||||
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
|
||||
return
|
||||
report_type = pdu[0] & 0x03
|
||||
buffer_flag = (pdu[0] & 0x08) >> 3
|
||||
report_id = pdu[1]
|
||||
logger.debug("buffer_flag: " + str(buffer_flag))
|
||||
if buffer_flag == 1:
|
||||
buffer_size = (pdu[3] << 8) | pdu[2]
|
||||
else:
|
||||
buffer_size = 0
|
||||
|
||||
ret = self.get_report_cb(report_id, report_type, buffer_size)
|
||||
assert ret is not None
|
||||
if ret.status == self.GetSetReturn.FAILURE:
|
||||
self.send_handshake_message(Message.Handshake.ERR_UNKNOWN)
|
||||
elif ret.status == self.GetSetReturn.SUCCESS:
|
||||
data = bytearray()
|
||||
data.append(report_id)
|
||||
data.extend(ret.data)
|
||||
if len(data) < self.l2cap_ctrl_channel.mtu: # type: ignore[union-attr]
|
||||
self.send_control_data(report_type=report_type, data=data)
|
||||
else:
|
||||
self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER)
|
||||
elif ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND:
|
||||
self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID)
|
||||
elif ret.status == self.GetSetReturn.ERR_INVALID_PARAMETER:
|
||||
self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER)
|
||||
elif ret.status == self.GetSetReturn.ERR_UNSUPPORTED_REQUEST:
|
||||
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
|
||||
|
||||
def register_get_report_cb(self, cb: Callable[[int, int, int], None]) -> None:
|
||||
self.get_report_cb = cb
|
||||
logger.debug("GetReport callback registered successfully")
|
||||
|
||||
def handle_set_report(self, pdu: bytes):
|
||||
if self.set_report_cb is None:
|
||||
logger.debug("SetReport callback not registered !!")
|
||||
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
|
||||
return
|
||||
report_type = pdu[0] & 0x03
|
||||
report_id = pdu[1]
|
||||
report_data = pdu[2:]
|
||||
report_size = len(report_data) + 1
|
||||
ret = self.set_report_cb(report_id, report_type, report_size, report_data)
|
||||
assert ret is not None
|
||||
if ret.status == self.GetSetReturn.SUCCESS:
|
||||
self.send_handshake_message(Message.Handshake.SUCCESSFUL)
|
||||
elif ret.status == self.GetSetReturn.ERR_INVALID_PARAMETER:
|
||||
self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER)
|
||||
elif ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND:
|
||||
self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID)
|
||||
else:
|
||||
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
|
||||
|
||||
def register_set_report_cb(
|
||||
self, cb: Callable[[int, int, int, bytes], None]
|
||||
) -> None:
|
||||
self.set_report_cb = cb
|
||||
logger.debug("SetReport callback registered successfully")
|
||||
|
||||
def handle_get_protocol(self, pdu: bytes):
|
||||
if self.get_protocol_cb is None:
|
||||
logger.debug("GetProtocol callback not registered !!")
|
||||
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
|
||||
return
|
||||
ret = self.get_protocol_cb()
|
||||
assert ret is not None
|
||||
if ret.status == self.GetSetReturn.SUCCESS:
|
||||
self.send_control_data(Message.ReportType.OTHER_REPORT, ret.data)
|
||||
else:
|
||||
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
|
||||
|
||||
def register_get_protocol_cb(self, cb: Callable[[], None]) -> None:
|
||||
self.get_protocol_cb = cb
|
||||
logger.debug("GetProtocol callback registered successfully")
|
||||
|
||||
def handle_set_protocol(self, pdu: bytes):
|
||||
if self.set_protocol_cb is None:
|
||||
logger.debug("SetProtocol callback not registered !!")
|
||||
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
|
||||
return
|
||||
ret = self.set_protocol_cb(pdu[0] & 0x01)
|
||||
assert ret is not None
|
||||
if ret.status == self.GetSetReturn.SUCCESS:
|
||||
self.send_handshake_message(Message.Handshake.SUCCESSFUL)
|
||||
else:
|
||||
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
|
||||
|
||||
def register_set_protocol_cb(self, cb: Callable[[int], None]) -> None:
|
||||
self.set_protocol_cb = cb
|
||||
logger.debug("SetProtocol callback registered successfully")
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class Host(HID):
|
||||
def __init__(self, device: device.Device) -> None:
|
||||
super().__init__(device, HID.Role.HOST)
|
||||
|
||||
def get_report(self, report_type: int, report_id: int, buffer_size: int) -> None:
|
||||
msg = GetReportMessage(
|
||||
report_type=report_type, report_id=report_id, buffer_size=buffer_size
|
||||
)
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID CONTROL GET REPORT, PDU: {hid_message.hex()}')
|
||||
self.send_pdu_on_ctrl(hid_message)
|
||||
|
||||
def set_report(self, report_type: int, data: bytes) -> None:
|
||||
msg = SetReportMessage(report_type=report_type, data=data)
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID CONTROL SET REPORT, PDU:{hid_message.hex()}')
|
||||
self.send_pdu_on_ctrl(hid_message)
|
||||
|
||||
def get_protocol(self) -> None:
|
||||
msg = GetProtocolMessage()
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID CONTROL GET PROTOCOL, PDU: {hid_message.hex()}')
|
||||
self.send_pdu_on_ctrl(hid_message)
|
||||
|
||||
def set_protocol(self, protocol_mode: int) -> None:
|
||||
msg = SetProtocolMessage(protocol_mode=protocol_mode)
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID CONTROL SET PROTOCOL, PDU: {hid_message.hex()}')
|
||||
self.send_pdu_on_ctrl(hid_message)
|
||||
|
||||
def suspend(self) -> None:
|
||||
msg = Suspend()
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID CONTROL SUSPEND, PDU:{hid_message.hex()}')
|
||||
self.send_pdu_on_ctrl(hid_message)
|
||||
|
||||
def exit_suspend(self) -> None:
|
||||
msg = ExitSuspend()
|
||||
hid_message = bytes(msg)
|
||||
logger.debug(f'>>> HID CONTROL EXIT SUSPEND, PDU:{hid_message.hex()}')
|
||||
self.send_pdu_on_ctrl(hid_message)
|
||||
|
||||
@override
|
||||
def on_ctrl_pdu(self, pdu: bytes) -> None:
|
||||
logger.debug(f'<<< HID CONTROL PDU: {pdu.hex()}')
|
||||
param = pdu[0] & 0x0F
|
||||
message_type = pdu[0] >> 4
|
||||
if message_type == Message.MessageType.HANDSHAKE:
|
||||
logger.debug(f'<<< HID HANDSHAKE: {Message.Handshake(param).name}')
|
||||
self.emit('handshake', Message.Handshake(param))
|
||||
elif message_type == Message.MessageType.DATA:
|
||||
logger.debug('<<< HID CONTROL DATA')
|
||||
self.emit('control_data', pdu)
|
||||
elif message_type == Message.MessageType.CONTROL:
|
||||
if param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG:
|
||||
logger.debug('<<< HID VIRTUAL CABLE UNPLUG')
|
||||
self.emit('virtual_cable_unplug')
|
||||
else:
|
||||
logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED')
|
||||
else:
|
||||
logger.debug('<<< HID MESSAGE TYPE UNSUPPORTED')
|
||||
|
||||
5
examples/hid_keyboard.json
Normal file
5
examples/hid_keyboard.json
Normal file
@@ -0,0 +1,5 @@
|
||||
{
|
||||
"name": "Bumble HID Keyboard",
|
||||
"class_of_device": 9664,
|
||||
"keystore": "JsonKeyStore"
|
||||
}
|
||||
@@ -40,9 +40,9 @@
|
||||
}
|
||||
}
|
||||
function onMouseMove(event) {
|
||||
//console.log(event.clientX, event.clientY)
|
||||
mouseInfo.innerText = `MOUSE: x=${event.clientX}, y=${event.clientY}`
|
||||
send({ type:'mousemove', x: event.clientX, y: event.clientY })
|
||||
//console.log(event.movementX, event.movementY)
|
||||
mouseInfo.innerText = `MOUSE: x=${event.movementX}, y=${event.movementY}`
|
||||
send({ type:'mousemove', x: event.movementX, y: event.movementY })
|
||||
}
|
||||
|
||||
function onKeyDown(event) {
|
||||
|
||||
748
examples/run_hid_device.py
Normal file
748
examples/run_hid_device.py
Normal file
@@ -0,0 +1,748 @@
|
||||
# Copyright 2021-2022 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import asyncio
|
||||
import sys
|
||||
import os
|
||||
import logging
|
||||
import json
|
||||
import websockets
|
||||
from bumble.colors import color
|
||||
|
||||
from bumble.device import Device
|
||||
from bumble.transport import open_transport_or_link
|
||||
from bumble.core import (
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
BT_L2CAP_PROTOCOL_ID,
|
||||
BT_HUMAN_INTERFACE_DEVICE_SERVICE,
|
||||
BT_HIDP_PROTOCOL_ID,
|
||||
UUID,
|
||||
)
|
||||
from bumble.hci import Address
|
||||
from bumble.hid import (
|
||||
Device as HID_Device,
|
||||
HID_CONTROL_PSM,
|
||||
HID_INTERRUPT_PSM,
|
||||
Message,
|
||||
)
|
||||
from bumble.sdp import (
|
||||
Client as SDP_Client,
|
||||
DataElement,
|
||||
ServiceAttribute,
|
||||
SDP_PUBLIC_BROWSE_ROOT,
|
||||
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
|
||||
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
SDP_ALL_ATTRIBUTES_RANGE,
|
||||
SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID,
|
||||
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
|
||||
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
|
||||
)
|
||||
from bumble.utils import AsyncRunner
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# SDP attributes for Bluetooth HID devices
|
||||
SDP_HID_SERVICE_NAME_ATTRIBUTE_ID = 0x0100
|
||||
SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID = 0x0101
|
||||
SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID = 0x0102
|
||||
SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID = 0x0200 # [DEPRECATED]
|
||||
SDP_HID_PARSER_VERSION_ATTRIBUTE_ID = 0x0201
|
||||
SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID = 0x0202
|
||||
SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID = 0x0203
|
||||
SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID = 0x0204
|
||||
SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID = 0x0205
|
||||
SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0x0206
|
||||
SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID = 0x0207
|
||||
SDP_HID_SDP_DISABLE_ATTRIBUTE_ID = 0x0208 # [DEPRECATED]
|
||||
SDP_HID_BATTERY_POWER_ATTRIBUTE_ID = 0x0209
|
||||
SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID = 0x020A
|
||||
SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID = 0x020B # DEPRECATED]
|
||||
SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID = 0x020C
|
||||
SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID = 0x020D
|
||||
SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID = 0x020E
|
||||
SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID = 0x020F
|
||||
SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210
|
||||
|
||||
# Refer to HID profile specification v1.1.1, "5.3 Service Discovery Protocol (SDP)" for details
|
||||
# HID SDP attribute values
|
||||
LANGUAGE = 0x656E # 0x656E uint16 “en” (English)
|
||||
ENCODING = 0x6A # 0x006A uint16 UTF-8 encoding
|
||||
PRIMARY_LANGUAGE_BASE_ID = 0x100 # 0x0100 uint16 PrimaryLanguageBaseID
|
||||
VERSION_NUMBER = 0x0101 # 0x0101 uint16 version number (v1.1)
|
||||
SERVICE_NAME = b'Bumble HID'
|
||||
SERVICE_DESCRIPTION = b'Bumble'
|
||||
PROVIDER_NAME = b'Bumble'
|
||||
HID_PARSER_VERSION = 0x0111 # uint16 0x0111 (v1.1.1)
|
||||
HID_DEVICE_SUBCLASS = 0xC0 # Combo keyboard/pointing device
|
||||
HID_COUNTRY_CODE = 0x21 # 0x21 Uint8, USA
|
||||
HID_VIRTUAL_CABLE = True # Virtual cable enabled
|
||||
HID_RECONNECT_INITIATE = True # Reconnect initiate enabled
|
||||
REPORT_DESCRIPTOR_TYPE = 0x22 # 0x22 Type = Report Descriptor
|
||||
HID_LANGID_BASE_LANGUAGE = 0x0409 # 0x0409 Language = English (United States)
|
||||
HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET = 0x100 # 0x0100 Default
|
||||
HID_BATTERY_POWER = True # Battery power enabled
|
||||
HID_REMOTE_WAKE = True # Remote wake enabled
|
||||
HID_SUPERVISION_TIMEOUT = 0xC80 # uint16 0xC80 (2s)
|
||||
HID_NORMALLY_CONNECTABLE = True # Normally connectable enabled
|
||||
HID_BOOT_DEVICE = True # Boot device support enabled
|
||||
HID_SSR_HOST_MAX_LATENCY = 0x640 # uint16 0x640 (1s)
|
||||
HID_SSR_HOST_MIN_TIMEOUT = 0xC80 # uint16 0xC80 (2s)
|
||||
HID_REPORT_MAP = bytes( # Text String, 50 Octet Report Descriptor
|
||||
# pylint: disable=line-too-long
|
||||
[
|
||||
0x05,
|
||||
0x01, # Usage Page (Generic Desktop Ctrls)
|
||||
0x09,
|
||||
0x06, # Usage (Keyboard)
|
||||
0xA1,
|
||||
0x01, # Collection (Application)
|
||||
0x85,
|
||||
0x01, # . Report ID (1)
|
||||
0x05,
|
||||
0x07, # . Usage Page (Kbrd/Keypad)
|
||||
0x19,
|
||||
0xE0, # . Usage Minimum (0xE0)
|
||||
0x29,
|
||||
0xE7, # . Usage Maximum (0xE7)
|
||||
0x15,
|
||||
0x00, # . Logical Minimum (0)
|
||||
0x25,
|
||||
0x01, # . Logical Maximum (1)
|
||||
0x75,
|
||||
0x01, # . Report Size (1)
|
||||
0x95,
|
||||
0x08, # . Report Count (8)
|
||||
0x81,
|
||||
0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
|
||||
0x95,
|
||||
0x01, # . Report Count (1)
|
||||
0x75,
|
||||
0x08, # . Report Size (8)
|
||||
0x81,
|
||||
0x03, # . Input (Const,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
|
||||
0x95,
|
||||
0x05, # . Report Count (5)
|
||||
0x75,
|
||||
0x01, # . Report Size (1)
|
||||
0x05,
|
||||
0x08, # . Usage Page (LEDs)
|
||||
0x19,
|
||||
0x01, # . Usage Minimum (Num Lock)
|
||||
0x29,
|
||||
0x05, # . Usage Maximum (Kana)
|
||||
0x91,
|
||||
0x02, # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
|
||||
0x95,
|
||||
0x01, # . Report Count (1)
|
||||
0x75,
|
||||
0x03, # . Report Size (3)
|
||||
0x91,
|
||||
0x03, # . Output (Const,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
|
||||
0x95,
|
||||
0x06, # . Report Count (6)
|
||||
0x75,
|
||||
0x08, # . Report Size (8)
|
||||
0x15,
|
||||
0x00, # . Logical Minimum (0)
|
||||
0x25,
|
||||
0x65, # . Logical Maximum (101)
|
||||
0x05,
|
||||
0x07, # . Usage Page (Kbrd/Keypad)
|
||||
0x19,
|
||||
0x00, # . Usage Minimum (0x00)
|
||||
0x29,
|
||||
0x65, # . Usage Maximum (0x65)
|
||||
0x81,
|
||||
0x00, # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
|
||||
0xC0, # End Collection
|
||||
0x05,
|
||||
0x01, # Usage Page (Generic Desktop Ctrls)
|
||||
0x09,
|
||||
0x02, # Usage (Mouse)
|
||||
0xA1,
|
||||
0x01, # Collection (Application)
|
||||
0x85,
|
||||
0x02, # . Report ID (2)
|
||||
0x09,
|
||||
0x01, # . Usage (Pointer)
|
||||
0xA1,
|
||||
0x00, # . Collection (Physical)
|
||||
0x05,
|
||||
0x09, # . Usage Page (Button)
|
||||
0x19,
|
||||
0x01, # . Usage Minimum (0x01)
|
||||
0x29,
|
||||
0x03, # . Usage Maximum (0x03)
|
||||
0x15,
|
||||
0x00, # . Logical Minimum (0)
|
||||
0x25,
|
||||
0x01, # . Logical Maximum (1)
|
||||
0x95,
|
||||
0x03, # . Report Count (3)
|
||||
0x75,
|
||||
0x01, # . Report Size (1)
|
||||
0x81,
|
||||
0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
|
||||
0x95,
|
||||
0x01, # . Report Count (1)
|
||||
0x75,
|
||||
0x05, # . Report Size (5)
|
||||
0x81,
|
||||
0x03, # . Input (Const,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
|
||||
0x05,
|
||||
0x01, # . Usage Page (Generic Desktop Ctrls)
|
||||
0x09,
|
||||
0x30, # . Usage (X)
|
||||
0x09,
|
||||
0x31, # . Usage (Y)
|
||||
0x15,
|
||||
0x81, # . Logical Minimum (-127)
|
||||
0x25,
|
||||
0x7F, # . Logical Maximum (127)
|
||||
0x75,
|
||||
0x08, # . Report Size (8)
|
||||
0x95,
|
||||
0x02, # . Report Count (2)
|
||||
0x81,
|
||||
0x06, # . Input (Data,Var,Rel,No Wrap,Linear,Preferred State,No Null Position)
|
||||
0xC0, # . End Collection
|
||||
0xC0, # End Collection
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
# Default protocol mode set to report protocol
|
||||
protocol_mode = Message.ProtocolMode.REPORT_PROTOCOL
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def sdp_records():
|
||||
service_record_handle = 0x00010002
|
||||
return {
|
||||
service_record_handle: [
|
||||
ServiceAttribute(
|
||||
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
|
||||
DataElement.unsigned_integer_32(service_record_handle),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
|
||||
DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
|
||||
DataElement.sequence(
|
||||
[DataElement.uuid(BT_HUMAN_INTERFACE_DEVICE_SERVICE)]
|
||||
),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
DataElement.sequence(
|
||||
[
|
||||
DataElement.sequence(
|
||||
[
|
||||
DataElement.uuid(BT_L2CAP_PROTOCOL_ID),
|
||||
DataElement.unsigned_integer_16(HID_CONTROL_PSM),
|
||||
]
|
||||
),
|
||||
DataElement.sequence(
|
||||
[
|
||||
DataElement.uuid(BT_HIDP_PROTOCOL_ID),
|
||||
]
|
||||
),
|
||||
]
|
||||
),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID,
|
||||
DataElement.sequence(
|
||||
[
|
||||
DataElement.unsigned_integer_16(LANGUAGE),
|
||||
DataElement.unsigned_integer_16(ENCODING),
|
||||
DataElement.unsigned_integer_16(PRIMARY_LANGUAGE_BASE_ID),
|
||||
]
|
||||
),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
DataElement.sequence(
|
||||
[
|
||||
DataElement.sequence(
|
||||
[
|
||||
DataElement.uuid(BT_HUMAN_INTERFACE_DEVICE_SERVICE),
|
||||
DataElement.unsigned_integer_16(VERSION_NUMBER),
|
||||
]
|
||||
),
|
||||
]
|
||||
),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
DataElement.sequence(
|
||||
[
|
||||
DataElement.sequence(
|
||||
[
|
||||
DataElement.sequence(
|
||||
[
|
||||
DataElement.uuid(BT_L2CAP_PROTOCOL_ID),
|
||||
DataElement.unsigned_integer_16(
|
||||
HID_INTERRUPT_PSM
|
||||
),
|
||||
]
|
||||
),
|
||||
DataElement.sequence(
|
||||
[
|
||||
DataElement.uuid(BT_HIDP_PROTOCOL_ID),
|
||||
]
|
||||
),
|
||||
]
|
||||
),
|
||||
]
|
||||
),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_SERVICE_NAME_ATTRIBUTE_ID,
|
||||
DataElement(DataElement.TEXT_STRING, SERVICE_NAME),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID,
|
||||
DataElement(DataElement.TEXT_STRING, SERVICE_DESCRIPTION),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID,
|
||||
DataElement(DataElement.TEXT_STRING, PROVIDER_NAME),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_PARSER_VERSION_ATTRIBUTE_ID,
|
||||
DataElement.unsigned_integer_32(HID_PARSER_VERSION),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID,
|
||||
DataElement.unsigned_integer_32(HID_DEVICE_SUBCLASS),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID,
|
||||
DataElement.unsigned_integer_32(HID_COUNTRY_CODE),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID,
|
||||
DataElement.boolean(HID_VIRTUAL_CABLE),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID,
|
||||
DataElement.boolean(HID_RECONNECT_INITIATE),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
DataElement.sequence(
|
||||
[
|
||||
DataElement.sequence(
|
||||
[
|
||||
DataElement.unsigned_integer_16(REPORT_DESCRIPTOR_TYPE),
|
||||
DataElement(DataElement.TEXT_STRING, HID_REPORT_MAP),
|
||||
]
|
||||
),
|
||||
]
|
||||
),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID,
|
||||
DataElement.sequence(
|
||||
[
|
||||
DataElement.sequence(
|
||||
[
|
||||
DataElement.unsigned_integer_16(
|
||||
HID_LANGID_BASE_LANGUAGE
|
||||
),
|
||||
DataElement.unsigned_integer_16(
|
||||
HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET
|
||||
),
|
||||
]
|
||||
),
|
||||
]
|
||||
),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_BATTERY_POWER_ATTRIBUTE_ID,
|
||||
DataElement.boolean(HID_BATTERY_POWER),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID,
|
||||
DataElement.boolean(HID_REMOTE_WAKE),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID,
|
||||
DataElement.unsigned_integer_16(HID_SUPERVISION_TIMEOUT),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID,
|
||||
DataElement.boolean(HID_NORMALLY_CONNECTABLE),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID,
|
||||
DataElement.boolean(HID_BOOT_DEVICE),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID,
|
||||
DataElement.unsigned_integer_16(HID_SSR_HOST_MAX_LATENCY),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID,
|
||||
DataElement.unsigned_integer_16(HID_SSR_HOST_MIN_TIMEOUT),
|
||||
),
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def get_stream_reader(pipe) -> asyncio.StreamReader:
|
||||
loop = asyncio.get_event_loop()
|
||||
reader = asyncio.StreamReader(loop=loop)
|
||||
protocol = asyncio.StreamReaderProtocol(reader)
|
||||
await loop.connect_read_pipe(lambda: protocol, pipe)
|
||||
return reader
|
||||
|
||||
|
||||
class DeviceData:
|
||||
def __init__(self) -> None:
|
||||
self.keyboardData = bytearray(
|
||||
[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
|
||||
)
|
||||
self.mouseData = bytearray([0x02, 0x00, 0x00, 0x00])
|
||||
|
||||
|
||||
# Device's live data - Mouse and Keyboard will be stored in this
|
||||
deviceData = DeviceData()
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def keyboard_device(hid_device):
|
||||
|
||||
# Start a Websocket server to receive events from a web page
|
||||
async def serve(websocket, _path):
|
||||
global deviceData
|
||||
while True:
|
||||
try:
|
||||
message = await websocket.recv()
|
||||
print('Received: ', str(message))
|
||||
parsed = json.loads(message)
|
||||
message_type = parsed['type']
|
||||
if message_type == 'keydown':
|
||||
# Only deal with keys a to z for now
|
||||
key = parsed['key']
|
||||
if len(key) == 1:
|
||||
code = ord(key)
|
||||
if ord('a') <= code <= ord('z'):
|
||||
hid_code = 0x04 + code - ord('a')
|
||||
deviceData.keyboardData = bytearray(
|
||||
[
|
||||
0x01,
|
||||
0x00,
|
||||
0x00,
|
||||
hid_code,
|
||||
0x00,
|
||||
0x00,
|
||||
0x00,
|
||||
0x00,
|
||||
0x00,
|
||||
]
|
||||
)
|
||||
hid_device.send_data(deviceData.keyboardData)
|
||||
elif message_type == 'keyup':
|
||||
deviceData.keyboardData = bytearray(
|
||||
[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
|
||||
)
|
||||
hid_device.send_data(deviceData.keyboardData)
|
||||
elif message_type == "mousemove":
|
||||
# logical min and max values
|
||||
log_min = -127
|
||||
log_max = 127
|
||||
x = parsed['x']
|
||||
y = parsed['y']
|
||||
# limiting x and y values within logical max and min range
|
||||
x = max(log_min, min(log_max, x))
|
||||
y = max(log_min, min(log_max, y))
|
||||
x_cord = x.to_bytes(signed=True)
|
||||
y_cord = y.to_bytes(signed=True)
|
||||
deviceData.mouseData = bytearray([0x02, 0x00]) + x_cord + y_cord
|
||||
hid_device.send_data(deviceData.mouseData)
|
||||
except websockets.exceptions.ConnectionClosedOK:
|
||||
pass
|
||||
|
||||
# pylint: disable-next=no-member
|
||||
await websockets.serve(serve, 'localhost', 8989)
|
||||
await asyncio.get_event_loop().create_future()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: python run_hid_device.py <device-config> <transport-spec> <command>'
|
||||
' where <command> is one of:\n'
|
||||
' test-mode (run with menu enabled for testing)\n'
|
||||
' web (run a keyboard with keypress input from a web page, '
|
||||
'see keyboard.html'
|
||||
)
|
||||
print('example: python run_hid_device.py hid_keyboard.json usb:0 web')
|
||||
print('example: python run_hid_device.py hid_keyboard.json usb:0 test-mode')
|
||||
|
||||
return
|
||||
|
||||
async def handle_virtual_cable_unplug():
|
||||
hid_host_bd_addr = str(hid_device.remote_device_bd_address)
|
||||
await hid_device.disconnect_interrupt_channel()
|
||||
await hid_device.disconnect_control_channel()
|
||||
await device.keystore.delete(hid_host_bd_addr) # type: ignore
|
||||
connection = hid_device.connection
|
||||
if connection is not None:
|
||||
await connection.disconnect()
|
||||
|
||||
def on_hid_data_cb(pdu: bytes):
|
||||
print(f'Received Data, PDU: {pdu.hex()}')
|
||||
|
||||
def on_get_report_cb(report_id: int, report_type: int, buffer_size: int):
|
||||
retValue = hid_device.GetSetStatus()
|
||||
print(
|
||||
"GET_REPORT report_id: "
|
||||
+ str(report_id)
|
||||
+ "report_type: "
|
||||
+ str(report_type)
|
||||
+ "buffer_size:"
|
||||
+ str(buffer_size)
|
||||
)
|
||||
if report_type == Message.ReportType.INPUT_REPORT:
|
||||
if report_id == 1:
|
||||
retValue.data = deviceData.keyboardData[1:]
|
||||
retValue.status = hid_device.GetSetReturn.SUCCESS
|
||||
elif report_id == 2:
|
||||
retValue.data = deviceData.mouseData[1:]
|
||||
retValue.status = hid_device.GetSetReturn.SUCCESS
|
||||
else:
|
||||
retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND
|
||||
|
||||
if buffer_size:
|
||||
data_len = buffer_size - 1
|
||||
retValue.data = retValue.data[:data_len]
|
||||
elif report_type == Message.ReportType.OUTPUT_REPORT:
|
||||
# This sample app has nothing to do with the report received, to enable PTS
|
||||
# testing, we will return single byte random data.
|
||||
retValue.data = bytearray([0x11])
|
||||
retValue.status = hid_device.GetSetReturn.SUCCESS
|
||||
elif report_type == Message.ReportType.FEATURE_REPORT:
|
||||
retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
|
||||
elif report_type == Message.ReportType.OTHER_REPORT:
|
||||
if report_id == 3:
|
||||
retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND
|
||||
else:
|
||||
retValue.status = hid_device.GetSetReturn.FAILURE
|
||||
|
||||
return retValue
|
||||
|
||||
def on_set_report_cb(
|
||||
report_id: int, report_type: int, report_size: int, data: bytes
|
||||
):
|
||||
retValue = hid_device.GetSetStatus()
|
||||
print(
|
||||
"SET_REPORT report_id: "
|
||||
+ str(report_id)
|
||||
+ "report_type: "
|
||||
+ str(report_type)
|
||||
+ "report_size "
|
||||
+ str(report_size)
|
||||
+ "data:"
|
||||
+ str(data)
|
||||
)
|
||||
if report_type == Message.ReportType.FEATURE_REPORT:
|
||||
retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
|
||||
elif report_type == Message.ReportType.INPUT_REPORT:
|
||||
if report_id == 1 and report_size != len(deviceData.keyboardData):
|
||||
retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
|
||||
elif report_id == 2 and report_size != len(deviceData.mouseData):
|
||||
retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
|
||||
elif report_id == 3:
|
||||
retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND
|
||||
else:
|
||||
retValue.status = hid_device.GetSetReturn.SUCCESS
|
||||
else:
|
||||
retValue.status = hid_device.GetSetReturn.SUCCESS
|
||||
|
||||
return retValue
|
||||
|
||||
def on_get_protocol_cb():
|
||||
retValue = hid_device.GetSetStatus()
|
||||
retValue.data = protocol_mode.to_bytes()
|
||||
retValue.status = hid_device.GetSetReturn.SUCCESS
|
||||
return retValue
|
||||
|
||||
def on_set_protocol_cb(protocol: int):
|
||||
retValue = hid_device.GetSetStatus()
|
||||
# We do not support SET_PROTOCOL.
|
||||
print("SET_PROTOCOL report_id: " + str(protocol))
|
||||
retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST
|
||||
return retValue
|
||||
|
||||
def on_virtual_cable_unplug_cb():
|
||||
print(f'Received Virtual Cable Unplug')
|
||||
asyncio.create_task(handle_virtual_cable_unplug())
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Create and register HID device
|
||||
hid_device = HID_Device(device)
|
||||
|
||||
# Register for call backs
|
||||
hid_device.on('interrupt_data', on_hid_data_cb)
|
||||
|
||||
hid_device.register_get_report_cb(on_get_report_cb)
|
||||
hid_device.register_set_report_cb(on_set_report_cb)
|
||||
hid_device.register_get_protocol_cb(on_get_protocol_cb)
|
||||
hid_device.register_set_protocol_cb(on_set_protocol_cb)
|
||||
|
||||
# Register for virtual cable unplug call back
|
||||
hid_device.on('virtual_cable_unplug', on_virtual_cable_unplug_cb)
|
||||
|
||||
# Setup the SDP to advertise HID Device service
|
||||
device.sdp_service_records = sdp_records()
|
||||
|
||||
# Start the controller
|
||||
await device.power_on()
|
||||
|
||||
# Start being discoverable and connectable
|
||||
await device.set_discoverable(True)
|
||||
await device.set_connectable(True)
|
||||
|
||||
async def menu():
|
||||
reader = await get_stream_reader(sys.stdin)
|
||||
while True:
|
||||
print(
|
||||
"\n************************ HID Device Menu *****************************\n"
|
||||
)
|
||||
print(" 1. Connect Control Channel")
|
||||
print(" 2. Connect Interrupt Channel")
|
||||
print(" 3. Disconnect Control Channel")
|
||||
print(" 4. Disconnect Interrupt Channel")
|
||||
print(" 5. Send Report on Interrupt Channel")
|
||||
print(" 6. Virtual Cable Unplug")
|
||||
print(" 7. Disconnect device")
|
||||
print(" 8. Delete Bonding")
|
||||
print(" 9. Re-connect to device")
|
||||
print("10. Exit ")
|
||||
print("\nEnter your choice : \n")
|
||||
|
||||
choice = await reader.readline()
|
||||
choice = choice.decode('utf-8').strip()
|
||||
|
||||
if choice == '1':
|
||||
await hid_device.connect_control_channel()
|
||||
|
||||
elif choice == '2':
|
||||
await hid_device.connect_interrupt_channel()
|
||||
|
||||
elif choice == '3':
|
||||
await hid_device.disconnect_control_channel()
|
||||
|
||||
elif choice == '4':
|
||||
await hid_device.disconnect_interrupt_channel()
|
||||
|
||||
elif choice == '5':
|
||||
print(" 1. Report ID 0x01")
|
||||
print(" 2. Report ID 0x02")
|
||||
print(" 3. Invalid Report ID")
|
||||
|
||||
choice1 = await reader.readline()
|
||||
choice1 = choice1.decode('utf-8').strip()
|
||||
|
||||
if choice1 == '1':
|
||||
data = bytearray(
|
||||
[0x01, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]
|
||||
)
|
||||
hid_device.send_data(data)
|
||||
data = bytearray(
|
||||
[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
|
||||
)
|
||||
hid_device.send_data(data)
|
||||
|
||||
elif choice1 == '2':
|
||||
data = bytearray([0x02, 0x00, 0x00, 0xF6])
|
||||
hid_device.send_data(data)
|
||||
data = bytearray([0x02, 0x00, 0x00, 0x00])
|
||||
hid_device.send_data(data)
|
||||
|
||||
elif choice1 == '3':
|
||||
data = bytearray([0x00, 0x00, 0x00, 0x00])
|
||||
hid_device.send_data(data)
|
||||
data = bytearray([0x00, 0x00, 0x00, 0x00])
|
||||
hid_device.send_data(data)
|
||||
|
||||
else:
|
||||
print('Incorrect option selected')
|
||||
|
||||
elif choice == '6':
|
||||
hid_device.virtual_cable_unplug()
|
||||
try:
|
||||
hid_host_bd_addr = str(hid_device.remote_device_bd_address)
|
||||
await device.keystore.delete(hid_host_bd_addr)
|
||||
except KeyError:
|
||||
print('Device not found or Device already unpaired.')
|
||||
|
||||
elif choice == '7':
|
||||
connection = hid_device.connection
|
||||
if connection is not None:
|
||||
await connection.disconnect()
|
||||
else:
|
||||
print("Already disconnected from device")
|
||||
|
||||
elif choice == '8':
|
||||
try:
|
||||
hid_host_bd_addr = str(hid_device.remote_device_bd_address)
|
||||
await device.keystore.delete(hid_host_bd_addr)
|
||||
except KeyError:
|
||||
print('Device NOT found or Device already unpaired.')
|
||||
|
||||
elif choice == '9':
|
||||
hid_host_bd_addr = str(hid_device.remote_device_bd_address)
|
||||
connection = await device.connect(
|
||||
hid_host_bd_addr, transport=BT_BR_EDR_TRANSPORT
|
||||
)
|
||||
await connection.authenticate()
|
||||
await connection.encrypt()
|
||||
|
||||
elif choice == '10':
|
||||
sys.exit("Exit successful")
|
||||
|
||||
else:
|
||||
print("Invalid option selected.")
|
||||
|
||||
if (len(sys.argv) > 3) and (sys.argv[3] == 'test-mode'):
|
||||
# Test mode for PTS/Unit testing
|
||||
await menu()
|
||||
else:
|
||||
# default option is using keyboard.html (web)
|
||||
print("Executing in Web mode")
|
||||
await keyboard_device(hid_device)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
|
||||
asyncio.run(main())
|
||||
@@ -285,7 +285,10 @@ async def main():
|
||||
print('example: run_hid_host.py classic1.json usb:0 E1:CA:72:48:C4:E8/P')
|
||||
return
|
||||
|
||||
def on_hid_data_cb(pdu):
|
||||
def on_hid_control_data_cb(pdu: bytes):
|
||||
print(f'Received Control Data, PDU: {pdu.hex()}')
|
||||
|
||||
def on_hid_interrupt_data_cb(pdu: bytes):
|
||||
report_type = pdu[0] & 0x0F
|
||||
if len(pdu) == 1:
|
||||
print(color(f'Warning: No report received', 'yellow'))
|
||||
@@ -305,7 +308,7 @@ async def main():
|
||||
|
||||
if (report_length <= 1) or (report_id == 0):
|
||||
return
|
||||
|
||||
# Parse report over interrupt channel
|
||||
if report_type == Message.ReportType.INPUT_REPORT:
|
||||
ReportParser.parse_input_report(pdu[1:]) # type: ignore
|
||||
|
||||
@@ -313,7 +316,9 @@ async def main():
|
||||
await hid_host.disconnect_interrupt_channel()
|
||||
await hid_host.disconnect_control_channel()
|
||||
await device.keystore.delete(target_address) # type: ignore
|
||||
await connection.disconnect()
|
||||
connection = hid_host.connection
|
||||
if connection is not None:
|
||||
await connection.disconnect()
|
||||
|
||||
def on_hid_virtual_cable_unplug_cb():
|
||||
asyncio.create_task(handle_virtual_cable_unplug())
|
||||
@@ -325,6 +330,18 @@ async def main():
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Create HID host and start it
|
||||
print('@@@ Starting HID Host...')
|
||||
hid_host = Host(device)
|
||||
|
||||
# Register for HID data call back
|
||||
hid_host.on('interrupt_data', on_hid_interrupt_data_cb)
|
||||
hid_host.on('control_data', on_hid_control_data_cb)
|
||||
|
||||
# Register for virtual cable unplug call back
|
||||
hid_host.on('virtual_cable_unplug', on_hid_virtual_cable_unplug_cb)
|
||||
|
||||
await device.power_on()
|
||||
|
||||
# Connect to a peer
|
||||
@@ -345,16 +362,6 @@ async def main():
|
||||
|
||||
await get_hid_device_sdp_record(connection)
|
||||
|
||||
# Create HID host and start it
|
||||
print('@@@ Starting HID Host...')
|
||||
hid_host = Host(device, connection)
|
||||
|
||||
# Register for HID data call back
|
||||
hid_host.on('data', on_hid_data_cb)
|
||||
|
||||
# Register for virtual cable unplug call back
|
||||
hid_host.on('virtual_cable_unplug', on_hid_virtual_cable_unplug_cb)
|
||||
|
||||
async def menu():
|
||||
reader = await get_stream_reader(sys.stdin)
|
||||
while True:
|
||||
@@ -369,13 +376,14 @@ async def main():
|
||||
print(" 6. Set Report")
|
||||
print(" 7. Set Protocol Mode")
|
||||
print(" 8. Get Protocol Mode")
|
||||
print(" 9. Send Report")
|
||||
print(" 9. Send Report on Interrupt Channel")
|
||||
print("10. Suspend")
|
||||
print("11. Exit Suspend")
|
||||
print("12. Virtual Cable Unplug")
|
||||
print("13. Disconnect device")
|
||||
print("14. Delete Bonding")
|
||||
print("15. Re-connect to device")
|
||||
print("16. Exit")
|
||||
print("\nEnter your choice : \n")
|
||||
|
||||
choice = await reader.readline()
|
||||
@@ -394,21 +402,40 @@ async def main():
|
||||
await hid_host.disconnect_interrupt_channel()
|
||||
|
||||
elif choice == '5':
|
||||
print(" 1. Report ID 0x02")
|
||||
print(" 2. Report ID 0x03")
|
||||
print(" 3. Report ID 0x05")
|
||||
print(" 1. Input Report with ID 0x01")
|
||||
print(" 2. Input Report with ID 0x02")
|
||||
print(" 3. Input Report with ID 0x0F - Invalid ReportId")
|
||||
print(" 4. Output Report with ID 0x02")
|
||||
print(" 5. Feature Report with ID 0x05 - Unsupported Request")
|
||||
print(" 6. Input Report with ID 0x02, BufferSize 3")
|
||||
print(" 7. Output Report with ID 0x03, BufferSize 2")
|
||||
print(" 8. Feature Report with ID 0x05, BufferSize 3")
|
||||
choice1 = await reader.readline()
|
||||
choice1 = choice1.decode('utf-8').strip()
|
||||
|
||||
if choice1 == '1':
|
||||
hid_host.get_report(1, 2, 3)
|
||||
hid_host.get_report(1, 1, 0)
|
||||
|
||||
elif choice1 == '2':
|
||||
hid_host.get_report(2, 3, 2)
|
||||
hid_host.get_report(1, 2, 0)
|
||||
|
||||
elif choice1 == '3':
|
||||
hid_host.get_report(3, 5, 3)
|
||||
hid_host.get_report(1, 5, 0)
|
||||
|
||||
elif choice1 == '4':
|
||||
hid_host.get_report(2, 2, 0)
|
||||
|
||||
elif choice1 == '5':
|
||||
hid_host.get_report(3, 15, 0)
|
||||
|
||||
elif choice1 == '6':
|
||||
hid_host.get_report(1, 2, 3)
|
||||
|
||||
elif choice1 == '7':
|
||||
hid_host.get_report(2, 3, 2)
|
||||
|
||||
elif choice1 == '8':
|
||||
hid_host.get_report(3, 5, 3)
|
||||
else:
|
||||
print('Incorrect option selected')
|
||||
|
||||
@@ -484,6 +511,7 @@ async def main():
|
||||
hid_host.virtual_cable_unplug()
|
||||
try:
|
||||
await device.keystore.delete(target_address)
|
||||
print("Unpair successful")
|
||||
except KeyError:
|
||||
print('Device not found or Device already unpaired.')
|
||||
|
||||
@@ -513,6 +541,9 @@ async def main():
|
||||
await connection.authenticate()
|
||||
await connection.encrypt()
|
||||
|
||||
elif choice == '16':
|
||||
sys.exit("Exit successful")
|
||||
|
||||
else:
|
||||
print("Invalid option selected.")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user