Merge pull request #880 from google/gbg/command-status-hack

add workaround for some buggy controllers
This commit is contained in:
Gilles Boccon-Gibod
2026-02-02 23:37:52 -08:00
committed by GitHub
3 changed files with 101 additions and 20 deletions

View File

@@ -616,22 +616,28 @@ class Host(utils.EventEmitter):
if self.supports_command(
hci.HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND
):
try:
response10 = await self.send_sync_command(
hci.HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command()
)
self.number_of_supported_advertising_sets = (
response10.num_supported_advertising_sets
)
except hci.HCI_Error:
logger.warning('Failed to read number of supported advertising sets')
if self.supports_command(
hci.HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND
):
try:
response11 = await self.send_sync_command(
hci.HCI_LE_Read_Maximum_Advertising_Data_Length_Command()
)
self.maximum_advertising_data_length = (
response11.max_advertising_data_length
)
except hci.HCI_Error:
logger.warning('Failed to read maximum advertising data length')
@property
def controller(self) -> TransportSink | None:
@@ -776,6 +782,20 @@ class Host(utils.EventEmitter):
) -> hci.HCI_Command_Complete_Event[_RP]:
response = await self._send_command(command, response_timeout)
# For unknown HCI commands, some controllers return Command Status instead of
# Command Complete.
if (
isinstance(response, hci.HCI_Command_Status_Event)
and response.status == hci.HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR
):
return hci.HCI_Command_Complete_Event(
num_hci_command_packets=response.num_hci_command_packets,
command_opcode=command.op_code,
return_parameters=hci.HCI_StatusReturnParameters(
status=hci.HCI_ErrorCode(response.status)
), # type: ignore
)
# Check that the response is of the expected type
assert isinstance(response, hci.HCI_Command_Complete_Event)
@@ -789,19 +809,25 @@ class Host(utils.EventEmitter):
) -> hci.HCI_ErrorCode:
response = await self._send_command(command, response_timeout)
# Check that the response is of the expected type
# For unknown HCI commands, some controllers return Command Complete instead of
# Command Status.
if isinstance(response, hci.HCI_Command_Complete_Event):
# Assume the first byte of the return parameters is the status
if (
status := hci.HCI_ErrorCode(response.parameters[3])
) != hci.HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR:
logger.warning(f'unexpected return paramerers status {status}')
else:
assert isinstance(response, hci.HCI_Command_Status_Event)
status = hci.HCI_ErrorCode(response.status)
# Check the return parameters if required
status = response.status
# Check the status if required
if check_status:
if status != hci.HCI_CommandStatus.PENDING:
logger.warning(
f'{command.name} failed ' f'({hci.HCI_Constant.error_name(status)})'
)
logger.warning(f'{command.name} failed ' f'({status.name})')
raise hci.HCI_Error(status)
return hci.HCI_ErrorCode(status)
return status
@utils.deprecated("Use utils.AsyncRunner.spawn() instead.")
def send_command_sync(self, command: hci.HCI_AsyncCommand) -> None:

View File

@@ -232,6 +232,14 @@ def test_return_parameters() -> None:
assert len(params.local_name) == 248
assert hci.map_null_terminated_utf8_string(params.local_name) == 'hello'
# Some return parameters may be shorter than the full length
# (for Command Complete events with errors)
params = hci.HCI_Read_BD_ADDR_Command.parse_return_parameters(
bytes.fromhex('010011223344')
)
assert isinstance(params, hci.HCI_StatusReturnParameters)
assert params.status == hci.HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR
# -----------------------------------------------------------------------------
def test_HCI_Command():

View File

@@ -26,11 +26,14 @@ from bumble.controller import Controller
from bumble.hci import (
HCI_AclDataPacket,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_CommandStatus,
HCI_Disconnect_Command,
HCI_Error,
HCI_ErrorCode,
HCI_Event,
HCI_GenericReturnParameters,
HCI_LE_Terminate_BIG_Command,
HCI_Reset_Command,
HCI_StatusReturnParameters,
)
@@ -229,3 +232,47 @@ async def test_send_sync_command() -> None:
)
response3 = await host.send_sync_command_raw(command) # type: ignore
assert isinstance(response3.return_parameters, HCI_GenericReturnParameters)
@pytest.mark.asyncio
async def test_send_async_command() -> None:
source = Source()
sink = Sink(
source,
HCI_Command_Status_Event(
HCI_CommandStatus.PENDING,
1,
HCI_Reset_Command.op_code,
),
)
host = Host(source, sink)
host.ready = True
# Normal pending status
response = await host.send_async_command(
HCI_LE_Terminate_BIG_Command(big_handle=0, reason=0)
)
assert response == HCI_CommandStatus.PENDING
# Unknown HCI command result returned as a Command Status
sink.response = HCI_Command_Status_Event(
HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR,
1,
HCI_LE_Terminate_BIG_Command.op_code,
)
response = await host.send_async_command(
HCI_LE_Terminate_BIG_Command(big_handle=0, reason=0), check_status=False
)
assert response == HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR
# Unknown HCI command result returned as a Command Complete
sink.response = HCI_Command_Complete_Event(
1,
HCI_LE_Terminate_BIG_Command.op_code,
HCI_StatusReturnParameters(HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR),
)
response = await host.send_async_command(
HCI_LE_Terminate_BIG_Command(big_handle=0, reason=0), check_status=False
)
assert response == HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR