Basic LMP implementation

This commit is contained in:
Josh Wu
2025-11-08 23:23:27 +08:00
parent bb9aa12a74
commit e0dee2135f
4 changed files with 633 additions and 197 deletions

View File

@@ -21,7 +21,7 @@ import asyncio
import logging
from typing import Optional
from bumble import controller, core, hci
from bumble import controller, core, hci, lmp
# -----------------------------------------------------------------------------
# Logging
@@ -109,7 +109,11 @@ class LocalLink:
raise ValueError("unsupported transport type")
if destination_controller is not None:
destination_controller.on_link_acl_data(source_address, transport, data)
asyncio.get_running_loop().call_soon(
lambda: destination_controller.on_link_acl_data(
source_address, transport, data
)
)
def on_connection_complete(self) -> None:
# Check that we expect this call
@@ -261,142 +265,18 @@ class LocalLink:
# Classic handlers
############################################################
def classic_connect(
def send_lmp_packet(
self,
initiator_controller: controller.Controller,
responder_address: hci.Address,
sender_controller: controller.Controller,
receiver_address: hci.Address,
packet: lmp.Packet,
):
logger.debug(
f'[Classic] {initiator_controller.public_address} connects to {responder_address}'
)
responder_controller = self.find_classic_controller(responder_address)
if responder_controller is None:
initiator_controller.on_classic_connection_complete(
responder_address, hci.HCI_PAGE_TIMEOUT_ERROR
if not (receiver_controller := self.find_classic_controller(receiver_address)):
raise core.InvalidArgumentError(
f"Unable to find controller for address {receiver_address}"
)
return
self.pending_classic_connection = (initiator_controller, responder_controller)
responder_controller.on_classic_connection_request(
initiator_controller.public_address,
hci.HCI_Connection_Complete_Event.LinkType.ACL,
)
def classic_accept_connection(
self,
responder_controller: controller.Controller,
initiator_address: hci.Address,
responder_role: int,
):
logger.debug(
f'[Classic] {responder_controller.public_address} accepts to connect {initiator_address}'
)
initiator_controller = self.find_classic_controller(initiator_address)
if initiator_controller is None:
responder_controller.on_classic_connection_complete(
responder_controller.public_address, hci.HCI_PAGE_TIMEOUT_ERROR
)
return
def connection_complete() -> None:
if responder_role != hci.Role.PERIPHERAL:
initiator_controller.on_classic_role_change(
responder_controller.public_address, int(not (responder_role))
)
initiator_controller.on_classic_connection_complete(
responder_controller.public_address, hci.HCI_SUCCESS
)
responder_controller.on_classic_role_change(
initiator_controller.public_address, responder_role
)
responder_controller.on_classic_connection_complete(
initiator_controller.public_address, hci.HCI_SUCCESS
)
self.pending_classic_connection = None
asyncio.get_running_loop().call_soon(connection_complete)
def classic_disconnect(
self,
initiator_controller: controller.Controller,
responder_address: hci.Address,
reason: int,
):
logger.debug(
f'[Classic] {initiator_controller.public_address} disconnects {responder_address}'
)
responder_controller = self.find_classic_controller(responder_address)
assert responder_controller
asyncio.get_running_loop().call_soon(
lambda: initiator_controller.on_classic_disconnected(
responder_address, reason
lambda: receiver_controller.on_lmp_packet(
sender_controller.public_address, packet
)
)
responder_controller.on_classic_disconnected(
initiator_controller.public_address, reason
)
def classic_switch_role(
self,
initiator_controller: controller.Controller,
responder_address: hci.Address,
initiator_new_role: int,
):
responder_controller = self.find_classic_controller(responder_address)
if responder_controller is None:
return
asyncio.get_running_loop().call_soon(
lambda: initiator_controller.on_classic_role_change(
responder_address, initiator_new_role
)
)
responder_controller.on_classic_role_change(
initiator_controller.public_address, int(not (initiator_new_role))
)
def classic_sco_connect(
self,
initiator_controller: controller.Controller,
responder_address: hci.Address,
link_type: int,
):
logger.debug(
f'[Classic] {initiator_controller.public_address} connects SCO to {responder_address}'
)
responder_controller = self.find_classic_controller(responder_address)
# Initiator controller should handle it.
assert responder_controller
responder_controller.on_classic_connection_request(
initiator_controller.public_address,
link_type,
)
def classic_accept_sco_connection(
self,
responder_controller: controller.Controller,
initiator_address: hci.Address,
link_type: int,
):
logger.debug(
f'[Classic] {responder_controller.public_address} accepts to connect SCO {initiator_address}'
)
initiator_controller = self.find_classic_controller(initiator_address)
if initiator_controller is None:
responder_controller.on_classic_sco_connection_complete(
responder_controller.public_address,
hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
link_type,
)
return
asyncio.get_running_loop().call_soon(
lambda: initiator_controller.on_classic_sco_connection_complete(
responder_controller.public_address, hci.HCI_SUCCESS, link_type
)
)
responder_controller.on_classic_sco_connection_complete(
initiator_controller.public_address, hci.HCI_SUCCESS, link_type
)