From e0dee2135f63285690a91b6daf524c8baba74725 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Sat, 8 Nov 2025 23:23:27 +0800 Subject: [PATCH] Basic LMP implementation --- bumble/controller.py | 366 ++++++++++++++++++++++++++++++++++++------- bumble/link.py | 150 ++---------------- bumble/lmp.py | 306 ++++++++++++++++++++++++++++++++++++ tests/test_utils.py | 8 +- 4 files changed, 633 insertions(+), 197 deletions(-) create mode 100644 bumble/lmp.py diff --git a/bumble/controller.py b/bumble/controller.py index 547bceaf..893a6dc2 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -23,9 +23,9 @@ import itertools import logging import random import struct -from typing import TYPE_CHECKING, Any, Optional, Union +from typing import TYPE_CHECKING, Any, Optional, Union, cast -from bumble import hci +from bumble import hci, lmp from bumble.colors import color from bumble.core import PhysicalTransport @@ -56,6 +56,14 @@ class CisLink: data_paths: set[int] = dataclasses.field(default_factory=set) +# ----------------------------------------------------------------------------- +@dataclasses.dataclass +class ScoLink: + handle: int + link_type: int + peer_address: hci.Address + + # ----------------------------------------------------------------------------- @dataclasses.dataclass class Connection: @@ -66,6 +74,7 @@ class Connection: link: Any transport: int link_type: int + classic_allow_role_switch: bool = False def __post_init__(self) -> None: self.assembler = hci.HCI_AclDataPacketAssembler(self.on_acl_pdu) @@ -96,6 +105,8 @@ class Controller: hci.Address, Connection ] # Connections where this controller is the peripheral classic_connections: dict[hci.Address, Connection] # Connections in BR/EDR + classic_pending_commands: dict[hci.Address, dict[lmp.Opcode, asyncio.Future[int]]] + sco_links: dict[hci.Address, ScoLink] # SCO links by address central_cis_links: dict[int, CisLink] # CIS links by handle peripheral_cis_links: dict[int, CisLink] # CIS links by handle @@ -151,6 +162,7 @@ class Controller: advertising_data: Optional[bytes] = None advertising_timer_handle: Optional[asyncio.Handle] = None classic_scan_enable: int = 0 + classic_allow_role_switch: bool = True _random_address: hci.Address = hci.Address('00:00:00:00:00:00') @@ -167,6 +179,8 @@ class Controller: self.central_connections = {} self.peripheral_connections = {} self.classic_connections = {} + self.sco_links = {} + self.classic_pending_commands = {} self.central_cis_links = {} self.peripheral_cis_links = {} self.default_phy = { @@ -293,7 +307,7 @@ class Controller: f'{color("CONTROLLER -> HOST", "green")}: {packet}' ) if self.host: - self.host.on_packet(bytes(packet)) + asyncio.get_running_loop().call_soon(self.host.on_packet, bytes(packet)) # This method allows the controller to emulate the same API as a transport source async def wait_for_termination(self) -> None: @@ -303,25 +317,20 @@ class Controller: # Link connections ############################################################ def allocate_connection_handle(self) -> int: - handle = 0 - max_handle = 0 - for connection in itertools.chain( - self.central_connections.values(), - self.peripheral_connections.values(), - self.classic_connections.values(), - ): - max_handle = max(max_handle, connection.handle) - if connection.handle == handle: - # Already used, continue searching after the current max - handle = max_handle + 1 - for cis_handle in itertools.chain( - self.central_cis_links.keys(), self.peripheral_cis_links.keys() - ): - max_handle = max(max_handle, cis_handle) - if cis_handle == handle: - # Already used, continue searching after the current max - handle = max_handle + 1 - return handle + current_handles = set( + cast(Connection | CisLink | ScoLink, link).handle + for link in itertools.chain( + self.central_connections.values(), + self.peripheral_connections.values(), + self.classic_connections.values(), + self.sco_links.values(), + self.central_cis_links.values(), + self.peripheral_cis_links.values(), + ) + ) + return next( + handle for handle in range(0xEFF + 1) if handle not in current_handles + ) def find_le_connection_by_address( self, address: hci.Address @@ -363,6 +372,12 @@ class Controller: return connection return None + def find_classic_sco_link_by_handle(self, handle: int) -> Optional[ScoLink]: + for connection in self.sco_links.values(): + if connection.handle == handle: + return connection + return None + def find_iso_link_by_handle(self, handle: int) -> Optional[CisLink]: return self.central_cis_links.get(handle) or self.peripheral_cis_links.get( handle @@ -669,9 +684,75 @@ class Controller: # Classic link connections ############################################################ + def send_lmp_packet( + self, receiver_address: hci.Address, packet: lmp.Packet + ) -> asyncio.Future[int]: + loop = asyncio.get_running_loop() + assert self.link + self.link.send_lmp_packet(self, receiver_address, packet) + future = self.classic_pending_commands.setdefault(receiver_address, {})[ + packet.opcode + ] = loop.create_future() + return future + + def on_lmp_packet(self, sender_address: hci.Address, packet: lmp.Packet): + if isinstance(packet, (lmp.LmpAccepted, lmp.LmpAcceptedExt)): + if future := self.classic_pending_commands.setdefault( + sender_address, {} + ).get(packet.response_opcode): + future.set_result(hci.HCI_SUCCESS) + else: + logger.error("!!! Unhandled packet: %s", packet) + elif isinstance(packet, (lmp.LmpNotAccepted, lmp.LmpNotAcceptedExt)): + if future := self.classic_pending_commands.setdefault( + sender_address, {} + ).get(packet.response_opcode): + future.set_result(packet.error_code) + else: + logger.error("!!! Unhandled packet: %s", packet) + elif isinstance(packet, (lmp.LmpHostConnectionReq)): + self.on_classic_connection_request( + sender_address, hci.HCI_Connection_Complete_Event.LinkType.ACL + ) + elif isinstance(packet, (lmp.LmpScoLinkReq)): + self.on_classic_connection_request( + sender_address, hci.HCI_Connection_Complete_Event.LinkType.SCO + ) + elif isinstance(packet, (lmp.LmpEscoLinkReq)): + self.on_classic_connection_request( + sender_address, hci.HCI_Connection_Complete_Event.LinkType.ESCO + ) + elif isinstance(packet, (lmp.LmpDetach)): + self.on_classic_disconnected( + sender_address, hci.HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR + ) + elif isinstance(packet, (lmp.LmpSwitchReq)): + self.on_classic_role_change_request(sender_address) + elif isinstance(packet, (lmp.LmpRemoveScoLinkReq, lmp.LmpRemoveEscoLinkReq)): + self.on_classic_sco_disconnected(sender_address, packet.error_code) + else: + logger.error("!!! Unhandled packet: %s", packet) + def on_classic_connection_request( self, peer_address: hci.Address, link_type: int ) -> None: + if link_type == hci.HCI_Connection_Complete_Event.LinkType.ACL: + self.classic_connections[peer_address] = Connection( + controller=self, + handle=0, + role=hci.Role.PERIPHERAL, + peer_address=peer_address, + link=self.link, + transport=PhysicalTransport.BR_EDR, + link_type=link_type, + classic_allow_role_switch=self.classic_allow_role_switch, + ) + else: + self.sco_links[peer_address] = ScoLink( + handle=0, + link_type=link_type, + peer_address=peer_address, + ) self.send_hci_packet( hci.HCI_Connection_Request_Event( bd_addr=peer_address, @@ -686,13 +767,13 @@ class Controller: if status == hci.HCI_SUCCESS: # Allocate (or reuse) a connection handle peer_address = peer_address - connection = self.classic_connections.get(peer_address) - if connection is None: - connection_handle = self.allocate_connection_handle() + connection_handle = self.allocate_connection_handle() + if connection := self.classic_connections.get(peer_address): + connection.handle = connection_handle + else: connection = Connection( controller=self, handle=connection_handle, - # hci.Role doesn't matter in Classic because they are managed by hci.HCI_Role_Change and hci.HCI_Role_Discovery role=hci.Role.CENTRAL, peer_address=peer_address, link=self.link, @@ -703,8 +784,6 @@ class Controller: logger.debug( f'New CLASSIC connection handle: 0x{connection_handle:04X}' ) - else: - connection_handle = connection.handle self.send_hci_packet( hci.HCI_Connection_Complete_Event( status=status, @@ -728,7 +807,7 @@ class Controller: def on_classic_disconnected(self, peer_address: hci.Address, reason: int) -> None: # Send a disconnection complete event - if connection := self.classic_connections.get(peer_address): + if connection := self.classic_connections.pop(peer_address, None): self.send_hci_packet( hci.HCI_Disconnection_Complete_Event( status=hci.HCI_SUCCESS, @@ -736,17 +815,51 @@ class Controller: reason=reason, ) ) - - # Remove the connection - del self.classic_connections[peer_address] else: logger.warning(f'!!! No classic connection found for {peer_address}') - def on_classic_role_change(self, peer_address: hci.Address, new_role: int) -> None: + def on_classic_sco_disconnected( + self, peer_address: hci.Address, reason: int + ) -> None: + # Send a disconnection complete event + if sco_link := self.sco_links.pop(peer_address, None): + self.send_hci_packet( + hci.HCI_Disconnection_Complete_Event( + status=hci.HCI_SUCCESS, + connection_handle=sco_link.handle, + reason=reason, + ) + ) + else: + logger.warning(f'!!! No classic connection found for {peer_address}') + + def on_classic_role_change_request(self, peer_address: hci.Address) -> None: + assert (connection := self.classic_connections.get(peer_address)) + if not connection.classic_allow_role_switch: + self.send_lmp_packet( + peer_address, + lmp.LmpNotAccepted( + lmp.Opcode.LMP_SWITCH_REQ, hci.HCI_ROLE_CHANGE_NOT_ALLOWED_ERROR + ), + ) + else: + self.send_lmp_packet( + peer_address, + lmp.LmpAccepted(lmp.Opcode.LMP_SWITCH_REQ), + ) + self.classic_role_change(connection) + + def classic_role_change(self, connection: Connection) -> None: + new_role = ( + hci.Role.CENTRAL + if connection.role == hci.Role.PERIPHERAL + else hci.Role.PERIPHERAL + ) + connection.role = new_role self.send_hci_packet( hci.HCI_Role_Change_Event( status=hci.HCI_SUCCESS, - bd_addr=peer_address, + bd_addr=connection.peer_address, new_role=new_role, ) ) @@ -757,17 +870,12 @@ class Controller: if status == hci.HCI_SUCCESS: # Allocate (or reuse) a connection handle connection_handle = self.allocate_connection_handle() - connection = Connection( - controller=self, + sco_link = ScoLink( handle=connection_handle, - # hci.Role doesn't matter in SCO. - role=hci.Role.CENTRAL, - peer_address=peer_address, - link=self.link, - transport=PhysicalTransport.BR_EDR, link_type=link_type, + peer_address=peer_address, ) - self.classic_connections[peer_address] = connection + self.sco_links[peer_address] = sco_link logger.debug(f'New SCO connection handle: 0x{connection_handle:04X}') else: connection_handle = 0 @@ -847,7 +955,16 @@ class Controller: ) return None - self.link.classic_connect(self, command.bd_addr) + self.classic_connections[command.bd_addr] = Connection( + controller=self, + handle=0, + role=hci.Role.CENTRAL, + peer_address=command.bd_addr, + link=self.link, + transport=PhysicalTransport.BR_EDR, + link_type=hci.HCI_Connection_Complete_Event.LinkType.ACL, + classic_allow_role_switch=bool(command.allow_role_switch), + ) # Say that the connection is pending self.send_hci_packet( @@ -857,6 +974,12 @@ class Controller: command_opcode=command.op_code, ) ) + future = self.send_lmp_packet(command.bd_addr, lmp.LmpHostConnectionReq()) + + def on_response(future: asyncio.Future[int]): + self.on_classic_connection_complete(command.bd_addr, future.result()) + + future.add_done_callback(on_response) return None def on_hci_disconnect_command( @@ -894,14 +1017,37 @@ class Controller: del self.peripheral_connections[connection.peer_address] elif connection := self.find_classic_connection_by_handle(handle): if self.link: - self.link.classic_disconnect( - self, + self.send_lmp_packet( connection.peer_address, - hci.HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, + lmp.LmpDetach(command.reason), ) + self.on_classic_disconnected(connection.peer_address, command.reason) else: # Remove the connection del self.classic_connections[connection.peer_address] + elif sco_link := self.find_classic_sco_link_by_handle(handle): + if self.link: + if ( + sco_link.link_type + == hci.HCI_Connection_Complete_Event.LinkType.ESCO + ): + self.send_lmp_packet( + sco_link.peer_address, + lmp.LmpRemoveScoLinkReq( + sco_handle=0, error_code=command.reason + ), + ) + else: + self.send_lmp_packet( + sco_link.peer_address, + lmp.LmpRemoveEscoLinkReq( + esco_handle=0, error_code=command.reason + ), + ) + self.on_classic_sco_disconnected(sco_link.peer_address, command.reason) + else: + # Remove the connection + del self.sco_links[sco_link.peer_address] elif cis_link := ( self.central_cis_links.get(handle) or self.peripheral_cis_links.get(handle) ): @@ -925,6 +1071,16 @@ class Controller: if self.link is None: return None + + if not (connection := self.classic_connections.get(command.bd_addr)): + self.send_hci_packet( + hci.HCI_Command_Status_Event( + status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, + num_hci_command_packets=1, + command_opcode=command.op_code, + ) + ) + return None self.send_hci_packet( hci.HCI_Command_Status_Event( status=hci.HCI_SUCCESS, @@ -932,7 +1088,36 @@ class Controller: command_opcode=command.op_code, ) ) - self.link.classic_accept_connection(self, command.bd_addr, command.role) + + if command.role == hci.Role.CENTRAL: + # Perform role switching before accept. + future = self.send_lmp_packet(command.bd_addr, lmp.LmpSwitchReq()) + + def on_response(future: asyncio.Future[int]): + if (status := future.result()) == hci.HCI_SUCCESS: + self.classic_role_change(connection) + # Continue connection setup. + self.send_lmp_packet( + command.bd_addr, + lmp.LmpAccepted(lmp.Opcode.LMP_HOST_CONNECTION_REQ), + ) + else: + # Abort connection setup. + self.send_lmp_packet( + command.bd_addr, + lmp.LmpNotAccepted(lmp.Opcode.LMP_HOST_CONNECTION_REQ, status), + ) + self.on_classic_connection_complete(command.bd_addr, status) + + future.add_done_callback(on_response) + + else: + # Simply accept connection. + self.send_lmp_packet( + command.bd_addr, + lmp.LmpAccepted(lmp.Opcode.LMP_HOST_CONNECTION_REQ), + ) + self.on_classic_connection_complete(command.bd_addr, hci.HCI_SUCCESS) return None def on_hci_enhanced_setup_synchronous_connection_command( @@ -966,11 +1151,32 @@ class Controller: command_opcode=command.op_code, ) ) - self.link.classic_sco_connect( - self, + future = self.send_lmp_packet( connection.peer_address, - hci.HCI_Connection_Complete_Event.LinkType.ESCO, + lmp.LmpEscoLinkReq( + esco_handle=0, + esco_lt_addr=0, + timing_control_flags=0, + d_esco=0, + t_esco=0, + w_esco=0, + esco_packet_type_c_to_p=0, + esco_packet_type_p_to_c=0, + packet_length_c_to_p=0, + packet_length_p_to_c=0, + air_mode=0, + negotiation_state=0, + ), ) + + def on_response(future: asyncio.Future[int]): + self.on_classic_sco_connection_complete( + connection.peer_address, + future.result(), + hci.HCI_Connection_Complete_Event.LinkType.ESCO, + ) + + future.add_done_callback(on_response) return None def on_hci_enhanced_accept_synchronous_connection_request_command( @@ -1000,9 +1206,13 @@ class Controller: command_opcode=command.op_code, ) ) - self.link.classic_accept_sco_connection( - self, + self.send_lmp_packet( connection.peer_address, + lmp.LmpAcceptedExt(lmp.Opcode.LMP_ESCO_LINK_REQ), + ) + self.on_classic_sco_connection_complete( + connection.peer_address, + hci.HCI_SUCCESS, hci.HCI_Connection_Complete_Event.LinkType.ESCO, ) return None @@ -1083,14 +1293,52 @@ class Controller: if self.link is None: return None - self.send_hci_packet( - hci.HCI_Command_Status_Event( - status=hci.HCI_SUCCESS, - num_hci_command_packets=1, - command_opcode=command.op_code, + + if connection := self.classic_connections.get(command.bd_addr): + current_role = connection.role + self.send_hci_packet( + hci.HCI_Command_Status_Event( + status=hci.HCI_SUCCESS, + num_hci_command_packets=1, + command_opcode=command.op_code, + ) ) - ) - self.link.classic_switch_role(self, command.bd_addr, command.role) + else: + # Connection doesn't exist, reject. + self.send_hci_packet( + hci.HCI_Command_Status_Event( + status=hci.HCI_COMMAND_DISALLOWED_ERROR, + num_hci_command_packets=1, + command_opcode=command.op_code, + ) + ) + return None + + # If role doesn't change, only send event to local host. + if current_role == command.role: + self.send_hci_packet( + hci.HCI_Role_Change_Event( + status=hci.HCI_SUCCESS, + bd_addr=command.bd_addr, + new_role=current_role, + ) + ) + else: + future = self.send_lmp_packet(command.bd_addr, lmp.LmpSwitchReq()) + + def on_response(future: asyncio.Future[int]): + if (status := future.result()) == hci.HCI_SUCCESS: + connection.role = hci.Role(command.role) + self.send_hci_packet( + hci.HCI_Role_Change_Event( + status=status, + bd_addr=command.bd_addr, + new_role=connection.role, + ) + ) + + future.add_done_callback(on_response) + return None def on_hci_set_event_mask_command( diff --git a/bumble/link.py b/bumble/link.py index cb03f890..4c4e1688 100644 --- a/bumble/link.py +++ b/bumble/link.py @@ -21,7 +21,7 @@ import asyncio import logging from typing import Optional -from bumble import controller, core, hci +from bumble import controller, core, hci, lmp # ----------------------------------------------------------------------------- # Logging @@ -109,7 +109,11 @@ class LocalLink: raise ValueError("unsupported transport type") if destination_controller is not None: - destination_controller.on_link_acl_data(source_address, transport, data) + asyncio.get_running_loop().call_soon( + lambda: destination_controller.on_link_acl_data( + source_address, transport, data + ) + ) def on_connection_complete(self) -> None: # Check that we expect this call @@ -261,142 +265,18 @@ class LocalLink: # Classic handlers ############################################################ - def classic_connect( + def send_lmp_packet( self, - initiator_controller: controller.Controller, - responder_address: hci.Address, + sender_controller: controller.Controller, + receiver_address: hci.Address, + packet: lmp.Packet, ): - logger.debug( - f'[Classic] {initiator_controller.public_address} connects to {responder_address}' - ) - responder_controller = self.find_classic_controller(responder_address) - if responder_controller is None: - initiator_controller.on_classic_connection_complete( - responder_address, hci.HCI_PAGE_TIMEOUT_ERROR + if not (receiver_controller := self.find_classic_controller(receiver_address)): + raise core.InvalidArgumentError( + f"Unable to find controller for address {receiver_address}" ) - return - self.pending_classic_connection = (initiator_controller, responder_controller) - - responder_controller.on_classic_connection_request( - initiator_controller.public_address, - hci.HCI_Connection_Complete_Event.LinkType.ACL, - ) - - def classic_accept_connection( - self, - responder_controller: controller.Controller, - initiator_address: hci.Address, - responder_role: int, - ): - logger.debug( - f'[Classic] {responder_controller.public_address} accepts to connect {initiator_address}' - ) - initiator_controller = self.find_classic_controller(initiator_address) - if initiator_controller is None: - responder_controller.on_classic_connection_complete( - responder_controller.public_address, hci.HCI_PAGE_TIMEOUT_ERROR - ) - return - - def connection_complete() -> None: - if responder_role != hci.Role.PERIPHERAL: - initiator_controller.on_classic_role_change( - responder_controller.public_address, int(not (responder_role)) - ) - initiator_controller.on_classic_connection_complete( - responder_controller.public_address, hci.HCI_SUCCESS - ) - - responder_controller.on_classic_role_change( - initiator_controller.public_address, responder_role - ) - responder_controller.on_classic_connection_complete( - initiator_controller.public_address, hci.HCI_SUCCESS - ) - self.pending_classic_connection = None - asyncio.get_running_loop().call_soon(connection_complete) - - def classic_disconnect( - self, - initiator_controller: controller.Controller, - responder_address: hci.Address, - reason: int, - ): - logger.debug( - f'[Classic] {initiator_controller.public_address} disconnects {responder_address}' - ) - responder_controller = self.find_classic_controller(responder_address) - assert responder_controller - asyncio.get_running_loop().call_soon( - lambda: initiator_controller.on_classic_disconnected( - responder_address, reason + lambda: receiver_controller.on_lmp_packet( + sender_controller.public_address, packet ) ) - responder_controller.on_classic_disconnected( - initiator_controller.public_address, reason - ) - - def classic_switch_role( - self, - initiator_controller: controller.Controller, - responder_address: hci.Address, - initiator_new_role: int, - ): - responder_controller = self.find_classic_controller(responder_address) - if responder_controller is None: - return - - asyncio.get_running_loop().call_soon( - lambda: initiator_controller.on_classic_role_change( - responder_address, initiator_new_role - ) - ) - responder_controller.on_classic_role_change( - initiator_controller.public_address, int(not (initiator_new_role)) - ) - - def classic_sco_connect( - self, - initiator_controller: controller.Controller, - responder_address: hci.Address, - link_type: int, - ): - logger.debug( - f'[Classic] {initiator_controller.public_address} connects SCO to {responder_address}' - ) - responder_controller = self.find_classic_controller(responder_address) - # Initiator controller should handle it. - assert responder_controller - - responder_controller.on_classic_connection_request( - initiator_controller.public_address, - link_type, - ) - - def classic_accept_sco_connection( - self, - responder_controller: controller.Controller, - initiator_address: hci.Address, - link_type: int, - ): - logger.debug( - f'[Classic] {responder_controller.public_address} accepts to connect SCO {initiator_address}' - ) - initiator_controller = self.find_classic_controller(initiator_address) - if initiator_controller is None: - responder_controller.on_classic_sco_connection_complete( - responder_controller.public_address, - hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, - link_type, - ) - return - - asyncio.get_running_loop().call_soon( - lambda: initiator_controller.on_classic_sco_connection_complete( - responder_controller.public_address, hci.HCI_SUCCESS, link_type - ) - ) - responder_controller.on_classic_sco_connection_complete( - initiator_controller.public_address, hci.HCI_SUCCESS, link_type - ) diff --git a/bumble/lmp.py b/bumble/lmp.py new file mode 100644 index 00000000..73013920 --- /dev/null +++ b/bumble/lmp.py @@ -0,0 +1,306 @@ +# Copyright 2021-2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from __future__ import annotations + +import struct +from dataclasses import dataclass, field +from typing import TypeVar + +from bumble import hci, utils + + +class Opcode(utils.OpenIntEnum): + ''' + See Bluetooth spec @ Vol 2, Part C - 5.1 PDU summary. + + Follow the alphabetical order defined there. + ''' + + # fmt: off + LMP_ACCEPTED = 3 + LMP_ACCEPTED_EXT = 127 << 8 + 1 + LMP_AU_RAND = 11 + LMP_AUTO_RATE = 35 + LMP_CHANNEL_CLASSIFICATION = 127 << 8 + 17 + LMP_CHANNEL_CLASSIFICATION_REQ = 127 << 8 + 16 + LMP_CLK_ADJ = 127 << 8 + 5 + LMP_CLK_ADJ_ACK = 127 << 8 + 6 + LMP_CLK_ADJ_REQ = 127 << 8 + 7 + LMP_CLKOFFSET_REQ = 5 + LMP_CLKOFFSET_RES = 6 + LMP_COMB_KEY = 9 + LMP_DECR_POWER_REQ = 32 + LMP_DETACH = 7 + LMP_DHKEY_CHECK = 65 + LMP_ENCAPSULATED_HEADER = 61 + LMP_ENCAPSULATED_PAYLOAD = 62 + LMP_ENCRYPTION_KEY_SIZE_MASK_REQ= 58 + LMP_ENCRYPTION_KEY_SIZE_MASK_RES= 59 + LMP_ENCRYPTION_KEY_SIZE_REQ = 16 + LMP_ENCRYPTION_MODE_REQ = 15 + LMP_ESCO_LINK_REQ = 127 << 8 + 12 + LMP_FEATURES_REQ = 39 + LMP_FEATURES_REQ_EXT = 127 << 8 + 3 + LMP_FEATURES_RES = 40 + LMP_FEATURES_RES_EXT = 127 << 8 + 4 + LMP_HOLD = 20 + LMP_HOLD_REQ = 21 + LMP_HOST_CONNECTION_REQ = 51 + LMP_IN_RAND = 8 + LMP_INCR_POWER_REQ = 31 + LMP_IO_CAPABILITY_REQ = 127 << 8 + 25 + LMP_IO_CAPABILITY_RES = 127 << 8 + 26 + LMP_KEYPRESS_NOTIFICATION = 127 << 8 + 30 + LMP_MAX_POWER = 33 + LMP_MAX_SLOT = 45 + LMP_MAX_SLOT_REQ = 46 + LMP_MIN_POWER = 34 + LMP_NAME_REQ = 1 + LMP_NAME_RES = 2 + LMP_NOT_ACCEPTED = 4 + LMP_NOT_ACCEPTED_EXT = 127 << 8 + 2 + LMP_NUMERIC_COMPARISON_FAILED = 127 << 8 + 27 + LMP_OOB_FAILED = 127 << 8 + 29 + LMP_PACKET_TYPE_TABLE_REQ = 127 << 8 + 11 + LMP_PAGE_MODE_REQ = 53 + LMP_PAGE_SCAN_MODE_REQ = 54 + LMP_PASSKEY_FAILED = 127 << 8 + 28 + LMP_PAUSE_ENCRYPTION_AES_REQ = 66 + LMP_PAUSE_ENCRYPTION_REQ = 127 << 8 + 23 + LMP_PING_REQ = 127 << 8 + 33 + LMP_PING_RES = 127 << 8 + 34 + LMP_POWER_CONTROL_REQ = 127 << 8 + 31 + LMP_POWER_CONTROL_RES = 127 << 8 + 32 + LMP_PREFERRED_RATE = 36 + LMP_QUALITY_OF_SERVICE = 41 + LMP_QUALITY_OF_SERVICE_REQ = 42 + LMP_REMOVE_ESCO_LINK_REQ = 127 << 8 + 13 + LMP_REMOVE_SCO_LINK_REQ = 44 + LMP_RESUME_ENCRYPTION_REQ = 127 << 8 + 24 + LMP_SAM_DEFINE_MAP = 127 << 8 + 36 + LMP_SAM_SET_TYPE0 = 127 << 8 + 35 + LMP_SAM_SWITCH = 127 << 8 + 37 + LMP_SCO_LINK_REQ = 43 + LMP_SET_AFH = 60 + LMP_SETUP_COMPLETE = 49 + LMP_SIMPLE_PAIRING_CONFIRM = 63 + LMP_SIMPLE_PAIRING_NUMBER = 64 + LMP_SLOT_OFFSET = 52 + LMP_SNIFF_REQ = 23 + LMP_SNIFF_SUBRATING_REQ = 127 << 8 + 21 + LMP_SNIFF_SUBRATING_RES = 127 << 8 + 22 + LMP_SRES = 12 + LMP_START_ENCRYPTION_REQ = 17 + LMP_STOP_ENCRYPTION_REQ = 18 + LMP_SUPERVISION_TIMEOUT = 55 + LMP_SWITCH_REQ = 19 + LMP_TEMP_KEY = 14 + LMP_TEMP_RAND = 13 + LMP_TEST_ACTIVATE = 56 + LMP_TEST_CONTROL = 57 + LMP_TIMING_ACCURACY_REQ = 47 + LMP_TIMING_ACCURACY_RES = 48 + LMP_UNIT_KEY = 10 + LMP_UNSNIFF_REQ = 24 + LMP_USE_SEMI_PERMANENT_KEY = 50 + LMP_VERSION_REQ = 37 + LMP_VERSION_RES = 38 + # fmt: on + + @classmethod + def parse_from(cls, data: bytes, offset: int = 0) -> tuple[int, Opcode]: + opcode = data[offset] + if opcode in (124, 127): + opcode = struct.unpack('>H', data)[0] + return offset + 2, Opcode(opcode) + return offset + 1, Opcode(opcode) + + def __bytes__(self) -> bytes: + if self.value >> 8: + return struct.pack('>H', self.value) + return bytes([self.value]) + + @classmethod + def type_metadata(cls): + return hci.metadata( + { + 'serializer': bytes, + 'parser': lambda data, offset: (Opcode.parse_from(data, offset)), + } + ) + + +class Packet: + ''' + See Bluetooth spec @ Vol 2, Part C - 5.1 PDU summary + ''' + + subclasses: dict[int, type[Packet]] = {} + opcode: Opcode + fields: hci.Fields = () + _payload: bytes = b'' + + _Packet = TypeVar("_Packet", bound="Packet") + + @classmethod + def subclass(cls, subclass: type[_Packet]) -> type[_Packet]: + # Register a factory for this class + cls.subclasses[subclass.opcode] = subclass + subclass.fields = hci.HCI_Object.fields_from_dataclass(subclass) + + return subclass + + @classmethod + def from_bytes(cls, data: bytes) -> Packet: + offset, opcode = Opcode.parse_from(data) + if not (subclass := cls.subclasses.get(opcode)): + instance = Packet() + instance.opcode = opcode + else: + instance = subclass( + **hci.HCI_Object.dict_from_bytes(data, offset, subclass.fields) + ) + instance.payload = data[offset:] + return instance + + @property + def payload(self) -> bytes: + if self._payload is None: + self._payload = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields) + return self._payload + + @payload.setter + def payload(self, value: bytes) -> None: + self._payload = value + + def __bytes__(self) -> bytes: + return bytes(self.opcode) + self.payload + + +@Packet.subclass +@dataclass +class LmpAccepted(Packet): + opcode = Opcode.LMP_ACCEPTED + + response_opcode: Opcode = field(metadata=Opcode.type_metadata()) + + +@Packet.subclass +@dataclass +class LmpNotAccepted(Packet): + opcode = Opcode.LMP_NOT_ACCEPTED + + response_opcode: Opcode = field(metadata=Opcode.type_metadata()) + error_code: int = field(metadata=hci.metadata(1)) + + +@Packet.subclass +@dataclass +class LmpAcceptedExt(Packet): + opcode = Opcode.LMP_ACCEPTED_EXT + + response_opcode: Opcode = field(metadata=Opcode.type_metadata()) + + +@Packet.subclass +@dataclass +class LmpNotAcceptedExt(Packet): + opcode = Opcode.LMP_NOT_ACCEPTED_EXT + + response_opcode: Opcode = field(metadata=Opcode.type_metadata()) + error_code: int = field(metadata=hci.metadata(1)) + + +@Packet.subclass +@dataclass +class LmpAuRand(Packet): + opcode = Opcode.LMP_AU_RAND + + random_number: bytes = field(metadata=hci.metadata(16)) + + +@Packet.subclass +@dataclass +class LmpDetach(Packet): + opcode = Opcode.LMP_DETACH + + error_code: int = field(metadata=hci.metadata(1)) + + +@Packet.subclass +@dataclass +class LmpEscoLinkReq(Packet): + opcode = Opcode.LMP_ESCO_LINK_REQ + + esco_handle: int = field(metadata=hci.metadata(1)) + esco_lt_addr: int = field(metadata=hci.metadata(1)) + timing_control_flags: int = field(metadata=hci.metadata(1)) + d_esco: int = field(metadata=hci.metadata(1)) + t_esco: int = field(metadata=hci.metadata(1)) + w_esco: int = field(metadata=hci.metadata(1)) + esco_packet_type_c_to_p: int = field(metadata=hci.metadata(1)) + esco_packet_type_p_to_c: int = field(metadata=hci.metadata(1)) + packet_length_c_to_p: int = field(metadata=hci.metadata(2)) + packet_length_p_to_c: int = field(metadata=hci.metadata(2)) + air_mode: int = field(metadata=hci.metadata(1)) + negotiation_state: int = field(metadata=hci.metadata(1)) + + +@Packet.subclass +@dataclass +class LmpHostConnectionReq(Packet): + opcode = Opcode.LMP_HOST_CONNECTION_REQ + + +@Packet.subclass +@dataclass +class LmpRemoveEscoLinkReq(Packet): + opcode = Opcode.LMP_REMOVE_ESCO_LINK_REQ + + esco_handle: int = field(metadata=hci.metadata(1)) + error_code: int = field(metadata=hci.metadata(1)) + + +@Packet.subclass +@dataclass +class LmpRemoveScoLinkReq(Packet): + opcode = Opcode.LMP_REMOVE_SCO_LINK_REQ + + sco_handle: int = field(metadata=hci.metadata(1)) + error_code: int = field(metadata=hci.metadata(1)) + + +@Packet.subclass +@dataclass +class LmpScoLinkReq(Packet): + opcode = Opcode.LMP_SCO_LINK_REQ + + sco_handle: int = field(metadata=hci.metadata(1)) + timing_control_flags: int = field(metadata=hci.metadata(1)) + d_sco: int = field(metadata=hci.metadata(1)) + t_sco: int = field(metadata=hci.metadata(1)) + sco_packet: int = field(metadata=hci.metadata(1)) + air_mode: int = field(metadata=hci.metadata(1)) + + +@Packet.subclass +@dataclass +class LmpSwitchReq(Packet): + opcode = Opcode.LMP_SWITCH_REQ + + switch_instant: int = field(metadata=hci.metadata(4), default=0) diff --git a/tests/test_utils.py b/tests/test_utils.py index b34263e4..1051101a 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -99,6 +99,8 @@ class TwoDevices: # ----------------------------------------------------------------------------- async def async_barrier(): - ready = asyncio.get_running_loop().create_future() - asyncio.get_running_loop().call_soon(ready.set_result, None) - await ready + # TODO: Remove async barrier - this doesn't always mean what we want. + for _ in range(3): + ready = asyncio.get_running_loop().create_future() + asyncio.get_running_loop().call_soon(ready.set_result, None) + await ready