Merge pull request #877 from google/gbg/hci-fixes

fix a few HCI types and make the bridge more robust
This commit is contained in:
Gilles Boccon-Gibod
2026-02-02 11:19:28 -08:00
committed by GitHub
11 changed files with 235 additions and 118 deletions

View File

@@ -81,7 +81,9 @@ async def async_main():
response = hci.HCI_Command_Complete_Event( response = hci.HCI_Command_Complete_Event(
num_hci_command_packets=1, num_hci_command_packets=1,
command_opcode=hci_packet.op_code, command_opcode=hci_packet.op_code,
return_parameters=bytes([hci.HCI_SUCCESS]), return_parameters=hci.HCI_StatusReturnParameters(
status=hci.HCI_ErrorCode.SUCCESS
),
) )
# Return a packet with 'respond to sender' set to True # Return a packet with 'respond to sender' set to True
return (bytes(response), True) return (bytes(response), True)

View File

@@ -37,7 +37,12 @@ class HCI_Bridge:
def on_packet(self, packet): def on_packet(self, packet):
# Convert the packet bytes to an object # Convert the packet bytes to an object
hci_packet = HCI_Packet.from_bytes(packet) try:
hci_packet = HCI_Packet.from_bytes(packet)
except Exception:
logger.warning('forwarding unparsed packet as-is')
self.hci_sink.on_packet(packet)
return
# Filter the packet # Filter the packet
if self.packet_filter is not None: if self.packet_filter is not None:
@@ -50,7 +55,10 @@ class HCI_Bridge:
return return
# Analyze the packet # Analyze the packet
self.trace(hci_packet) try:
self.trace(hci_packet)
except Exception:
logger.exception('Exception while tracing packet')
# Bridge the packet # Bridge the packet
self.hci_sink.on_packet(packet) self.hci_sink.on_packet(packet)

View File

@@ -1177,7 +1177,7 @@ class ChannelSoundingCapabilities:
rtt_capability: int rtt_capability: int
rtt_aa_only_n: int rtt_aa_only_n: int
rtt_sounding_n: int rtt_sounding_n: int
rtt_random_payload_n: int rtt_random_sequence_n: int
nadm_sounding_capability: int nadm_sounding_capability: int
nadm_random_capability: int nadm_random_capability: int
cs_sync_phys_supported: int cs_sync_phys_supported: int
@@ -2763,24 +2763,39 @@ class Device(utils.CompositeEventEmitter):
logger.warning(f'!!! Command {command.name} timed out') logger.warning(f'!!! Command {command.name} timed out')
raise CommandTimeoutError() from error raise CommandTimeoutError() from error
async def send_sync_command( async def send_sync_command(self, command: hci.HCI_SyncCommand[_RP]) -> _RP:
self, command: hci.HCI_SyncCommand[_RP], check_status: bool = True
) -> _RP:
''' '''
Send a synchronous command via the host. Send a synchronous command via the host.
If the `status` field of the response's `return_parameters` is not equal to
`SUCCESS` an exception is raised.
Params: Params:
command: the command to send. command: the command to send.
check_status: If `True`, check the `status` field of the response's
`return_parameters` and raise and exception if not equal to `SUCCESS`.
Returns: Returns:
An instance of the return parameters class associated with the command class. An instance of the return parameters class associated with the command class.
''' '''
try: try:
return await self.host.send_sync_command( return await self.host.send_sync_command(command, self.command_timeout)
command, check_status, self.command_timeout except asyncio.TimeoutError as error:
) logger.warning(f'!!! Command {command.name} timed out')
raise CommandTimeoutError() from error
async def send_sync_command_raw(
self, command: hci.HCI_SyncCommand[_RP]
) -> hci.HCI_Command_Complete_Event[_RP]:
'''
Send a synchronous command via the host without checking the response.
Params:
command: the command to send.
Returns:
An HCI_Command_Complete_Event instance.
'''
try:
return await self.host.send_sync_command_raw(command, self.command_timeout)
except asyncio.TimeoutError as error: except asyncio.TimeoutError as error:
logger.warning(f'!!! Command {command.name} timed out') logger.warning(f'!!! Command {command.name} timed out')
raise CommandTimeoutError() from error raise CommandTimeoutError() from error
@@ -2797,7 +2812,7 @@ class Device(utils.CompositeEventEmitter):
raise and exception if not equal to `PENDING`. raise and exception if not equal to `PENDING`.
Returns: Returns:
An instance of the return parameters class associated with the command class. A status code.
''' '''
try: try:
return await self.host.send_async_command( return await self.host.send_async_command(
@@ -2812,12 +2827,12 @@ class Device(utils.CompositeEventEmitter):
await self.host.reset() await self.host.reset()
# Try to get the public address from the controller # Try to get the public address from the controller
response = await self.host.send_sync_command( try:
hci.HCI_Read_BD_ADDR_Command(), check_status=False response = await self.host.send_sync_command(hci.HCI_Read_BD_ADDR_Command())
)
if response.status == hci.HCI_SUCCESS:
logger.debug(color(f'BD_ADDR: {response.bd_addr}', 'yellow')) logger.debug(color(f'BD_ADDR: {response.bd_addr}', 'yellow'))
self.public_address = response.bd_addr self.public_address = response.bd_addr
except hci.HCI_Error:
logger.debug('Controller has no public address')
# Instantiate the Key Store (we do this here rather than at __init__ time # Instantiate the Key Store (we do this here rather than at __init__ time
# because some Key Store implementations use the public address as a namespace) # because some Key Store implementations use the public address as a namespace)
@@ -2926,7 +2941,7 @@ class Device(utils.CompositeEventEmitter):
rtt_capability=result.rtt_capability, rtt_capability=result.rtt_capability,
rtt_aa_only_n=result.rtt_aa_only_n, rtt_aa_only_n=result.rtt_aa_only_n,
rtt_sounding_n=result.rtt_sounding_n, rtt_sounding_n=result.rtt_sounding_n,
rtt_random_payload_n=result.rtt_random_payload_n, rtt_random_sequence_n=result.rtt_random_sequence_n,
nadm_sounding_capability=result.nadm_sounding_capability, nadm_sounding_capability=result.nadm_sounding_capability,
nadm_random_capability=result.nadm_random_capability, nadm_random_capability=result.nadm_random_capability,
cs_sync_phys_supported=result.cs_sync_phys_supported, cs_sync_phys_supported=result.cs_sync_phys_supported,
@@ -2954,27 +2969,23 @@ class Device(utils.CompositeEventEmitter):
) )
if self.classic_enabled: if self.classic_enabled:
await self.send_sync_command( await self.send_sync_command_raw(
hci.HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')), hci.HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8'))
check_status=False,
) )
await self.send_sync_command( await self.send_sync_command_raw(
hci.HCI_Write_Class_Of_Device_Command( hci.HCI_Write_Class_Of_Device_Command(
class_of_device=self.class_of_device class_of_device=self.class_of_device
), )
check_status=False,
) )
await self.send_sync_command( await self.send_sync_command_raw(
hci.HCI_Write_Simple_Pairing_Mode_Command( hci.HCI_Write_Simple_Pairing_Mode_Command(
simple_pairing_mode=int(self.classic_ssp_enabled) simple_pairing_mode=int(self.classic_ssp_enabled)
), )
check_status=False,
) )
await self.send_sync_command( await self.send_sync_command_raw(
hci.HCI_Write_Secure_Connections_Host_Support_Command( hci.HCI_Write_Secure_Connections_Host_Support_Command(
secure_connections_host_support=int(self.classic_sc_enabled) secure_connections_host_support=int(self.classic_sc_enabled)
), )
check_status=False,
) )
await self.set_connectable(self.connectable) await self.set_connectable(self.connectable)
await self.set_discoverable(self.discoverable) await self.set_discoverable(self.discoverable)
@@ -6719,7 +6730,7 @@ class Device(utils.CompositeEventEmitter):
rtt_capability=event.rtt_capability, rtt_capability=event.rtt_capability,
rtt_aa_only_n=event.rtt_aa_only_n, rtt_aa_only_n=event.rtt_aa_only_n,
rtt_sounding_n=event.rtt_sounding_n, rtt_sounding_n=event.rtt_sounding_n,
rtt_random_payload_n=event.rtt_random_payload_n, rtt_random_sequence_n=event.rtt_random_sequence_n,
nadm_sounding_capability=event.nadm_sounding_capability, nadm_sounding_capability=event.nadm_sounding_capability,
nadm_random_capability=event.nadm_random_capability, nadm_random_capability=event.nadm_random_capability,
cs_sync_phys_supported=event.cs_sync_phys_supported, cs_sync_phys_supported=event.cs_sync_phys_supported,

View File

@@ -663,10 +663,13 @@ class Driver(common.Driver):
async def read_device_info(self) -> dict[ValueType, Any]: async def read_device_info(self) -> dict[ValueType, Any]:
self.host.ready = True self.host.ready = True
response1 = await self.host.send_sync_command( response1 = await self.host.send_sync_command_raw(hci.HCI_Reset_Command())
hci.HCI_Reset_Command(), check_status=False if not isinstance(
) response1.return_parameters, hci.HCI_StatusReturnParameters
if response1.status not in (hci.HCI_UNKNOWN_HCI_COMMAND_ERROR, hci.HCI_SUCCESS): ) or response1.return_parameters.status not in (
hci.HCI_UNKNOWN_HCI_COMMAND_ERROR,
hci.HCI_SUCCESS,
):
# When the controller is in operational mode, the response is a # When the controller is in operational mode, the response is a
# successful response. # successful response.
# When the controller is in bootloader mode, # When the controller is in bootloader mode,
@@ -676,13 +679,18 @@ class Driver(common.Driver):
raise DriverError("unexpected HCI response") raise DriverError("unexpected HCI response")
# Read the firmware version. # Read the firmware version.
response2 = await self.host.send_sync_command( response2 = await self.host.send_sync_command_raw(
HCI_Intel_Read_Version_Command(param0=0xFF), check_status=False HCI_Intel_Read_Version_Command(param0=0xFF)
) )
if response2.status != 0: # type: ignore if (
not isinstance(
response2.return_parameters, HCI_Intel_Read_Version_ReturnParameters
)
or response2.return_parameters.status != 0
):
raise DriverError("HCI_Intel_Read_Version_Command error") raise DriverError("HCI_Intel_Read_Version_Command error")
tlvs = _parse_tlv(response2.tlv) # type: ignore tlvs = _parse_tlv(response2.return_parameters.tlv) # type: ignore
# Convert the list to a dict. That's Ok here because we only expect each type # Convert the list to a dict. That's Ok here because we only expect each type
# to appear just once. # to appear just once.

View File

@@ -534,11 +534,13 @@ class Driver(common.Driver):
@staticmethod @staticmethod
async def get_loaded_firmware_version(host: Host) -> int | None: async def get_loaded_firmware_version(host: Host) -> int | None:
response1 = await host.send_sync_command( response1 = await host.send_sync_command_raw(HCI_RTK_Read_ROM_Version_Command())
HCI_RTK_Read_ROM_Version_Command(), check_status=False if (
) not isinstance(
response1.return_parameters, HCI_RTK_Read_ROM_Version_ReturnParameters
if response1.status != hci.HCI_SUCCESS: )
or response1.return_parameters.status != hci.HCI_SUCCESS
):
return None return None
response2 = await host.send_sync_command( response2 = await host.send_sync_command(
@@ -559,13 +561,20 @@ class Driver(common.Driver):
await host.send_sync_command(hci.HCI_Reset_Command()) await host.send_sync_command(hci.HCI_Reset_Command())
host.ready = True host.ready = True
command = hci.HCI_Read_Local_Version_Information_Command() response = await host.send_sync_command_raw(
response = await host.send_sync_command(command, check_status=False) hci.HCI_Read_Local_Version_Information_Command()
if response.status != hci.HCI_SUCCESS: )
if (
not isinstance(
response.return_parameters,
hci.HCI_Read_Local_Version_Information_ReturnParameters,
)
or response.return_parameters.status != hci.HCI_SUCCESS
):
logger.error("failed to probe local version information") logger.error("failed to probe local version information")
return None return None
local_version = response local_version = response.return_parameters
logger.debug( logger.debug(
f"looking for a driver: 0x{local_version.lmp_subversion:04X} " f"looking for a driver: 0x{local_version.lmp_subversion:04X} "
@@ -641,15 +650,21 @@ class Driver(common.Driver):
# TODO: load the firmware # TODO: load the firmware
async def download_for_rtl8723b(self): async def download_for_rtl8723b(self) -> int | None:
if self.driver_info.has_rom_version: if self.driver_info.has_rom_version:
response1 = await self.host.send_sync_command( response1 = await self.host.send_sync_command_raw(
HCI_RTK_Read_ROM_Version_Command(), check_status=False HCI_RTK_Read_ROM_Version_Command()
) )
if response1.status != hci.HCI_SUCCESS: if (
not isinstance(
response1.return_parameters,
HCI_RTK_Read_ROM_Version_ReturnParameters,
)
or response1.return_parameters.status != hci.HCI_SUCCESS
):
logger.warning("can't get ROM version") logger.warning("can't get ROM version")
return None return None
rom_version = response1.version rom_version = response1.return_parameters.version
logger.debug(f"ROM version before download: {rom_version:04X}") logger.debug(f"ROM version before download: {rom_version:04X}")
else: else:
rom_version = 0 rom_version = 0
@@ -691,13 +706,18 @@ class Driver(common.Driver):
logger.debug("download complete!") logger.debug("download complete!")
# Read the version again # Read the version again
response2 = await self.host.send_sync_command( response2 = await self.host.send_sync_command_raw(
HCI_RTK_Read_ROM_Version_Command(), check_status=False HCI_RTK_Read_ROM_Version_Command()
) )
if response2.status != hci.HCI_SUCCESS: if (
not isinstance(
response2.return_parameters, HCI_RTK_Read_ROM_Version_ReturnParameters
)
or response2.return_parameters.status != hci.HCI_SUCCESS
):
logger.warning("can't get ROM version") logger.warning("can't get ROM version")
else: else:
rom_version = response2.version rom_version = response2.return_parameters.version
logger.debug(f"ROM version after download: {rom_version:02X}") logger.debug(f"ROM version after download: {rom_version:02X}")
return firmware.version return firmware.version

View File

@@ -2407,24 +2407,28 @@ class HCI_Packet:
@classmethod @classmethod
def from_bytes(cls, packet: bytes) -> HCI_Packet: def from_bytes(cls, packet: bytes) -> HCI_Packet:
packet_type = packet[0] try:
packet_type = packet[0]
if packet_type == HCI_COMMAND_PACKET: if packet_type == HCI_COMMAND_PACKET:
return HCI_Command.from_bytes(packet) return HCI_Command.from_bytes(packet)
if packet_type == HCI_ACL_DATA_PACKET: if packet_type == HCI_ACL_DATA_PACKET:
return HCI_AclDataPacket.from_bytes(packet) return HCI_AclDataPacket.from_bytes(packet)
if packet_type == HCI_SYNCHRONOUS_DATA_PACKET: if packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
return HCI_SynchronousDataPacket.from_bytes(packet) return HCI_SynchronousDataPacket.from_bytes(packet)
if packet_type == HCI_EVENT_PACKET: if packet_type == HCI_EVENT_PACKET:
return HCI_Event.from_bytes(packet) return HCI_Event.from_bytes(packet)
if packet_type == HCI_ISO_DATA_PACKET: if packet_type == HCI_ISO_DATA_PACKET:
return HCI_IsoDataPacket.from_bytes(packet) return HCI_IsoDataPacket.from_bytes(packet)
return HCI_CustomPacket(packet) return HCI_CustomPacket(packet)
except Exception as e:
logger.error(f'error parsing HCI packet [{packet.hex()}]: {e}')
raise
def __init__(self, name: str) -> None: def __init__(self, name: str) -> None:
self.name = name self.name = name
@@ -2597,6 +2601,21 @@ class HCI_GenericReturnParameters(HCI_ReturnParameters):
class HCI_StatusReturnParameters(HCI_ReturnParameters): class HCI_StatusReturnParameters(HCI_ReturnParameters):
status: HCI_ErrorCode = field(metadata=HCI_ErrorCode.type_metadata(1)) status: HCI_ErrorCode = field(metadata=HCI_ErrorCode.type_metadata(1))
@classmethod
def from_parameters(cls, parameters: bytes) -> Self | HCI_StatusReturnParameters:
status = HCI_ErrorCode(parameters[0])
if status != HCI_ErrorCode.SUCCESS:
# Don't parse further, just return the status.
return HCI_StatusReturnParameters(status=status)
return cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields))
@dataclasses.dataclass
class HCI_GenericStatusReturnParameters(HCI_StatusReturnParameters):
data: bytes = field(metadata=metadata('*'))
@dataclasses.dataclass @dataclasses.dataclass
class HCI_StatusAndAddressReturnParameters(HCI_StatusReturnParameters): class HCI_StatusAndAddressReturnParameters(HCI_StatusReturnParameters):
@@ -5854,7 +5873,7 @@ class HCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters(
rtt_capability: int = field(metadata=metadata(1)) rtt_capability: int = field(metadata=metadata(1))
rtt_aa_only_n: int = field(metadata=metadata(1)) rtt_aa_only_n: int = field(metadata=metadata(1))
rtt_sounding_n: int = field(metadata=metadata(1)) rtt_sounding_n: int = field(metadata=metadata(1))
rtt_random_payload_n: int = field(metadata=metadata(1)) rtt_random_sequence_n: int = field(metadata=metadata(1))
nadm_sounding_capability: int = field(metadata=metadata(2)) nadm_sounding_capability: int = field(metadata=metadata(2))
nadm_random_capability: int = field(metadata=metadata(2)) nadm_random_capability: int = field(metadata=metadata(2))
cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC)) cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC))
@@ -5910,7 +5929,7 @@ class HCI_LE_CS_Write_Cached_Remote_Supported_Capabilities_Command(
rtt_capability: int = field(metadata=metadata(1)) rtt_capability: int = field(metadata=metadata(1))
rtt_aa_only_n: int = field(metadata=metadata(1)) rtt_aa_only_n: int = field(metadata=metadata(1))
rtt_sounding_n: int = field(metadata=metadata(1)) rtt_sounding_n: int = field(metadata=metadata(1))
rtt_random_payload_n: int = field(metadata=metadata(1)) rtt_random_sequence_n: int = field(metadata=metadata(1))
nadm_sounding_capability: int = field(metadata=metadata(2)) nadm_sounding_capability: int = field(metadata=metadata(2))
nadm_random_capability: int = field(metadata=metadata(2)) nadm_random_capability: int = field(metadata=metadata(2))
cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC)) cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC))
@@ -7118,7 +7137,7 @@ class HCI_LE_CS_Read_Remote_Supported_Capabilities_Complete_Event(HCI_LE_Meta_Ev
rtt_capability: int = field(metadata=metadata(1)) rtt_capability: int = field(metadata=metadata(1))
rtt_aa_only_n: int = field(metadata=metadata(1)) rtt_aa_only_n: int = field(metadata=metadata(1))
rtt_sounding_n: int = field(metadata=metadata(1)) rtt_sounding_n: int = field(metadata=metadata(1))
rtt_random_payload_n: int = field(metadata=metadata(1)) rtt_random_sequence_n: int = field(metadata=metadata(1))
nadm_sounding_capability: int = field(metadata=metadata(2)) nadm_sounding_capability: int = field(metadata=metadata(2))
nadm_random_capability: int = field(metadata=metadata(2)) nadm_random_capability: int = field(metadata=metadata(2))
cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC)) cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC))
@@ -7494,6 +7513,7 @@ class HCI_Command_Complete_Event(HCI_Event, Generic[_RP]):
def from_parameters(cls, parameters: bytes) -> Self: def from_parameters(cls, parameters: bytes) -> Self:
event = cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields)) event = cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields))
event.parameters = parameters event.parameters = parameters
return_parameters_bytes = parameters[3:]
# Find the class for the matching command. # Find the class for the matching command.
subclass = HCI_Command.command_classes.get(event.command_opcode) subclass = HCI_Command.command_classes.get(event.command_opcode)
@@ -7506,16 +7526,16 @@ class HCI_Command_Complete_Event(HCI_Event, Generic[_RP]):
'HCI Command Complete event with opcode for a class that is not' 'HCI Command Complete event with opcode for a class that is not'
' an HCI_SyncCommand subclass: ' ' an HCI_SyncCommand subclass: '
f'opcode={event.command_opcode:#04x}, ' f'opcode={event.command_opcode:#04x}, '
f'type={type(subclass).__name__}' f'type={subclass.__name__}'
) )
event.return_parameters = HCI_GenericReturnParameters( event.return_parameters = HCI_GenericReturnParameters(
data=event.return_parameters # type: ignore[arg-type] data=return_parameters_bytes
) # type: ignore[assignment] ) # type: ignore[assignment]
return event return event
# Parse the return parameters bytes into an object. # Parse the return parameters bytes into an object.
event.return_parameters = subclass.parse_return_parameters( event.return_parameters = subclass.parse_return_parameters(
event.return_parameters # type: ignore[arg-type] return_parameters_bytes
) # type: ignore[assignment] ) # type: ignore[assignment]
return event return event

View File

@@ -270,7 +270,12 @@ class Host(utils.EventEmitter):
self.sco_links = {} # SCO links, by connection handle self.sco_links = {} # SCO links, by connection handle
self.bigs = {} # BIG Handle to BIS Handles self.bigs = {} # BIG Handle to BIS Handles
self.pending_command: hci.HCI_SyncCommand | hci.HCI_AsyncCommand | None = None self.pending_command: hci.HCI_SyncCommand | hci.HCI_AsyncCommand | None = None
self.pending_response: asyncio.Future[Any] | None = None self.pending_response: (
asyncio.Future[
hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event
]
| None
) = None
self.number_of_supported_advertising_sets = 0 self.number_of_supported_advertising_sets = 0
self.maximum_advertising_data_length = 31 self.maximum_advertising_data_length = 31
self.local_version: ( self.local_version: (
@@ -658,25 +663,35 @@ class Host(utils.EventEmitter):
response_timeout: float | None = None, response_timeout: float | None = None,
) -> hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event: ) -> hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event:
# Wait until we can send (only one pending command at a time) # Wait until we can send (only one pending command at a time)
async with self.command_semaphore: await self.command_semaphore.acquire()
assert self.pending_command is None
assert self.pending_response is None
# Create a future value to hold the eventual response # Create a future value to hold the eventual response
self.pending_response = asyncio.get_running_loop().create_future() assert self.pending_command is None
self.pending_command = command assert self.pending_response is None
self.pending_response = asyncio.get_running_loop().create_future()
self.pending_command = command
try: response: (
self.send_hci_packet(command) hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event | None
return await asyncio.wait_for( ) = None
self.pending_response, timeout=response_timeout try:
) self.send_hci_packet(command)
except Exception: response = await asyncio.wait_for(
logger.exception(color("!!! Exception while sending command:", "red")) self.pending_response, timeout=response_timeout
raise )
finally: return response
self.pending_command = None except Exception:
self.pending_response = None logger.exception(color("!!! Exception while sending command:", "red"))
raise
finally:
self.pending_command = None
self.pending_response = None
if (
response is not None
and response.num_hci_command_packets
and self.command_semaphore.locked()
):
self.command_semaphore.release()
@overload @overload
async def send_command( async def send_command(
@@ -729,30 +744,42 @@ class Host(utils.EventEmitter):
return response return response
async def send_sync_command( async def send_sync_command(
self, command: hci.HCI_SyncCommand[_RP], response_timeout: float | None = None
) -> _RP:
response = await self.send_sync_command_raw(command, response_timeout)
return_parameters = response.return_parameters
# Check the return parameters's status
if isinstance(return_parameters, hci.HCI_StatusReturnParameters):
status = return_parameters.status
elif isinstance(return_parameters, hci.HCI_GenericReturnParameters):
# if the payload has at least one byte, assume the first byte is the status
if not return_parameters.data:
raise RuntimeError('no status byte in return parameters')
status = hci.HCI_ErrorCode(return_parameters.data[0])
else:
raise RuntimeError(
f'unexpected return parameters type ({type(return_parameters)})'
)
if status != hci.HCI_ErrorCode.SUCCESS:
logger.warning(
f'{command.name} failed ' f'({hci.HCI_Constant.error_name(status)})'
)
raise hci.HCI_Error(status)
return return_parameters
async def send_sync_command_raw(
self, self,
command: hci.HCI_SyncCommand[_RP], command: hci.HCI_SyncCommand[_RP],
check_status: bool = True,
response_timeout: float | None = None, response_timeout: float | None = None,
) -> _RP: ) -> hci.HCI_Command_Complete_Event[_RP]:
response = await self._send_command(command, response_timeout) response = await self._send_command(command, response_timeout)
# Check that the response is of the expected type # Check that the response is of the expected type
assert isinstance(response, hci.HCI_Command_Complete_Event) assert isinstance(response, hci.HCI_Command_Complete_Event)
return_parameters: _RP = response.return_parameters
assert isinstance(return_parameters, command.return_parameters_class)
# Check the return parameters if required return response
if check_status:
if isinstance(return_parameters, hci.HCI_StatusReturnParameters):
status = return_parameters.status
if status != hci.HCI_SUCCESS:
logger.warning(
f'{command.name} failed '
f'({hci.HCI_Constant.error_name(status)})'
)
raise hci.HCI_Error(status)
return return_parameters
async def send_async_command( async def send_async_command(
self, self,
@@ -1003,6 +1030,8 @@ class Host(utils.EventEmitter):
self.pending_response.set_result(event) self.pending_response.set_result(event)
else: else:
logger.warning('!!! no pending response future to set') logger.warning('!!! no pending response future to set')
if event.num_hci_command_packets and self.command_semaphore.locked():
self.command_semaphore.release()
############################################################ ############################################################
# HCI handlers # HCI handlers
@@ -1014,7 +1043,13 @@ class Host(utils.EventEmitter):
if event.command_opcode == 0: if event.command_opcode == 0:
# This is used just for the Num_HCI_Command_Packets field, not related to # This is used just for the Num_HCI_Command_Packets field, not related to
# an actual command # an actual command
logger.debug('no-command event') logger.debug('no-command event for flow control')
# Release the command semaphore if needed
if event.num_hci_command_packets and self.command_semaphore.locked():
logger.debug('command complete event releasing semaphore')
self.command_semaphore.release()
return return
return self.on_command_processed(event) return self.on_command_processed(event)

View File

@@ -194,7 +194,7 @@ async def open_android_netsim_controller_transport(
# We only accept BLUETOOTH # We only accept BLUETOOTH
if request.initial_info.chip.kind != ChipKind.BLUETOOTH: if request.initial_info.chip.kind != ChipKind.BLUETOOTH:
logger.warning('Unsupported chip type') logger.debug('Request for unsupported chip type')
error = PacketResponse(error='Unsupported chip type') error = PacketResponse(error='Unsupported chip type')
await self.context.write(error) await self.context.write(error)
# return # return

View File

@@ -42,8 +42,7 @@ response = await host.send_sync_command(
handle_type=HCI_Write_Tx_Power_Level_Command.TX_POWER_HANDLE_TYPE_ADV, handle_type=HCI_Write_Tx_Power_Level_Command.TX_POWER_HANDLE_TYPE_ADV,
connection_handle=0, connection_handle=0,
tx_power_level=-4, tx_power_level=-4,
), )
check_status=False
) )
if response.status == HCI_SUCCESS: if response.status == HCI_SUCCESS:

View File

@@ -218,9 +218,9 @@ def test_return_parameters() -> None:
assert isinstance(params.status, utils.OpenIntEnum) assert isinstance(params.status, utils.OpenIntEnum)
params = hci.HCI_Read_BD_ADDR_Command.parse_return_parameters( params = hci.HCI_Read_BD_ADDR_Command.parse_return_parameters(
bytes.fromhex('3C001122334455') bytes.fromhex('00001122334455')
) )
assert params.status == hci.HCI_ErrorCode.ADVERTISING_TIMEOUT_ERROR assert params.status == hci.HCI_ErrorCode.SUCCESS
assert isinstance(params.status, utils.OpenIntEnum) assert isinstance(params.status, utils.OpenIntEnum)
assert isinstance(params.bd_addr, hci.Address) assert isinstance(params.bd_addr, hci.Address)

View File

@@ -26,9 +26,11 @@ from bumble.controller import Controller
from bumble.hci import ( from bumble.hci import (
HCI_AclDataPacket, HCI_AclDataPacket,
HCI_Command_Complete_Event, HCI_Command_Complete_Event,
HCI_Disconnect_Command,
HCI_Error, HCI_Error,
HCI_ErrorCode, HCI_ErrorCode,
HCI_Event, HCI_Event,
HCI_GenericReturnParameters,
HCI_Reset_Command, HCI_Reset_Command,
HCI_StatusReturnParameters, HCI_StatusReturnParameters,
) )
@@ -195,6 +197,7 @@ async def test_send_sync_command() -> None:
) )
host = Host(source, sink) host = Host(source, sink)
host.ready = True
# Sync command with success # Sync command with success
response1 = await host.send_sync_command(HCI_Reset_Command()) response1 = await host.send_sync_command(HCI_Reset_Command())
@@ -212,6 +215,17 @@ async def test_send_sync_command() -> None:
assert excinfo.value.error_code == error_response.return_parameters.status assert excinfo.value.error_code == error_response.return_parameters.status
# Sync command with error status should not raise when `check_status` is False # Sync command with raw result
response2 = await host.send_sync_command(HCI_Reset_Command(), check_status=False) response2 = await host.send_sync_command_raw(HCI_Reset_Command())
assert response2.status == HCI_ErrorCode.COMMAND_DISALLOWED_ERROR assert response2.return_parameters.status == HCI_ErrorCode.COMMAND_DISALLOWED_ERROR
# Sync command with a command that's not an HCI_SyncCommand
# (here, for convenience, we use an HCI_AsyncCommand instance)
command = HCI_Disconnect_Command(connection_handle=0x1234, reason=0x13)
sink.response = HCI_Command_Complete_Event(
1,
command.op_code,
HCI_GenericReturnParameters(data=bytes.fromhex("00112233")),
)
response3 = await host.send_sync_command_raw(command) # type: ignore
assert isinstance(response3.return_parameters, HCI_GenericReturnParameters)