Compare commits

...

20 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 81985c47a9 remove superfluous statement 2026-02-02 11:12:28 -08:00
Gilles Boccon-Gibod 2a59e19283 fix comment 2026-01-29 19:09:46 -08:00
Gilles Boccon-Gibod f44d013690 make bridge more robust 2026-01-27 09:47:52 -08:00
Gilles Boccon-Gibod e63dc15ede fix handling of return parameters 2026-01-27 09:39:22 -08:00
Gilles Boccon-Gibod c901e15666 fix a few HCI types and make the bridge more robust 2026-01-25 13:47:14 -08:00
Gilles Boccon-Gibod 022323b19c Merge pull request #871 from google/gbg/sci
add basic support for SCI
2026-01-24 10:39:11 -08:00
Josh Wu 7efbd303e0 Merge pull request #876 from ttdennis/await_termination_fix
Update apps and examples to await .terminated instead of wait_for_termination()
2026-01-24 11:44:19 +08:00
Dennis Heinze 49530d8d6d Update apps and examples to await .terminated 2026-01-24 00:20:55 +01:00
Josh Wu 3f9ef5aac2 Merge pull request #873 from zxzxwu/l2cap
L2CAP: Fix wrong CID on reject
2026-01-23 12:44:59 +08:00
Josh Wu e488ea9783 Merge pull request #872 from zxzxwu/avrcp
AVRCP: Fix wrong field specs
2026-01-23 12:36:14 +08:00
Josh Wu 21d937c2f1 Merge pull request #865 from willnix/pcapsnoop
Added a PcapSnooper class
2026-01-23 12:33:15 +08:00
Frieder Steinmetz a8396e6cce Formatted with black again. 2026-01-22 17:49:58 +01:00
Josh Wu 7e1b1c8f78 L2CAP: Fix wrong CID on reject 2026-01-22 23:16:25 +08:00
Josh Wu 55719bf6de AVRCP: Fix wrong field specs 2026-01-22 22:18:58 +08:00
Frieder Steinmetz 5059920696 Please mypy.\n\nTwo calls to open(), some more annotations and a rescoped global were needed. 2026-01-22 10:40:08 +01:00
Gilles Boccon-Gibod 252f3e49b6 Merge pull request #870 from antipatico/feat_AV53C1 2026-01-20 10:46:52 -08:00
Jacopo Scannella f3ecf04479 Added support for STA-AV53C1-USB-BLUETOOTH StarTech(dot)com dongle - RTL8761BUE 2026-01-20 09:32:51 +01:00
Frieder Steinmetz c69c1532cc Fix comments that were messed up by black 2026-01-15 19:06:03 +01:00
Frieder Steinmetz f95b2054c8 Formatted with 2026-01-15 10:50:33 +01:00
Frieder Steinmetz 3fdd7ee45e Added the PcapSnooper class.
The class implements a bumble snooper that writes PCAP records.
It can write to either a file or a named pipe.
The latter is useful to bridge with wireshark extcap for live logging.
2026-01-14 23:40:59 +01:00
38 changed files with 415 additions and 168 deletions
+1 -1
View File
@@ -352,7 +352,7 @@ async def run(
await bridge.start()
# Wait until the source terminates
await hci_source.wait_for_termination()
await hci_source.terminated
@click.command()
+3 -1
View File
@@ -81,7 +81,9 @@ async def async_main():
response = hci.HCI_Command_Complete_Event(
num_hci_command_packets=1,
command_opcode=hci_packet.op_code,
return_parameters=bytes([hci.HCI_SUCCESS]),
return_parameters=hci.HCI_StatusReturnParameters(
status=hci.HCI_ErrorCode.SUCCESS
),
)
# Return a packet with 'respond to sender' set to True
return (bytes(response), True)
+1 -1
View File
@@ -268,7 +268,7 @@ async def run(device_config, hci_transport, bridge):
await bridge.start(device)
# Wait until the transport terminates
await hci_source.wait_for_termination()
await hci_source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -421,7 +421,7 @@ async def run(device_config, hci_transport, bridge):
await bridge.start(device)
# Wait until the transport terminates
await hci_source.wait_for_termination()
await hci_source.terminated
except core.ConnectionError as error:
print(color(f"!!! Bluetooth connection failed: {error}", "red"))
except Exception as error:
+1 -1
View File
@@ -190,7 +190,7 @@ async def scan(
scanning_phys=scanning_phys,
)
await hci_source.wait_for_termination()
await hci_source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -726,7 +726,7 @@ class Speaker:
print("Waiting for connection...")
await self.advertise()
await hci_source.wait_for_termination()
await hci_source.terminated
for output in self.outputs:
await output.stop()
+20 -20
View File
@@ -55,13 +55,15 @@ AVRCP_PID = 0x110E
AVRCP_BLUETOOTH_SIG_COMPANY_ID = 0x001958
_UINT64_BE_METADATA = {
'parser': lambda data, offset: (
offset + 8,
int.from_bytes(data[offset : offset + 8], byteorder='big'),
),
'serializer': lambda x: x.to_bytes(8, byteorder='big'),
}
_UINT64_BE_METADATA = hci.metadata(
{
'parser': lambda data, offset: (
offset + 8,
int.from_bytes(data[offset : offset + 8], byteorder='big'),
),
'serializer': lambda x: x.to_bytes(8, byteorder='big'),
}
)
class PduId(utils.OpenIntEnum):
@@ -92,7 +94,7 @@ class PduId(utils.OpenIntEnum):
class CharacterSetId(hci.SpecableEnum):
UTF_8 = 0x06
UTF_8 = 0x6A
class MediaAttributeId(hci.SpecableEnum):
@@ -491,14 +493,12 @@ class BrowseableItem:
**hci.HCI_Object.dict_from_bytes(data, offset + 3, subclass.fields)
)
instance._payload = data[3:]
return offset + length, instance
return offset + length + 3, instance
def __bytes__(self) -> bytes:
if self._payload is None:
self._payload = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields)
return (
struct.pack('>BH', self.item_type, len(self._payload) + 3) + self._payload
)
return struct.pack('>BH', self.item_type, len(self._payload)) + self._payload
_Item = TypeVar('_Item', bound='BrowseableItem')
@@ -601,11 +601,11 @@ class MediaPlayerItem(BrowseableItem):
metadata=MajorPlayerType.type_metadata(1)
)
player_sub_type: PlayerSubType = field(
metadata=PlayerSubType.type_metadata(4, byteorder='big')
metadata=PlayerSubType.type_metadata(4, byteorder='little')
)
play_status: PlayStatus = field(metadata=PlayStatus.type_metadata(1))
feature_bitmask: Features = field(
metadata=Features.type_metadata(16, byteorder='big')
metadata=Features.type_metadata(16, byteorder='little')
)
character_set_id: CharacterSetId = field(
metadata=CharacterSetId.type_metadata(2, byteorder='big')
@@ -634,7 +634,7 @@ class FolderItem(BrowseableItem):
folder_uid: int = field(metadata=_UINT64_BE_METADATA)
folder_type: FolderType = field(metadata=FolderType.type_metadata(1))
is_playable: FolderType = field(metadata=Playable.type_metadata(1))
is_playable: Playable = field(metadata=Playable.type_metadata(1))
character_set_id: CharacterSetId = field(
metadata=CharacterSetId.type_metadata(2, byteorder='big')
)
@@ -876,7 +876,7 @@ class GetPlayStatusCommand(Command):
class GetElementAttributesCommand(Command):
pdu_id = PduId.GET_ELEMENT_ATTRIBUTES
identifier: int = field(metadata=hci.metadata(_UINT64_BE_METADATA))
identifier: int = field(metadata=_UINT64_BE_METADATA)
attribute_ids: Sequence[MediaAttributeId] = field(
metadata=MediaAttributeId.type_metadata(
4, list_begin=True, list_end=True, byteorder='big'
@@ -951,7 +951,7 @@ class ChangePathCommand(Command):
uid_counter: int = field(metadata=hci.metadata('>2'))
direction: Direction = field(metadata=Direction.type_metadata(1))
folder_uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA))
folder_uid: int = field(metadata=_UINT64_BE_METADATA)
# -----------------------------------------------------------------------------
@@ -961,7 +961,7 @@ class GetItemAttributesCommand(Command):
pdu_id = PduId.GET_ITEM_ATTRIBUTES
scope: Scope = field(metadata=Scope.type_metadata(1))
uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA))
uid: int = field(metadata=_UINT64_BE_METADATA)
uid_counter: int = field(metadata=hci.metadata('>2'))
start_item: int = field(metadata=hci.metadata('>4'))
end_item: int = field(metadata=hci.metadata('>4'))
@@ -999,7 +999,7 @@ class PlayItemCommand(Command):
pdu_id = PduId.PLAY_ITEM
scope: Scope = field(metadata=Scope.type_metadata(1))
uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA))
uid: int = field(metadata=_UINT64_BE_METADATA)
uid_counter: int = field(metadata=hci.metadata('>2'))
@@ -1010,7 +1010,7 @@ class AddToNowPlayingCommand(Command):
pdu_id = PduId.ADD_TO_NOW_PLAYING
scope: Scope = field(metadata=Scope.type_metadata(1))
uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA))
uid: int = field(metadata=_UINT64_BE_METADATA)
uid_counter: int = field(metadata=hci.metadata('>2'))
+10 -2
View File
@@ -37,7 +37,12 @@ class HCI_Bridge:
def on_packet(self, packet):
# Convert the packet bytes to an object
hci_packet = HCI_Packet.from_bytes(packet)
try:
hci_packet = HCI_Packet.from_bytes(packet)
except Exception:
logger.warning('forwarding unparsed packet as-is')
self.hci_sink.on_packet(packet)
return
# Filter the packet
if self.packet_filter is not None:
@@ -50,7 +55,10 @@ class HCI_Bridge:
return
# Analyze the packet
self.trace(hci_packet)
try:
self.trace(hci_packet)
except Exception:
logger.exception('Exception while tracing packet')
# Bridge the packet
self.hci_sink.on_packet(packet)
+39 -28
View File
@@ -1177,7 +1177,7 @@ class ChannelSoundingCapabilities:
rtt_capability: int
rtt_aa_only_n: int
rtt_sounding_n: int
rtt_random_payload_n: int
rtt_random_sequence_n: int
nadm_sounding_capability: int
nadm_random_capability: int
cs_sync_phys_supported: int
@@ -2763,24 +2763,39 @@ class Device(utils.CompositeEventEmitter):
logger.warning(f'!!! Command {command.name} timed out')
raise CommandTimeoutError() from error
async def send_sync_command(
self, command: hci.HCI_SyncCommand[_RP], check_status: bool = True
) -> _RP:
async def send_sync_command(self, command: hci.HCI_SyncCommand[_RP]) -> _RP:
'''
Send a synchronous command via the host.
If the `status` field of the response's `return_parameters` is not equal to
`SUCCESS` an exception is raised.
Params:
command: the command to send.
check_status: If `True`, check the `status` field of the response's
`return_parameters` and raise and exception if not equal to `SUCCESS`.
Returns:
An instance of the return parameters class associated with the command class.
'''
try:
return await self.host.send_sync_command(
command, check_status, self.command_timeout
)
return await self.host.send_sync_command(command, self.command_timeout)
except asyncio.TimeoutError as error:
logger.warning(f'!!! Command {command.name} timed out')
raise CommandTimeoutError() from error
async def send_sync_command_raw(
self, command: hci.HCI_SyncCommand[_RP]
) -> hci.HCI_Command_Complete_Event[_RP]:
'''
Send a synchronous command via the host without checking the response.
Params:
command: the command to send.
Returns:
An HCI_Command_Complete_Event instance.
'''
try:
return await self.host.send_sync_command_raw(command, self.command_timeout)
except asyncio.TimeoutError as error:
logger.warning(f'!!! Command {command.name} timed out')
raise CommandTimeoutError() from error
@@ -2797,7 +2812,7 @@ class Device(utils.CompositeEventEmitter):
raise and exception if not equal to `PENDING`.
Returns:
An instance of the return parameters class associated with the command class.
A status code.
'''
try:
return await self.host.send_async_command(
@@ -2812,12 +2827,12 @@ class Device(utils.CompositeEventEmitter):
await self.host.reset()
# Try to get the public address from the controller
response = await self.host.send_sync_command(
hci.HCI_Read_BD_ADDR_Command(), check_status=False
)
if response.status == hci.HCI_SUCCESS:
try:
response = await self.host.send_sync_command(hci.HCI_Read_BD_ADDR_Command())
logger.debug(color(f'BD_ADDR: {response.bd_addr}', 'yellow'))
self.public_address = response.bd_addr
except hci.HCI_Error:
logger.debug('Controller has no public address')
# Instantiate the Key Store (we do this here rather than at __init__ time
# because some Key Store implementations use the public address as a namespace)
@@ -2926,7 +2941,7 @@ class Device(utils.CompositeEventEmitter):
rtt_capability=result.rtt_capability,
rtt_aa_only_n=result.rtt_aa_only_n,
rtt_sounding_n=result.rtt_sounding_n,
rtt_random_payload_n=result.rtt_random_payload_n,
rtt_random_sequence_n=result.rtt_random_sequence_n,
nadm_sounding_capability=result.nadm_sounding_capability,
nadm_random_capability=result.nadm_random_capability,
cs_sync_phys_supported=result.cs_sync_phys_supported,
@@ -2954,27 +2969,23 @@ class Device(utils.CompositeEventEmitter):
)
if self.classic_enabled:
await self.send_sync_command(
hci.HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')),
check_status=False,
await self.send_sync_command_raw(
hci.HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8'))
)
await self.send_sync_command(
await self.send_sync_command_raw(
hci.HCI_Write_Class_Of_Device_Command(
class_of_device=self.class_of_device
),
check_status=False,
)
)
await self.send_sync_command(
await self.send_sync_command_raw(
hci.HCI_Write_Simple_Pairing_Mode_Command(
simple_pairing_mode=int(self.classic_ssp_enabled)
),
check_status=False,
)
)
await self.send_sync_command(
await self.send_sync_command_raw(
hci.HCI_Write_Secure_Connections_Host_Support_Command(
secure_connections_host_support=int(self.classic_sc_enabled)
),
check_status=False,
)
)
await self.set_connectable(self.connectable)
await self.set_discoverable(self.discoverable)
@@ -6673,7 +6684,7 @@ class Device(utils.CompositeEventEmitter):
rtt_capability=event.rtt_capability,
rtt_aa_only_n=event.rtt_aa_only_n,
rtt_sounding_n=event.rtt_sounding_n,
rtt_random_payload_n=event.rtt_random_payload_n,
rtt_random_sequence_n=event.rtt_random_sequence_n,
nadm_sounding_capability=event.nadm_sounding_capability,
nadm_random_capability=event.nadm_random_capability,
cs_sync_phys_supported=event.cs_sync_phys_supported,
+16 -8
View File
@@ -663,10 +663,13 @@ class Driver(common.Driver):
async def read_device_info(self) -> dict[ValueType, Any]:
self.host.ready = True
response1 = await self.host.send_sync_command(
hci.HCI_Reset_Command(), check_status=False
)
if response1.status not in (hci.HCI_UNKNOWN_HCI_COMMAND_ERROR, hci.HCI_SUCCESS):
response1 = await self.host.send_sync_command_raw(hci.HCI_Reset_Command())
if not isinstance(
response1.return_parameters, hci.HCI_StatusReturnParameters
) or response1.return_parameters.status not in (
hci.HCI_UNKNOWN_HCI_COMMAND_ERROR,
hci.HCI_SUCCESS,
):
# When the controller is in operational mode, the response is a
# successful response.
# When the controller is in bootloader mode,
@@ -676,13 +679,18 @@ class Driver(common.Driver):
raise DriverError("unexpected HCI response")
# Read the firmware version.
response2 = await self.host.send_sync_command(
HCI_Intel_Read_Version_Command(param0=0xFF), check_status=False
response2 = await self.host.send_sync_command_raw(
HCI_Intel_Read_Version_Command(param0=0xFF)
)
if response2.status != 0: # type: ignore
if (
not isinstance(
response2.return_parameters, HCI_Intel_Read_Version_ReturnParameters
)
or response2.return_parameters.status != 0
):
raise DriverError("HCI_Intel_Read_Version_Command error")
tlvs = _parse_tlv(response2.tlv) # type: ignore
tlvs = _parse_tlv(response2.return_parameters.tlv) # type: ignore
# Convert the list to a dict. That's Ok here because we only expect each type
# to appear just once.
+39 -18
View File
@@ -129,6 +129,7 @@ RTK_USB_PRODUCTS = {
(0x2357, 0x0604),
(0x2550, 0x8761),
(0x2B89, 0x8761),
(0x2C0A, 0x8761),
(0x7392, 0xC611),
# Realtek 8761CUV
(0x0B05, 0x1BF6),
@@ -533,11 +534,13 @@ class Driver(common.Driver):
@staticmethod
async def get_loaded_firmware_version(host: Host) -> int | None:
response1 = await host.send_sync_command(
HCI_RTK_Read_ROM_Version_Command(), check_status=False
)
if response1.status != hci.HCI_SUCCESS:
response1 = await host.send_sync_command_raw(HCI_RTK_Read_ROM_Version_Command())
if (
not isinstance(
response1.return_parameters, HCI_RTK_Read_ROM_Version_ReturnParameters
)
or response1.return_parameters.status != hci.HCI_SUCCESS
):
return None
response2 = await host.send_sync_command(
@@ -558,13 +561,20 @@ class Driver(common.Driver):
await host.send_sync_command(hci.HCI_Reset_Command())
host.ready = True
command = hci.HCI_Read_Local_Version_Information_Command()
response = await host.send_sync_command(command, check_status=False)
if response.status != hci.HCI_SUCCESS:
response = await host.send_sync_command_raw(
hci.HCI_Read_Local_Version_Information_Command()
)
if (
not isinstance(
response.return_parameters,
hci.HCI_Read_Local_Version_Information_ReturnParameters,
)
or response.return_parameters.status != hci.HCI_SUCCESS
):
logger.error("failed to probe local version information")
return None
local_version = response
local_version = response.return_parameters
logger.debug(
f"looking for a driver: 0x{local_version.lmp_subversion:04X} "
@@ -640,15 +650,21 @@ class Driver(common.Driver):
# TODO: load the firmware
async def download_for_rtl8723b(self):
async def download_for_rtl8723b(self) -> int | None:
if self.driver_info.has_rom_version:
response1 = await self.host.send_sync_command(
HCI_RTK_Read_ROM_Version_Command(), check_status=False
response1 = await self.host.send_sync_command_raw(
HCI_RTK_Read_ROM_Version_Command()
)
if response1.status != hci.HCI_SUCCESS:
if (
not isinstance(
response1.return_parameters,
HCI_RTK_Read_ROM_Version_ReturnParameters,
)
or response1.return_parameters.status != hci.HCI_SUCCESS
):
logger.warning("can't get ROM version")
return None
rom_version = response1.version
rom_version = response1.return_parameters.version
logger.debug(f"ROM version before download: {rom_version:04X}")
else:
rom_version = 0
@@ -690,13 +706,18 @@ class Driver(common.Driver):
logger.debug("download complete!")
# Read the version again
response2 = await self.host.send_sync_command(
HCI_RTK_Read_ROM_Version_Command(), check_status=False
response2 = await self.host.send_sync_command_raw(
HCI_RTK_Read_ROM_Version_Command()
)
if response2.status != hci.HCI_SUCCESS:
if (
not isinstance(
response2.return_parameters, HCI_RTK_Read_ROM_Version_ReturnParameters
)
or response2.return_parameters.status != hci.HCI_SUCCESS
):
logger.warning("can't get ROM version")
else:
rom_version = response2.version
rom_version = response2.return_parameters.version
logger.debug(f"ROM version after download: {rom_version:02X}")
return firmware.version
+38 -18
View File
@@ -2407,24 +2407,28 @@ class HCI_Packet:
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_Packet:
packet_type = packet[0]
try:
packet_type = packet[0]
if packet_type == HCI_COMMAND_PACKET:
return HCI_Command.from_bytes(packet)
if packet_type == HCI_COMMAND_PACKET:
return HCI_Command.from_bytes(packet)
if packet_type == HCI_ACL_DATA_PACKET:
return HCI_AclDataPacket.from_bytes(packet)
if packet_type == HCI_ACL_DATA_PACKET:
return HCI_AclDataPacket.from_bytes(packet)
if packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
return HCI_SynchronousDataPacket.from_bytes(packet)
if packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
return HCI_SynchronousDataPacket.from_bytes(packet)
if packet_type == HCI_EVENT_PACKET:
return HCI_Event.from_bytes(packet)
if packet_type == HCI_EVENT_PACKET:
return HCI_Event.from_bytes(packet)
if packet_type == HCI_ISO_DATA_PACKET:
return HCI_IsoDataPacket.from_bytes(packet)
if packet_type == HCI_ISO_DATA_PACKET:
return HCI_IsoDataPacket.from_bytes(packet)
return HCI_CustomPacket(packet)
return HCI_CustomPacket(packet)
except Exception as e:
logger.error(f'error parsing HCI packet [{packet.hex()}]: {e}')
raise
def __init__(self, name: str) -> None:
self.name = name
@@ -2597,6 +2601,21 @@ class HCI_GenericReturnParameters(HCI_ReturnParameters):
class HCI_StatusReturnParameters(HCI_ReturnParameters):
status: HCI_ErrorCode = field(metadata=HCI_ErrorCode.type_metadata(1))
@classmethod
def from_parameters(cls, parameters: bytes) -> Self | HCI_StatusReturnParameters:
status = HCI_ErrorCode(parameters[0])
if status != HCI_ErrorCode.SUCCESS:
# Don't parse further, just return the status.
return HCI_StatusReturnParameters(status=status)
return cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields))
@dataclasses.dataclass
class HCI_GenericStatusReturnParameters(HCI_StatusReturnParameters):
data: bytes = field(metadata=metadata('*'))
@dataclasses.dataclass
class HCI_StatusAndAddressReturnParameters(HCI_StatusReturnParameters):
@@ -5854,7 +5873,7 @@ class HCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters(
rtt_capability: int = field(metadata=metadata(1))
rtt_aa_only_n: int = field(metadata=metadata(1))
rtt_sounding_n: int = field(metadata=metadata(1))
rtt_random_payload_n: int = field(metadata=metadata(1))
rtt_random_sequence_n: int = field(metadata=metadata(1))
nadm_sounding_capability: int = field(metadata=metadata(2))
nadm_random_capability: int = field(metadata=metadata(2))
cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC))
@@ -5910,7 +5929,7 @@ class HCI_LE_CS_Write_Cached_Remote_Supported_Capabilities_Command(
rtt_capability: int = field(metadata=metadata(1))
rtt_aa_only_n: int = field(metadata=metadata(1))
rtt_sounding_n: int = field(metadata=metadata(1))
rtt_random_payload_n: int = field(metadata=metadata(1))
rtt_random_sequence_n: int = field(metadata=metadata(1))
nadm_sounding_capability: int = field(metadata=metadata(2))
nadm_random_capability: int = field(metadata=metadata(2))
cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC))
@@ -7118,7 +7137,7 @@ class HCI_LE_CS_Read_Remote_Supported_Capabilities_Complete_Event(HCI_LE_Meta_Ev
rtt_capability: int = field(metadata=metadata(1))
rtt_aa_only_n: int = field(metadata=metadata(1))
rtt_sounding_n: int = field(metadata=metadata(1))
rtt_random_payload_n: int = field(metadata=metadata(1))
rtt_random_sequence_n: int = field(metadata=metadata(1))
nadm_sounding_capability: int = field(metadata=metadata(2))
nadm_random_capability: int = field(metadata=metadata(2))
cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC))
@@ -7494,6 +7513,7 @@ class HCI_Command_Complete_Event(HCI_Event, Generic[_RP]):
def from_parameters(cls, parameters: bytes) -> Self:
event = cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields))
event.parameters = parameters
return_parameters_bytes = parameters[3:]
# Find the class for the matching command.
subclass = HCI_Command.command_classes.get(event.command_opcode)
@@ -7506,16 +7526,16 @@ class HCI_Command_Complete_Event(HCI_Event, Generic[_RP]):
'HCI Command Complete event with opcode for a class that is not'
' an HCI_SyncCommand subclass: '
f'opcode={event.command_opcode:#04x}, '
f'type={type(subclass).__name__}'
f'type={subclass.__name__}'
)
event.return_parameters = HCI_GenericReturnParameters(
data=event.return_parameters # type: ignore[arg-type]
data=return_parameters_bytes
) # type: ignore[assignment]
return event
# Parse the return parameters bytes into an object.
event.return_parameters = subclass.parse_return_parameters(
event.return_parameters # type: ignore[arg-type]
return_parameters_bytes
) # type: ignore[assignment]
return event
+70 -35
View File
@@ -270,7 +270,12 @@ class Host(utils.EventEmitter):
self.sco_links = {} # SCO links, by connection handle
self.bigs = {} # BIG Handle to BIS Handles
self.pending_command: hci.HCI_SyncCommand | hci.HCI_AsyncCommand | None = None
self.pending_response: asyncio.Future[Any] | None = None
self.pending_response: (
asyncio.Future[
hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event
]
| None
) = None
self.number_of_supported_advertising_sets = 0
self.maximum_advertising_data_length = 31
self.local_version: (
@@ -658,25 +663,35 @@ class Host(utils.EventEmitter):
response_timeout: float | None = None,
) -> hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event:
# Wait until we can send (only one pending command at a time)
async with self.command_semaphore:
assert self.pending_command is None
assert self.pending_response is None
await self.command_semaphore.acquire()
# Create a future value to hold the eventual response
self.pending_response = asyncio.get_running_loop().create_future()
self.pending_command = command
# Create a future value to hold the eventual response
assert self.pending_command is None
assert self.pending_response is None
self.pending_response = asyncio.get_running_loop().create_future()
self.pending_command = command
try:
self.send_hci_packet(command)
return await asyncio.wait_for(
self.pending_response, timeout=response_timeout
)
except Exception:
logger.exception(color("!!! Exception while sending command:", "red"))
raise
finally:
self.pending_command = None
self.pending_response = None
response: (
hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event | None
) = None
try:
self.send_hci_packet(command)
response = await asyncio.wait_for(
self.pending_response, timeout=response_timeout
)
return response
except Exception:
logger.exception(color("!!! Exception while sending command:", "red"))
raise
finally:
self.pending_command = None
self.pending_response = None
if (
response is not None
and response.num_hci_command_packets
and self.command_semaphore.locked()
):
self.command_semaphore.release()
@overload
async def send_command(
@@ -729,30 +744,42 @@ class Host(utils.EventEmitter):
return response
async def send_sync_command(
self, command: hci.HCI_SyncCommand[_RP], response_timeout: float | None = None
) -> _RP:
response = await self.send_sync_command_raw(command, response_timeout)
return_parameters = response.return_parameters
# Check the return parameters's status
if isinstance(return_parameters, hci.HCI_StatusReturnParameters):
status = return_parameters.status
elif isinstance(return_parameters, hci.HCI_GenericReturnParameters):
# if the payload has at least one byte, assume the first byte is the status
if not return_parameters.data:
raise RuntimeError('no status byte in return parameters')
status = hci.HCI_ErrorCode(return_parameters.data[0])
else:
raise RuntimeError(
f'unexpected return parameters type ({type(return_parameters)})'
)
if status != hci.HCI_ErrorCode.SUCCESS:
logger.warning(
f'{command.name} failed ' f'({hci.HCI_Constant.error_name(status)})'
)
raise hci.HCI_Error(status)
return return_parameters
async def send_sync_command_raw(
self,
command: hci.HCI_SyncCommand[_RP],
check_status: bool = True,
response_timeout: float | None = None,
) -> _RP:
) -> hci.HCI_Command_Complete_Event[_RP]:
response = await self._send_command(command, response_timeout)
# Check that the response is of the expected type
assert isinstance(response, hci.HCI_Command_Complete_Event)
return_parameters: _RP = response.return_parameters
assert isinstance(return_parameters, command.return_parameters_class)
# Check the return parameters if required
if check_status:
if isinstance(return_parameters, hci.HCI_StatusReturnParameters):
status = return_parameters.status
if status != hci.HCI_SUCCESS:
logger.warning(
f'{command.name} failed '
f'({hci.HCI_Constant.error_name(status)})'
)
raise hci.HCI_Error(status)
return return_parameters
return response
async def send_async_command(
self,
@@ -1001,6 +1028,8 @@ class Host(utils.EventEmitter):
self.pending_response.set_result(event)
else:
logger.warning('!!! no pending response future to set')
if event.num_hci_command_packets and self.command_semaphore.locked():
self.command_semaphore.release()
############################################################
# HCI handlers
@@ -1012,7 +1041,13 @@ class Host(utils.EventEmitter):
if event.command_opcode == 0:
# This is used just for the Num_HCI_Command_Packets field, not related to
# an actual command
logger.debug('no-command event')
logger.debug('no-command event for flow control')
# Release the command semaphore if needed
if event.num_hci_command_packets and self.command_semaphore.locked():
logger.debug('command complete event releasing semaphore')
self.command_semaphore.release()
return
return self.on_command_processed(event)
+10 -5
View File
@@ -2342,8 +2342,8 @@ class ChannelManager:
cid,
L2CAP_Connection_Response(
identifier=request.identifier,
destination_cid=request.source_cid,
source_cid=0,
destination_cid=0,
source_cid=request.source_cid,
result=L2CAP_Connection_Response.Result.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
status=0x0000,
),
@@ -2355,7 +2355,12 @@ class ChannelManager:
f'creating server channel with cid={source_cid} for psm {request.psm}'
)
channel = ClassicChannel(
self, connection, cid, request.psm, source_cid, server.spec
manager=self,
connection=connection,
signaling_cid=cid,
psm=request.psm,
source_cid=source_cid,
spec=server.spec,
)
connection_channels[source_cid] = channel
@@ -2372,8 +2377,8 @@ class ChannelManager:
cid,
L2CAP_Connection_Response(
identifier=request.identifier,
destination_cid=request.source_cid,
source_cid=0,
destination_cid=0,
source_cid=request.source_cid,
result=L2CAP_Connection_Response.Result.CONNECTION_REFUSED_PSM_NOT_SUPPORTED,
status=0x0000,
),
+111 -1
View File
@@ -110,6 +110,53 @@ class BtSnooper(Snooper):
)
# -----------------------------------------------------------------------------
class PcapSnooper(Snooper):
"""
Snooper that saves or streames HCI packets using the PCAP format.
"""
PCAP_MAGIC = 0xA1B2C3D4
DLT_BLUETOOTH_HCI_H4_WITH_PHDR = 201
def __init__(self, output: BinaryIO):
self.output = output
# Write the header
self.output.write(
struct.pack(
"<IHHIIII",
self.PCAP_MAGIC,
2, # Major PCAP Version
4, # Minor PCAP Version
0, # Reserved 1
0, # Reserved 2
65535, # SnapLen
# FCS and f are set to 0 implicitly by the next line
self.DLT_BLUETOOTH_HCI_H4_WITH_PHDR, # The DLT in this PCAP
)
)
def snoop(self, hci_packet: bytes, direction: Snooper.Direction):
now = datetime.datetime.now(datetime.timezone.utc)
sec = int(now.timestamp())
usec = now.microsecond
# Emit the record
self.output.write(
struct.pack(
"<IIII",
sec, # Timestamp (Seconds)
usec, # Timestamp (Microseconds)
len(hci_packet) + 4,
len(hci_packet) + 4, # +4 because of the addtional direction info...
)
+ struct.pack(">I", int(direction)) # ...thats being added here
+ hci_packet
)
self.output.flush() # flush after every packet for live logging
# -----------------------------------------------------------------------------
_SNOOPER_INSTANCE_COUNT = 0
@@ -140,9 +187,38 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
pid: the current process ID.
instance: the instance ID in the current process.
pcapsnoop
The syntax for the type-specific arguments for this type is:
<io-type>:<io-type-specific-arguments>
Supported I/O types are:
file
The type-specific arguments for this I/O type is a string that is converted
to a file path using the python `str.format()` string formatting. The log
records will be written to that file if it can be opened/created.
The keyword args that may be referenced by the string pattern are:
now: the value of `datetime.now()`
utcnow: the value of `datetime.now(tz=datetime.timezone.utc)`
pid: the current process ID.
instance: the instance ID in the current process.
pipe
The type-specific arguments for this I/O type is a string that is converted
to a path using the python `str.format()` string formatting. The log
records will be written to the named pipe referenced by this path
if it can be opened. The keyword args that may be referenced by the
string pattern are:
now: the value of `datetime.now()`
utcnow: the value of `datetime.now(tz=datetime.timezone.utc)`
pid: the current process ID.
instance: the instance ID in the current process.
Examples:
btsnoop:file:my_btsnoop.log
btsnoop:file:/tmp/bumble_{now:%Y-%m-%d-%H:%M:%S}_{pid}.log
pcapsnoop:pipe:/tmp/bumble-extcap
"""
if ':' not in spec:
@@ -150,6 +226,8 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
snooper_type, snooper_args = spec.split(':', maxsplit=1)
global _SNOOPER_INSTANCE_COUNT
if snooper_type == 'btsnoop':
if ':' not in snooper_args:
raise core.InvalidArgumentError('I/O type for btsnoop snooper type missing')
@@ -157,7 +235,6 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
io_type, io_name = snooper_args.split(':', maxsplit=1)
if io_type == 'file':
# Process the file name string pattern.
global _SNOOPER_INSTANCE_COUNT
file_path = io_name.format(
now=datetime.datetime.now(),
utcnow=datetime.datetime.now(tz=datetime.timezone.utc),
@@ -173,6 +250,39 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
_SNOOPER_INSTANCE_COUNT -= 1
return
elif snooper_type == 'pcapsnoop':
if ':' not in snooper_args:
raise core.InvalidArgumentError(
'I/O type for pcapsnoop snooper type missing'
)
io_type, io_name = snooper_args.split(':', maxsplit=1)
if io_type in {'pipe', 'file'}:
# Process the file name string pattern.
file_path = io_name.format(
now=datetime.datetime.now(),
utcnow=datetime.datetime.now(tz=datetime.timezone.utc),
pid=os.getpid(),
instance=_SNOOPER_INSTANCE_COUNT,
)
# Open a file or pipe
logger.debug(f'PCAP file: {file_path}')
# Pipes we have to open with unbuffered binary I/O
# so we pass ``buffering`` for pipes but not for files
pcap_file: BinaryIO
if io_type == 'pipe':
pcap_file = open(file_path, 'wb', buffering=0)
else:
pcap_file = open(file_path, 'wb')
with pcap_file:
_SNOOPER_INSTANCE_COUNT += 1
yield PcapSnooper(pcap_file)
_SNOOPER_INSTANCE_COUNT -= 1
return
raise core.InvalidArgumentError(f'I/O type {io_type} not supported')
raise core.InvalidArgumentError(f'snooper type {snooper_type} not found')
+1 -1
View File
@@ -194,7 +194,7 @@ async def open_android_netsim_controller_transport(
# We only accept BLUETOOTH
if request.initial_info.chip.kind != ChipKind.BLUETOOTH:
logger.warning('Unsupported chip type')
logger.debug('Request for unsupported chip type')
error = PacketResponse(error='Unsupported chip type')
await self.context.write(error)
# return
+1 -2
View File
@@ -42,8 +42,7 @@ response = await host.send_sync_command(
handle_type=HCI_Write_Tx_Power_Level_Command.TX_POWER_HANDLE_TYPE_ADV,
connection_handle=0,
tx_power_level=-4,
),
check_status=False
)
)
if response.status == HCI_SUCCESS:
+1 -1
View File
@@ -65,7 +65,7 @@ async def main() -> None:
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -161,7 +161,7 @@ async def main() -> None:
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -181,7 +181,7 @@ async def main() -> None:
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -70,7 +70,7 @@ async def main() -> None:
await device.power_on()
await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -112,7 +112,7 @@ async def main() -> None:
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -73,7 +73,7 @@ async def main() -> None:
await device.power_on()
await device.start_discovery()
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -57,7 +57,7 @@ async def main() -> None:
print(f'!!! Encryption failed: {error}')
return
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -101,7 +101,7 @@ async def main() -> None:
await device.start_advertising()
await device.start_scanning()
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -48,7 +48,7 @@ async def main() -> None:
await device.power_on()
await device.start_scanning()
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -147,7 +147,7 @@ async def main() -> None:
else:
await device.start_advertising(auto_restart=True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
@@ -99,7 +99,7 @@ async def main() -> None:
else:
await device.start_advertising(auto_restart=True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -422,7 +422,7 @@ async def main() -> None:
# Setup a server
await server(device)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -167,7 +167,7 @@ async def main() -> None:
await websockets.asyncio.server.serve(serve, 'localhost', 8989)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -735,7 +735,7 @@ async def main() -> None:
print("Executing in Web mode")
await keyboard_device(hid_device)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -556,7 +556,7 @@ async def main() -> None:
# Interrupt Channel
await hid_host.connect_interrupt_channel()
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -227,7 +227,7 @@ async def main() -> None:
tcp_port = int(sys.argv[5])
asyncio.create_task(tcp_server(tcp_port, session))
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -153,7 +153,7 @@ async def main() -> None:
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -75,7 +75,7 @@ async def main() -> None:
await device.power_on()
await device.start_scanning(filter_duplicates=filter_duplicates)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+15 -1
View File
@@ -233,7 +233,21 @@ def test_event(event: avrcp.Event):
feature_bitmask=avrcp.MediaPlayerItem.Features.ADD_TO_NOW_PLAYING,
character_set_id=avrcp.CharacterSetId.UTF_8,
displayable_name="Woo",
)
),
avrcp.FolderItem(
folder_uid=1,
folder_type=avrcp.FolderItem.FolderType.ALBUMS,
is_playable=avrcp.FolderItem.Playable.PLAYABLE,
character_set_id=avrcp.CharacterSetId.UTF_8,
displayable_name="Album",
),
avrcp.MediaElementItem(
media_element_uid=1,
media_type=avrcp.MediaElementItem.MediaType.AUDIO,
character_set_id=avrcp.CharacterSetId.UTF_8,
displayable_name="Song",
attribute_value_entry_list=[],
),
],
),
avrcp.ChangePathResponse(
+2 -2
View File
@@ -218,9 +218,9 @@ def test_return_parameters() -> None:
assert isinstance(params.status, utils.OpenIntEnum)
params = hci.HCI_Read_BD_ADDR_Command.parse_return_parameters(
bytes.fromhex('3C001122334455')
bytes.fromhex('00001122334455')
)
assert params.status == hci.HCI_ErrorCode.ADVERTISING_TIMEOUT_ERROR
assert params.status == hci.HCI_ErrorCode.SUCCESS
assert isinstance(params.status, utils.OpenIntEnum)
assert isinstance(params.bd_addr, hci.Address)
+17 -3
View File
@@ -26,9 +26,11 @@ from bumble.controller import Controller
from bumble.hci import (
HCI_AclDataPacket,
HCI_Command_Complete_Event,
HCI_Disconnect_Command,
HCI_Error,
HCI_ErrorCode,
HCI_Event,
HCI_GenericReturnParameters,
HCI_Reset_Command,
HCI_StatusReturnParameters,
)
@@ -195,6 +197,7 @@ async def test_send_sync_command() -> None:
)
host = Host(source, sink)
host.ready = True
# Sync command with success
response1 = await host.send_sync_command(HCI_Reset_Command())
@@ -212,6 +215,17 @@ async def test_send_sync_command() -> None:
assert excinfo.value.error_code == error_response.return_parameters.status
# Sync command with error status should not raise when `check_status` is False
response2 = await host.send_sync_command(HCI_Reset_Command(), check_status=False)
assert response2.status == HCI_ErrorCode.COMMAND_DISALLOWED_ERROR
# Sync command with raw result
response2 = await host.send_sync_command_raw(HCI_Reset_Command())
assert response2.return_parameters.status == HCI_ErrorCode.COMMAND_DISALLOWED_ERROR
# Sync command with a command that's not an HCI_SyncCommand
# (here, for convenience, we use an HCI_AsyncCommand instance)
command = HCI_Disconnect_Command(connection_handle=0x1234, reason=0x13)
sink.response = HCI_Command_Complete_Event(
1,
command.op_code,
HCI_GenericReturnParameters(data=bytes.fromhex("00112233")),
)
response3 = await host.send_sync_command_raw(command) # type: ignore
assert isinstance(response3.return_parameters, HCI_GenericReturnParameters)