diff --git a/bumble/controller.py b/bumble/controller.py index e0f33192..356b7e42 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -27,7 +27,7 @@ from bumble.colors import color from bumble.core import ( PhysicalTransport, ) - +from bumble import hci from bumble.hci import ( HCI_ACL_DATA_PACKET, HCI_COMMAND_DISALLOWED_ERROR, @@ -977,7 +977,68 @@ class Controller: self, connection.peer_address, HCI_Connection_Complete_Event.LinkType.ESCO ) - def on_hci_switch_role_command(self, command): + def on_hci_sniff_mode_command(self, command: hci.HCI_Sniff_Mode_Command): + ''' + See Bluetooth spec Vol 4, Part E - 7.2.2 Sniff Mode command + ''' + if self.link is None: + self.send_hci_packet( + hci.HCI_Command_Status_Event( + status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, + num_hci_command_packets=1, + command_opcode=command.op_code, + ) + ) + return + + self.send_hci_packet( + hci.HCI_Command_Status_Event( + status=HCI_SUCCESS, + num_hci_command_packets=1, + command_opcode=command.op_code, + ) + ) + self.send_hci_packet( + hci.HCI_Mode_Change_Event( + status=HCI_SUCCESS, + connection_handle=command.connection_handle, + current_mode=hci.HCI_Mode_Change_Event.Mode.SNIFF, + interval=2, + ) + ) + + def on_hci_exit_sniff_mode_command(self, command: hci.HCI_Exit_Sniff_Mode_Command): + ''' + See Bluetooth spec Vol 4, Part E - 7.2.3 Exit Sniff Mode command + ''' + + if self.link is None: + self.send_hci_packet( + hci.HCI_Command_Status_Event( + status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, + num_hci_command_packets=1, + command_opcode=command.op_code, + ) + ) + return + + self.send_hci_packet( + hci.HCI_Command_Status_Event( + status=HCI_SUCCESS, + num_hci_command_packets=1, + command_opcode=command.op_code, + ) + ) + self.send_hci_packet( + hci.HCI_Mode_Change_Event( + status=HCI_SUCCESS, + connection_handle=command.connection_handle, + current_mode=hci.HCI_Mode_Change_Event.Mode.ACTIVE, + interval=2, + ) + ) + + def on_hci_switch_role_command(self, command: hci.HCI_Switch_Role_Command): ''' See Bluetooth spec Vol 4, Part E - 7.2.8 Switch Role command ''' diff --git a/bumble/device.py b/bumble/device.py index 9c11b1ee..ad03dacb 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -1710,6 +1710,8 @@ class Connection(utils.CompositeEventEmitter): pairing_peer_authentication_requirements: Optional[int] cs_configs: dict[int, ChannelSoundingConfig] # Config ID to Configuration cs_procedures: dict[int, ChannelSoundingProcedure] # Config ID to Procedures + classic_mode: int = hci.HCI_Mode_Change_Event.Mode.ACTIVE + classic_interval: int = 0 EVENT_CONNECTION_ATT_MTU_UPDATE = "connection_att_mtu_update" EVENT_DISCONNECTION = "disconnection" @@ -1736,6 +1738,8 @@ class Connection(utils.CompositeEventEmitter): EVENT_CHANNEL_SOUNDING_CONFIG_REMOVED = "channel_sounding_config_removed" EVENT_CHANNEL_SOUNDING_PROCEDURE_FAILURE = "channel_sounding_procedure_failure" EVENT_CHANNEL_SOUNDING_PROCEDURE = "channel_sounding_procedure" + EVENT_MODE_CHANGE = "mode_change" + EVENT_MODE_CHANGE_FAILURE = "mode_change_failure" EVENT_ROLE_CHANGE = "role_change" EVENT_ROLE_CHANGE_FAILURE = "role_change_failure" EVENT_CLASSIC_PAIRING = "classic_pairing" @@ -5877,6 +5881,19 @@ class Device(utils.CompositeEventEmitter): utils.AsyncRunner.spawn(reply()) + # [Classic only] + @host_event_handler + @with_connection_from_handle + def on_mode_change( + self, connection: Connection, status: int, current_mode: int, interval: int + ): + if status == hci.HCI_SUCCESS: + connection.classic_mode = current_mode + connection.classic_interval = interval + connection.emit(connection.EVENT_MODE_CHANGE) + else: + connection.emit(connection.EVENT_MODE_CHANGE_FAILURE, status) + # [Classic only] @host_event_handler @with_connection_from_address diff --git a/bumble/host.py b/bumble/host.py index 817485a8..bfbe283c 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -1392,6 +1392,15 @@ class Host(utils.EventEmitter): def on_hci_synchronous_connection_changed_event(self, event): pass + def on_hci_mode_change_event(self, event: hci.HCI_Mode_Change_Event): + self.emit( + 'mode_change', + event.connection_handle, + event.status, + event.current_mode, + event.interval, + ) + def on_hci_role_change_event(self, event): if event.status == hci.HCI_SUCCESS: logger.debug( diff --git a/tests/device_test.py b/tests/device_test.py index 5ddbd8ae..8d0b5ade 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -575,6 +575,42 @@ async def test_cis_setup_failure(): await asyncio.wait_for(cis_create_task, _TIMEOUT) +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_enter_and_exit_sniff_mode(): + devices = TwoDevices() + await devices.setup_connection() + + q = asyncio.Queue() + + def on_mode_change(): + q.put_nowait(lambda: None) + + devices.connections[0].on(Connection.EVENT_MODE_CHANGE, on_mode_change) + + await devices[0].send_command( + hci.HCI_Sniff_Mode_Command( + connection_handle=devices.connections[0].handle, + sniff_max_interval=2, + sniff_min_interval=2, + sniff_attempt=2, + sniff_timeout=2, + ), + ) + + await asyncio.wait_for(q.get(), _TIMEOUT) + assert devices.connections[0].classic_mode == hci.HCI_Mode_Change_Event.Mode.SNIFF + assert devices.connections[0].classic_interval == 2 + + await devices[0].send_command( + hci.HCI_Exit_Sniff_Mode_Command(connection_handle=devices.connections[0].handle) + ) + + await asyncio.wait_for(q.get(), _TIMEOUT) + assert devices.connections[0].classic_mode == hci.HCI_Mode_Change_Event.Mode.ACTIVE + assert devices.connections[0].classic_interval == 2 + + # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_power_on_default_static_address_should_not_be_any():