Merge pull request #721 from khsiao-google/main

Implement HCI_Mode_Change_Event
This commit is contained in:
zxzxwu
2025-07-18 16:49:23 +08:00
committed by GitHub
4 changed files with 125 additions and 2 deletions
+63 -2
View File
@@ -27,7 +27,7 @@ from bumble.colors import color
from bumble.core import (
PhysicalTransport,
)
from bumble import hci
from bumble.hci import (
HCI_ACL_DATA_PACKET,
HCI_COMMAND_DISALLOWED_ERROR,
@@ -977,7 +977,68 @@ class Controller:
self, connection.peer_address, HCI_Connection_Complete_Event.LinkType.ESCO
)
def on_hci_switch_role_command(self, command):
def on_hci_sniff_mode_command(self, command: hci.HCI_Sniff_Mode_Command):
'''
See Bluetooth spec Vol 4, Part E - 7.2.2 Sniff Mode command
'''
if self.link is None:
self.send_hci_packet(
hci.HCI_Command_Status_Event(
status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
return
self.send_hci_packet(
hci.HCI_Command_Status_Event(
status=HCI_SUCCESS,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
self.send_hci_packet(
hci.HCI_Mode_Change_Event(
status=HCI_SUCCESS,
connection_handle=command.connection_handle,
current_mode=hci.HCI_Mode_Change_Event.Mode.SNIFF,
interval=2,
)
)
def on_hci_exit_sniff_mode_command(self, command: hci.HCI_Exit_Sniff_Mode_Command):
'''
See Bluetooth spec Vol 4, Part E - 7.2.3 Exit Sniff Mode command
'''
if self.link is None:
self.send_hci_packet(
hci.HCI_Command_Status_Event(
status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
return
self.send_hci_packet(
hci.HCI_Command_Status_Event(
status=HCI_SUCCESS,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
self.send_hci_packet(
hci.HCI_Mode_Change_Event(
status=HCI_SUCCESS,
connection_handle=command.connection_handle,
current_mode=hci.HCI_Mode_Change_Event.Mode.ACTIVE,
interval=2,
)
)
def on_hci_switch_role_command(self, command: hci.HCI_Switch_Role_Command):
'''
See Bluetooth spec Vol 4, Part E - 7.2.8 Switch Role command
'''
+17
View File
@@ -1710,6 +1710,8 @@ class Connection(utils.CompositeEventEmitter):
pairing_peer_authentication_requirements: Optional[int]
cs_configs: dict[int, ChannelSoundingConfig] # Config ID to Configuration
cs_procedures: dict[int, ChannelSoundingProcedure] # Config ID to Procedures
classic_mode: int = hci.HCI_Mode_Change_Event.Mode.ACTIVE
classic_interval: int = 0
EVENT_CONNECTION_ATT_MTU_UPDATE = "connection_att_mtu_update"
EVENT_DISCONNECTION = "disconnection"
@@ -1736,6 +1738,8 @@ class Connection(utils.CompositeEventEmitter):
EVENT_CHANNEL_SOUNDING_CONFIG_REMOVED = "channel_sounding_config_removed"
EVENT_CHANNEL_SOUNDING_PROCEDURE_FAILURE = "channel_sounding_procedure_failure"
EVENT_CHANNEL_SOUNDING_PROCEDURE = "channel_sounding_procedure"
EVENT_MODE_CHANGE = "mode_change"
EVENT_MODE_CHANGE_FAILURE = "mode_change_failure"
EVENT_ROLE_CHANGE = "role_change"
EVENT_ROLE_CHANGE_FAILURE = "role_change_failure"
EVENT_CLASSIC_PAIRING = "classic_pairing"
@@ -5877,6 +5881,19 @@ class Device(utils.CompositeEventEmitter):
utils.AsyncRunner.spawn(reply())
# [Classic only]
@host_event_handler
@with_connection_from_handle
def on_mode_change(
self, connection: Connection, status: int, current_mode: int, interval: int
):
if status == hci.HCI_SUCCESS:
connection.classic_mode = current_mode
connection.classic_interval = interval
connection.emit(connection.EVENT_MODE_CHANGE)
else:
connection.emit(connection.EVENT_MODE_CHANGE_FAILURE, status)
# [Classic only]
@host_event_handler
@with_connection_from_address
+9
View File
@@ -1392,6 +1392,15 @@ class Host(utils.EventEmitter):
def on_hci_synchronous_connection_changed_event(self, event):
pass
def on_hci_mode_change_event(self, event: hci.HCI_Mode_Change_Event):
self.emit(
'mode_change',
event.connection_handle,
event.status,
event.current_mode,
event.interval,
)
def on_hci_role_change_event(self, event):
if event.status == hci.HCI_SUCCESS:
logger.debug(
+36
View File
@@ -575,6 +575,42 @@ async def test_cis_setup_failure():
await asyncio.wait_for(cis_create_task, _TIMEOUT)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_enter_and_exit_sniff_mode():
devices = TwoDevices()
await devices.setup_connection()
q = asyncio.Queue()
def on_mode_change():
q.put_nowait(lambda: None)
devices.connections[0].on(Connection.EVENT_MODE_CHANGE, on_mode_change)
await devices[0].send_command(
hci.HCI_Sniff_Mode_Command(
connection_handle=devices.connections[0].handle,
sniff_max_interval=2,
sniff_min_interval=2,
sniff_attempt=2,
sniff_timeout=2,
),
)
await asyncio.wait_for(q.get(), _TIMEOUT)
assert devices.connections[0].classic_mode == hci.HCI_Mode_Change_Event.Mode.SNIFF
assert devices.connections[0].classic_interval == 2
await devices[0].send_command(
hci.HCI_Exit_Sniff_Mode_Command(connection_handle=devices.connections[0].handle)
)
await asyncio.wait_for(q.get(), _TIMEOUT)
assert devices.connections[0].classic_mode == hci.HCI_Mode_Change_Event.Mode.ACTIVE
assert devices.connections[0].classic_interval == 2
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_power_on_default_static_address_should_not_be_any():