# Copyright 2021-2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import unittest
import unittest.mock

import pytest

from bumble import controller, hci
from bumble.controller import Controller
from bumble.hci import (
    HCI_AclDataPacket,
    HCI_Command_Complete_Event,
    HCI_Command_Status_Event,
    HCI_CommandStatus,
    HCI_Disconnect_Command,
    HCI_Error,
    HCI_ErrorCode,
    HCI_Event,
    HCI_GenericReturnParameters,
    HCI_LE_Terminate_BIG_Command,
    HCI_Reset_Command,
    HCI_StatusReturnParameters,
)
from bumble.host import DataPacketQueue, Host
from bumble.transport.common import AsyncPipeSink, TransportSink

# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
    'supported_commands, max_lmp_features_page_number',
    [
        (controller.Controller.supported_commands, 0),
        (
            # All commands
            set(hci.HCI_Command.command_names.keys()),
            # 3 pages of LMP features
            2,
        ),
    ],
)
async def test_reset(supported_commands: set[int], max_lmp_features_page_number: int):
    """Reset a Host wired to a Controller and check the LMP features it reads.

    The controller is configured with the parametrized set of supported
    commands and maximum LMP features page number before the host is reset.
    """
    # Named `test_controller` so the imported `controller` module is not
    # shadowed inside the test body.
    test_controller = Controller('C')
    test_controller.supported_commands = supported_commands
    test_controller.lmp_features_max_page_number = max_lmp_features_page_number
    host = Host(test_controller, AsyncPipeSink(test_controller))

    await host.reset()

    # The host is expected to report the controller's feature mask with bit
    # (64 * max page number + 1) cleared.
    # NOTE(review): the reason this specific bit is masked is not visible
    # here; confirm against the host/controller implementation.
    excluded_bit = 1 << (64 * max_lmp_features_page_number + 1)
    assert host.local_lmp_features == (test_controller.lmp_features & ~excluded_bit)


# -----------------------------------------------------------------------------
def test_data_packet_queue():
    """Walk a DataPacketQueue through enqueue, completion and flush events.

    The `queued` / `completed` counters and the number of calls to the mocked
    `send` callable are checked after every step.
    """
    # Named `mock_controller` so the imported `controller` module is not
    # shadowed inside the test body.
    mock_controller = unittest.mock.Mock()
    queue = DataPacketQueue(10, 2, mock_controller.send)
    assert queue.queued == 0
    assert queue.completed == 0

    packet = HCI_AclDataPacket(
        connection_handle=123, pb_flag=0, bc_flag=0, data_total_length=0, data=b''
    )

    # The first two packets are handed to `send` immediately.
    queue.enqueue(packet, packet.connection_handle)
    assert queue.queued == 1
    assert queue.completed == 0
    assert mock_controller.send.call_count == 1

    queue.enqueue(packet, packet.connection_handle)
    assert queue.queued == 2
    assert queue.completed == 0
    assert mock_controller.send.call_count == 2

    # A third packet is counted as queued, but `send` is not called again.
    queue.enqueue(packet, packet.connection_handle)
    assert queue.queued == 3
    assert queue.completed == 0
    assert mock_controller.send.call_count == 2

    # A completion for a handle that has no packets (8000) changes nothing.
    queue.on_packets_completed(1, 8000)
    assert queue.queued == 3
    assert queue.completed == 0
    assert mock_controller.send.call_count == 2

    # A completion for handle 123 lets the held-back packet be sent.
    queue.on_packets_completed(1, 123)
    assert queue.queued == 3
    assert queue.completed == 1
    assert mock_controller.send.call_count == 3

    queue.enqueue(packet, packet.connection_handle)
    assert queue.queued == 4
    assert queue.completed == 1
    assert mock_controller.send.call_count == 3

    queue.on_packets_completed(2, 123)
    assert queue.queued == 4
    assert queue.completed == 3
    assert mock_controller.send.call_count == 4

    queue.on_packets_completed(1, 123)
    assert queue.queued == 4
    assert queue.completed == 4
    assert mock_controller.send.call_count == 4

    # Enqueue on two different handles, complete one packet, then flush both
    # handles: after the flushes, everything queued is counted as completed.
    queue.enqueue(packet, 123)
    queue.enqueue(packet, 123)
    queue.enqueue(packet, 123)
    queue.enqueue(packet, 124)
    queue.enqueue(packet, 124)
    queue.enqueue(packet, 124)
    queue.on_packets_completed(1, 123)
    assert queue.queued == 10
    assert queue.completed == 5
    queue.flush(123)
    queue.flush(124)
    assert queue.queued == 10
    assert queue.completed == 10

    # A completion for the wrong handle is ignored; a completion count larger
    # than what is outstanding (1000) only completes the one pending packet.
    queue.enqueue(packet, 123)
    queue.on_packets_completed(1, 124)
    assert queue.queued == 11
    assert queue.completed == 10
    queue.on_packets_completed(1000, 123)
    assert queue.queued == 11
    assert queue.completed == 11

    # The 'flow' event fires once on the completion below, and the later
    # flush does not fire it again.
    drain_listener = unittest.mock.Mock()
    queue.on('flow', drain_listener.on_flow)
    queue.enqueue(packet, 123)
    assert drain_listener.on_flow.call_count == 0
    queue.on_packets_completed(1, 123)
    assert drain_listener.on_flow.call_count == 1
    queue.enqueue(packet, 123)
    queue.enqueue(packet, 123)
    queue.enqueue(packet, 123)
    queue.flush(123)
    assert drain_listener.on_flow.call_count == 1
    assert queue.queued == 15
    assert queue.completed == 15


# -----------------------------------------------------------------------------
class Source:
    """Minimal packet source stub: it only records the sink it is given."""

    # Declared but never assigned in this file.
    # NOTE(review): presumably part of the interface Host expects - confirm.
    terminated: asyncio.Future[None]
    # Sink registered through `set_packet_sink`.
    sink: TransportSink

    def set_packet_sink(self, sink: TransportSink) -> None:
        """Remember `sink` so that canned responses can be delivered to it."""
        self.sink = sink


class Sink:
    """Packet sink stub that answers every packet with a canned HCI event.

    When `response` is `None`, packets are silently dropped, which lets tests
    exercise response timeouts.
    """

    # Event sent back for each received packet, or None to send nothing.
    response: HCI_Event | None

    def __init__(self, source: Source, response: HCI_Event | None) -> None:
        self.source = source
        self.response = response

    def on_packet(self, packet: bytes) -> None:
        """Ignore the content of `packet` and reply with `response`, if any."""
        if self.response is not None:
            self.source.sink.on_packet(bytes(self.response))


@pytest.mark.asyncio
async def test_send_sync_command() -> None:
    """Check `send_sync_command` / `send_sync_command_raw` result handling."""
    source = Source()
    sink = Sink(
        source,
        HCI_Command_Complete_Event(
            1,
            HCI_Reset_Command.op_code,
            HCI_StatusReturnParameters(status=HCI_ErrorCode.SUCCESS),
        ),
    )

    host = Host(source, sink)
    host.ready = True

    # Sync command with success
    response1 = await host.send_sync_command(HCI_Reset_Command())
    assert response1.status == HCI_ErrorCode.SUCCESS

    # Sync command with error status should raise
    error_response = HCI_Command_Complete_Event(
        1,
        HCI_Reset_Command.op_code,
        HCI_StatusReturnParameters(status=HCI_ErrorCode.COMMAND_DISALLOWED_ERROR),
    )
    sink.response = error_response
    with pytest.raises(HCI_Error) as excinfo:
        await host.send_sync_command(HCI_Reset_Command())

    assert excinfo.value.error_code == error_response.return_parameters.status

    # Sync command with raw result
    response2 = await host.send_sync_command_raw(HCI_Reset_Command())
    assert response2.return_parameters.status == HCI_ErrorCode.COMMAND_DISALLOWED_ERROR

    # Sync command with a command that's not an HCI_SyncCommand
    # (here, for convenience, we use an HCI_AsyncCommand instance)
    command = HCI_Disconnect_Command(connection_handle=0x1234, reason=0x13)
    sink.response = HCI_Command_Complete_Event(
        1,
        command.op_code,
        HCI_GenericReturnParameters(data=bytes.fromhex("00112233")),
    )
    response3 = await host.send_sync_command_raw(command)  # type: ignore
    assert isinstance(response3.return_parameters, HCI_GenericReturnParameters)


@pytest.mark.asyncio
async def test_send_sync_command_timeout() -> None:
    """Check that a missing response times out without blocking later commands."""
    source = Source()
    sink = Sink(source, None)

    host = Host(source, sink)
    host.ready = True

    with pytest.raises(asyncio.TimeoutError):
        await host.send_sync_command(HCI_Reset_Command(), response_timeout=0.01)

    # The sending semaphore should have been released, so this should not block
    # indefinitely
    with pytest.raises(asyncio.TimeoutError):
        await host.send_sync_command(HCI_Reset_Command(), response_timeout=0.01)


@pytest.mark.asyncio
async def test_send_async_command() -> None:
    """Check the status returned by `send_async_command` for several responses."""
    source = Source()
    sink = Sink(
        source,
        HCI_Command_Status_Event(
            HCI_CommandStatus.PENDING,
            1,
            # Use the opcode of the command actually sent below, so that the
            # canned response matches the pending command.
            HCI_LE_Terminate_BIG_Command.op_code,
        ),
    )

    host = Host(source, sink)
    host.ready = True

    # Normal pending status
    response = await host.send_async_command(
        HCI_LE_Terminate_BIG_Command(big_handle=0, reason=0)
    )
    assert response == HCI_CommandStatus.PENDING

    # Unknown HCI command result returned as a Command Status
    sink.response = HCI_Command_Status_Event(
        HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR,
        1,
        HCI_LE_Terminate_BIG_Command.op_code,
    )
    response = await host.send_async_command(
        HCI_LE_Terminate_BIG_Command(big_handle=0, reason=0), check_status=False
    )
    assert response == HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR

    # Unknown HCI command result returned as a Command Complete
    sink.response = HCI_Command_Complete_Event(
        1,
        HCI_LE_Terminate_BIG_Command.op_code,
        HCI_StatusReturnParameters(status=HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR),
    )
    response = await host.send_async_command(
        HCI_LE_Terminate_BIG_Command(big_handle=0, reason=0), check_status=False
    )
    assert response == HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR