diff --git a/bumble/hid.py b/bumble/hid.py index cc522da..c0fb053 100644 --- a/bumble/hid.py +++ b/bumble/hid.py @@ -21,8 +21,10 @@ import logging import enum import struct +from abc import ABC, abstractmethod from pyee import EventEmitter from typing import Optional, Callable, TYPE_CHECKING +from typing_extensions import override from bumble import l2cap, device from bumble.colors import color @@ -193,7 +195,7 @@ class SendHandshakeMessage(Message): # ----------------------------------------------------------------------------- -class HID(EventEmitter): +class HID(ABC, EventEmitter): l2cap_ctrl_channel: Optional[l2cap.ClassicChannel] = None l2cap_intr_channel: Optional[l2cap.ClassicChannel] = None connection: Optional[device.Connection] = None @@ -214,21 +216,6 @@ class HID(EventEmitter): device.on('connection', self.on_device_connection) - def handle_get_report(self, pdu: bytes): - pass - - def handle_set_report(self, pdu: bytes): - pass - - def handle_get_protocol(self, pdu: bytes): - pass - - def handle_set_protocol(self, pdu: bytes): - pass - - def send_handshake_message(self, result_code: int): - pass - async def connect_control_channel(self) -> None: # Create a new L2CAP connection - control channel try: @@ -300,44 +287,9 @@ class HID(EventEmitter): self.l2cap_intr_channel = None logger.debug(f'$$$ L2CAP channel close: {l2cap_channel}') + @abstractmethod def on_ctrl_pdu(self, pdu: bytes) -> None: - logger.debug(f'<<< HID CONTROL PDU: {pdu.hex()}') - param = pdu[0] & 0x0F - message_type = pdu[0] >> 4 - - if message_type == Message.MessageType.HANDSHAKE: - logger.debug(f'<<< HID HANDSHAKE: {Message.Handshake(param).name}') - self.emit('handshake', Message.Handshake(param)) - elif message_type == Message.MessageType.GET_REPORT: - logger.debug('<<< HID GET REPORT') - self.handle_get_report(pdu) - elif message_type == Message.MessageType.SET_REPORT: - logger.debug('<<< HID SET REPORT') - self.handle_set_report(pdu) - elif message_type == Message.MessageType.GET_PROTOCOL: - logger.debug('<<< HID GET PROTOCOL') - self.handle_get_protocol(pdu) - elif message_type == Message.MessageType.SET_PROTOCOL: - logger.debug('<<< HID SET PROTOCOL') - self.handle_set_protocol(pdu) - elif message_type == Message.MessageType.DATA: - logger.debug('<<< HID CONTROL DATA') - self.emit('control_data', pdu) - elif message_type == Message.MessageType.CONTROL: - if param == Message.ControlCommand.SUSPEND: - logger.debug('<<< HID SUSPEND') - self.emit('suspend') - elif param == Message.ControlCommand.EXIT_SUSPEND: - logger.debug('<<< HID EXIT SUSPEND') - self.emit('exit_suspend') - elif param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG: - logger.debug('<<< HID VIRTUAL CABLE UNPLUG') - self.emit('virtual_cable_unplug') - else: - logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED') - else: - logger.debug('<<< HID MESSAGE TYPE UNSUPPORTED') - self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) + pass def on_intr_pdu(self, pdu: bytes) -> None: logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}') @@ -393,6 +345,43 @@ class Device(HID): get_protocol_cb: Optional[Callable[[], None]] = None set_protocol_cb: Optional[Callable[[int], None]] = None + @override + def on_ctrl_pdu(self, pdu: bytes) -> None: + logger.debug(f'<<< HID CONTROL PDU: {pdu.hex()}') + param = pdu[0] & 0x0F + message_type = pdu[0] >> 4 + + if message_type == Message.MessageType.GET_REPORT: + logger.debug('<<< HID GET REPORT') + self.handle_get_report(pdu) + elif message_type == Message.MessageType.SET_REPORT: + logger.debug('<<< HID SET REPORT') + self.handle_set_report(pdu) + elif message_type == Message.MessageType.GET_PROTOCOL: + logger.debug('<<< HID GET PROTOCOL') + self.handle_get_protocol(pdu) + elif message_type == Message.MessageType.SET_PROTOCOL: + logger.debug('<<< HID SET PROTOCOL') + self.handle_set_protocol(pdu) + elif message_type == Message.MessageType.DATA: + logger.debug('<<< HID CONTROL DATA') + self.emit('control_data', pdu) + elif message_type == Message.MessageType.CONTROL: + if param == Message.ControlCommand.SUSPEND: + logger.debug('<<< HID SUSPEND') + self.emit('suspend') + elif param == Message.ControlCommand.EXIT_SUSPEND: + logger.debug('<<< HID EXIT SUSPEND') + self.emit('exit_suspend') + elif param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG: + logger.debug('<<< HID VIRTUAL CABLE UNPLUG') + self.emit('virtual_cable_unplug') + else: + logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED') + else: + logger.debug('<<< HID MESSAGE TYPE UNSUPPORTED') + self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST) + def send_handshake_message(self, result_code: int) -> None: msg = SendHandshakeMessage(result_code) hid_message = bytes(msg) @@ -543,3 +532,23 @@ class Host(HID): hid_message = bytes(msg) logger.debug(f'>>> HID CONTROL EXIT SUSPEND, PDU:{hid_message.hex()}') self.send_pdu_on_ctrl(hid_message) + + @override + def on_ctrl_pdu(self, pdu: bytes) -> None: + logger.debug(f'<<< HID CONTROL PDU: {pdu.hex()}') + param = pdu[0] & 0x0F + message_type = pdu[0] >> 4 + if message_type == Message.MessageType.HANDSHAKE: + logger.debug(f'<<< HID HANDSHAKE: {Message.Handshake(param).name}') + self.emit('handshake', Message.Handshake(param)) + elif message_type == Message.MessageType.DATA: + logger.debug('<<< HID CONTROL DATA') + self.emit('control_data', pdu) + elif message_type == Message.MessageType.CONTROL: + if param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG: + logger.debug('<<< HID VIRTUAL CABLE UNPLUG') + self.emit('virtual_cable_unplug') + else: + logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED') + else: + logger.debug('<<< HID MESSAGE TYPE UNSUPPORTED')