mirror of
https://github.com/google/bumble.git
synced 2026-04-16 00:25:31 +00:00
279 lines
8.8 KiB
Python
279 lines
8.8 KiB
Python
# Copyright 2021-2024 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Imports
|
|
# -----------------------------------------------------------------------------
|
|
import asyncio
|
|
import logging
|
|
import unittest
|
|
import unittest.mock
|
|
|
|
import pytest
|
|
|
|
from bumble.controller import Controller
|
|
from bumble.hci import (
|
|
HCI_AclDataPacket,
|
|
HCI_Command_Complete_Event,
|
|
HCI_Command_Status_Event,
|
|
HCI_CommandStatus,
|
|
HCI_Disconnect_Command,
|
|
HCI_Error,
|
|
HCI_ErrorCode,
|
|
HCI_Event,
|
|
HCI_GenericReturnParameters,
|
|
HCI_LE_Terminate_BIG_Command,
|
|
HCI_Reset_Command,
|
|
HCI_StatusReturnParameters,
|
|
)
|
|
from bumble.host import DataPacketQueue, Host
|
|
from bumble.transport.common import AsyncPipeSink, TransportSink
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Logging
|
|
# -----------------------------------------------------------------------------
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
'supported_commands, lmp_features',
|
|
[
|
|
(
|
|
# Default commands
|
|
'2000800000c000000000e4000000a822000000000000040000f7ffff7f000000'
|
|
'30f0f9ff01008004000000000000000000000000000000000000000000000000',
|
|
# Only LE LMP feature
|
|
'0000000060000000',
|
|
),
|
|
(
|
|
# All commands
|
|
'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff'
|
|
'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff',
|
|
# 3 pages of LMP features
|
|
'000102030405060708090A0B0C0D0E0F011112131415161718191A1B1C1D1E1F',
|
|
),
|
|
],
|
|
)
|
|
async def test_reset(supported_commands: str, lmp_features: str):
|
|
controller = Controller('C')
|
|
controller.supported_commands = bytes.fromhex(supported_commands)
|
|
controller.lmp_features = bytes.fromhex(lmp_features)
|
|
host = Host(controller, AsyncPipeSink(controller))
|
|
|
|
await host.reset()
|
|
|
|
assert host.local_lmp_features == int.from_bytes(
|
|
bytes.fromhex(lmp_features), 'little'
|
|
)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def test_data_packet_queue():
|
|
controller = unittest.mock.Mock()
|
|
queue = DataPacketQueue(10, 2, controller.send)
|
|
assert queue.queued == 0
|
|
assert queue.completed == 0
|
|
packet = HCI_AclDataPacket(
|
|
connection_handle=123, pb_flag=0, bc_flag=0, data_total_length=0, data=b''
|
|
)
|
|
|
|
queue.enqueue(packet, packet.connection_handle)
|
|
assert queue.queued == 1
|
|
assert queue.completed == 0
|
|
assert controller.send.call_count == 1
|
|
|
|
queue.enqueue(packet, packet.connection_handle)
|
|
assert queue.queued == 2
|
|
assert queue.completed == 0
|
|
assert controller.send.call_count == 2
|
|
|
|
queue.enqueue(packet, packet.connection_handle)
|
|
assert queue.queued == 3
|
|
assert queue.completed == 0
|
|
assert controller.send.call_count == 2
|
|
|
|
queue.on_packets_completed(1, 8000)
|
|
assert queue.queued == 3
|
|
assert queue.completed == 0
|
|
assert controller.send.call_count == 2
|
|
|
|
queue.on_packets_completed(1, 123)
|
|
assert queue.queued == 3
|
|
assert queue.completed == 1
|
|
assert controller.send.call_count == 3
|
|
|
|
queue.enqueue(packet, packet.connection_handle)
|
|
assert queue.queued == 4
|
|
assert queue.completed == 1
|
|
assert controller.send.call_count == 3
|
|
|
|
queue.on_packets_completed(2, 123)
|
|
assert queue.queued == 4
|
|
assert queue.completed == 3
|
|
assert controller.send.call_count == 4
|
|
|
|
queue.on_packets_completed(1, 123)
|
|
assert queue.queued == 4
|
|
assert queue.completed == 4
|
|
assert controller.send.call_count == 4
|
|
|
|
queue.enqueue(packet, 123)
|
|
queue.enqueue(packet, 123)
|
|
queue.enqueue(packet, 123)
|
|
queue.enqueue(packet, 124)
|
|
queue.enqueue(packet, 124)
|
|
queue.enqueue(packet, 124)
|
|
queue.on_packets_completed(1, 123)
|
|
assert queue.queued == 10
|
|
assert queue.completed == 5
|
|
queue.flush(123)
|
|
queue.flush(124)
|
|
assert queue.queued == 10
|
|
assert queue.completed == 10
|
|
|
|
queue.enqueue(packet, 123)
|
|
queue.on_packets_completed(1, 124)
|
|
assert queue.queued == 11
|
|
assert queue.completed == 10
|
|
queue.on_packets_completed(1000, 123)
|
|
assert queue.queued == 11
|
|
assert queue.completed == 11
|
|
|
|
drain_listener = unittest.mock.Mock()
|
|
queue.on('flow', drain_listener.on_flow)
|
|
queue.enqueue(packet, 123)
|
|
assert drain_listener.on_flow.call_count == 0
|
|
queue.on_packets_completed(1, 123)
|
|
assert drain_listener.on_flow.call_count == 1
|
|
queue.enqueue(packet, 123)
|
|
queue.enqueue(packet, 123)
|
|
queue.enqueue(packet, 123)
|
|
queue.flush(123)
|
|
assert drain_listener.on_flow.call_count == 1
|
|
assert queue.queued == 15
|
|
assert queue.completed == 15
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
class Source:
|
|
terminated: asyncio.Future[None]
|
|
sink: TransportSink
|
|
|
|
def set_packet_sink(self, sink: TransportSink) -> None:
|
|
self.sink = sink
|
|
|
|
|
|
class Sink:
|
|
response: HCI_Event
|
|
|
|
def __init__(self, source: Source, response: HCI_Event) -> None:
|
|
self.source = source
|
|
self.response = response
|
|
|
|
def on_packet(self, packet: bytes) -> None:
|
|
self.source.sink.on_packet(bytes(self.response))
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_sync_command() -> None:
|
|
source = Source()
|
|
sink = Sink(
|
|
source,
|
|
HCI_Command_Complete_Event(
|
|
1,
|
|
HCI_Reset_Command.op_code,
|
|
HCI_StatusReturnParameters(status=HCI_ErrorCode.SUCCESS),
|
|
),
|
|
)
|
|
|
|
host = Host(source, sink)
|
|
host.ready = True
|
|
|
|
# Sync command with success
|
|
response1 = await host.send_sync_command(HCI_Reset_Command())
|
|
assert response1.status == HCI_ErrorCode.SUCCESS
|
|
|
|
# Sync command with error status should raise
|
|
error_response = HCI_Command_Complete_Event(
|
|
1,
|
|
HCI_Reset_Command.op_code,
|
|
HCI_StatusReturnParameters(status=HCI_ErrorCode.COMMAND_DISALLOWED_ERROR),
|
|
)
|
|
sink.response = error_response
|
|
with pytest.raises(HCI_Error) as excinfo:
|
|
await host.send_sync_command(HCI_Reset_Command())
|
|
|
|
assert excinfo.value.error_code == error_response.return_parameters.status
|
|
|
|
# Sync command with raw result
|
|
response2 = await host.send_sync_command_raw(HCI_Reset_Command())
|
|
assert response2.return_parameters.status == HCI_ErrorCode.COMMAND_DISALLOWED_ERROR
|
|
|
|
# Sync command with a command that's not an HCI_SyncCommand
|
|
# (here, for convenience, we use an HCI_AsyncCommand instance)
|
|
command = HCI_Disconnect_Command(connection_handle=0x1234, reason=0x13)
|
|
sink.response = HCI_Command_Complete_Event(
|
|
1,
|
|
command.op_code,
|
|
HCI_GenericReturnParameters(data=bytes.fromhex("00112233")),
|
|
)
|
|
response3 = await host.send_sync_command_raw(command) # type: ignore
|
|
assert isinstance(response3.return_parameters, HCI_GenericReturnParameters)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_async_command() -> None:
|
|
source = Source()
|
|
sink = Sink(
|
|
source,
|
|
HCI_Command_Status_Event(
|
|
HCI_CommandStatus.PENDING,
|
|
1,
|
|
HCI_Reset_Command.op_code,
|
|
),
|
|
)
|
|
|
|
host = Host(source, sink)
|
|
host.ready = True
|
|
|
|
# Normal pending status
|
|
response = await host.send_async_command(
|
|
HCI_LE_Terminate_BIG_Command(big_handle=0, reason=0)
|
|
)
|
|
assert response == HCI_CommandStatus.PENDING
|
|
|
|
# Unknown HCI command result returned as a Command Status
|
|
sink.response = HCI_Command_Status_Event(
|
|
HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR,
|
|
1,
|
|
HCI_LE_Terminate_BIG_Command.op_code,
|
|
)
|
|
response = await host.send_async_command(
|
|
HCI_LE_Terminate_BIG_Command(big_handle=0, reason=0), check_status=False
|
|
)
|
|
assert response == HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR
|
|
|
|
# Unknown HCI command result returned as a Command Complete
|
|
sink.response = HCI_Command_Complete_Event(
|
|
1,
|
|
HCI_LE_Terminate_BIG_Command.op_code,
|
|
HCI_StatusReturnParameters(HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR),
|
|
)
|
|
response = await host.send_async_command(
|
|
HCI_LE_Terminate_BIG_Command(big_handle=0, reason=0), check_status=False
|
|
)
|
|
assert response == HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR
|