Compare commits

..

60 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
2d17a5f742 Merge pull request #880 from google/gbg/command-status-hack
add workaround for some buggy controllers
2026-02-02 23:37:52 -08:00
Gilles Boccon-Gibod
3894b14467 better handling of complete/status events 2026-02-02 23:28:40 -08:00
Gilles Boccon-Gibod
e62f947430 add workaround for some buggy controllers 2026-02-02 13:19:55 -08:00
Gilles Boccon-Gibod
dcb8a4b607 Merge pull request #877 from google/gbg/hci-fixes
fix a few HCI types and make the bridge more robust
2026-02-02 11:19:28 -08:00
Gilles Boccon-Gibod
81985c47a9 remove superfluous statement 2026-02-02 11:12:28 -08:00
Gilles Boccon-Gibod
7118328b07 Merge pull request #879 from google/gbg/resolve-when-bonded
resolve addresses when connecting to bonded peers
2026-01-31 11:09:55 -08:00
Gilles Boccon-Gibod
5dc01d792a address PR comments 2026-01-31 10:55:58 -08:00
Gilles Boccon-Gibod
255f357975 resolve when bonded 2026-01-30 21:53:01 -08:00
Josh Wu
c86920558b Merge pull request #878 from zxzxwu/avrcp
AVRCP: SDP record classes and some delegation
2026-01-31 00:01:55 +08:00
Josh Wu
8e6efd0b2f Fix error in AVRCP example 2026-01-30 23:01:11 +08:00
Gilles Boccon-Gibod
2a59e19283 fix comment 2026-01-29 19:09:46 -08:00
Josh Wu
34f5b81c7d AVRCP: Delegate Company ID capabilities 2026-01-29 22:13:14 +08:00
Josh Wu
d34d6a5c98 AVRCP: Delegate Playback Status 2026-01-29 21:33:57 +08:00
Josh Wu
aedc971653 AVRCP: Add SDP record class and finder 2026-01-29 16:00:50 +08:00
Josh Wu
c6815fb820 AVRCP: Delegate passthrough key event 2026-01-29 14:50:14 +08:00
Gilles Boccon-Gibod
f44d013690 make bridge more robust 2026-01-27 09:47:52 -08:00
Gilles Boccon-Gibod
e63dc15ede fix handling of return parameters 2026-01-27 09:39:22 -08:00
Gilles Boccon-Gibod
c901e15666 fix a few HCI types and make the bridge more robust 2026-01-25 13:47:14 -08:00
Gilles Boccon-Gibod
022323b19c Merge pull request #871 from google/gbg/sci
add basic support for SCI
2026-01-24 10:39:11 -08:00
Gilles Boccon-Gibod
a0d24e95e7 fix spacing_type 2026-01-24 10:15:32 -08:00
Josh Wu
7efbd303e0 Merge pull request #876 from ttdennis/await_termination_fix
Update apps and examples to await .terminated instead of wait_for_termination()
2026-01-24 11:44:19 +08:00
Dennis Heinze
49530d8d6d Update apps and examples to await .terminated 2026-01-24 00:20:55 +01:00
Gilles Boccon-Gibod
85b78b46f8 Merge pull request #870 from antipatico/feat_AV53C1 2026-01-23 13:43:12 -08:00
Josh Wu
3f9ef5aac2 Merge pull request #873 from zxzxwu/l2cap
L2CAP: Fix wrong CID on reject
2026-01-23 12:44:59 +08:00
Josh Wu
e488ea9783 Merge pull request #872 from zxzxwu/avrcp
AVRCP: Fix wrong field specs
2026-01-23 12:36:14 +08:00
Josh Wu
21d937c2f1 Merge pull request #865 from willnix/pcapsnoop
Added a PcapSnooper class
2026-01-23 12:33:15 +08:00
Frieder Steinmetz
a8396e6cce Formatted with black again. 2026-01-22 17:49:58 +01:00
Josh Wu
7e1b1c8f78 L2CAP: Fix wrong CID on reject 2026-01-22 23:16:25 +08:00
Josh Wu
55719bf6de AVRCP: Fix wrong field specs 2026-01-22 22:18:58 +08:00
Frieder Steinmetz
5059920696 Please mypy.\n\nTwo calls to open(), some more annotations and a rescoped global were needed. 2026-01-22 10:40:08 +01:00
Gilles Boccon-Gibod
c577f17c99 add basic support for SCI 2026-01-20 15:32:55 -08:00
Gilles Boccon-Gibod
252f3e49b6 Merge pull request #870 from antipatico/feat_AV53C1 2026-01-20 10:46:52 -08:00
Jacopo Scannella
f3ecf04479 Added support for STA-AV53C1-USB-BLUETOOTH StarTech(dot)com dongle - RTL8761BUE 2026-01-20 09:32:51 +01:00
Gilles Boccon-Gibod
4986f55043 Merge pull request #869 from timrid/android-fix
Make bumble work on Android using briefcase/chaquopy
2026-01-19 09:50:08 -08:00
Gilles Boccon-Gibod
7e89c8a7f8 Merge pull request #868 from google/gbg/return-parameters
typing support for HCI commands return parameters
2026-01-19 09:49:15 -08:00
timrid
085905a7bf Make bumble work on Android using briefcase that is using chaquopy under the hood. 2026-01-18 23:32:37 +01:00
zxzxwu
c619f1f21b Merge pull request #867 from zxzxwu/fix-import-error
Fix missing ClassVar import
2026-01-16 15:33:07 +08:00
Josh Wu
d4b0da9265 Fix missing ClassVar import 2026-01-16 15:21:26 +08:00
zxzxwu
f1058e4d4e Merge pull request #859 from istemon/att-read-by-type-request-fix
Return 'invalid handle' for malformed read by type request
2026-01-16 15:09:20 +08:00
zxzxwu
454d477d7e Merge pull request #864 from zxzxwu/hci-packets-typing
Add HCI Packets annotations and send_sco_sdu
2026-01-16 15:08:42 +08:00
zxzxwu
6966228d74 Merge pull request #863 from zxzxwu/eatt-mtu
Correct ATT_MTU in enhanced bearers
2026-01-16 15:08:12 +08:00
zxzxwu
f4271a5646 Merge pull request #862 from zxzxwu/gatt-multiple
GATT: Support Multiple Requests
2026-01-16 15:08:02 +08:00
zxzxwu
534209f0af Merge pull request #861 from zxzxwu/l2cap
Replace send_pdu() with write()
2026-01-16 15:07:54 +08:00
zxzxwu
549b82999a Merge pull request #860 from zxzxwu/address
Improve Address type annotations
2026-01-16 14:04:56 +08:00
zxzxwu
551f577b2a Merge pull request #866 from zxzxwu/template-service
Fix GATT TemplateSerivce annotations
2026-01-16 09:41:48 +08:00
Frieder Steinmetz
c69c1532cc Fix comments that were messed up by black 2026-01-15 19:06:03 +01:00
Frieder Steinmetz
f95b2054c8 Formatted with 2026-01-15 10:50:33 +01:00
Josh Wu
84a6453dda Fix GATT TemplateSerivce annotations 2026-01-15 12:06:05 +08:00
Frieder Steinmetz
3fdd7ee45e Added the PcapSnooper class.
The class implements a bumble snooper that writes PCAP records.
It can write to either a file or a named pipe.
The latter is useful to bridge with wireshark extcap for live logging.
2026-01-14 23:40:59 +01:00
Gilles Boccon-Gibod
591ed61686 Merge pull request #858 from klow68/feat/add-usb-probe-filtering 2026-01-13 08:54:55 -08:00
Josh Wu
3d3acbb374 Add HCI Packets annotations and send_sco_sdu 2026-01-13 17:58:37 +08:00
Stryxion
671f306a27 fix: black 2026-01-13 09:42:40 +01:00
Josh Wu
f7364db992 Correct ATT_MTU in enhanced bearers 2026-01-12 21:03:14 +08:00
Josh Wu
0fb2b3bd66 GATT: Support Multiple Requests 2026-01-12 20:51:38 +08:00
Stryxion
9e270d4d62 fix: mypy 2026-01-12 09:36:35 +01:00
Josh Wu
cf60b5ffbb Replace send_pdu() with write() 2026-01-12 13:16:49 +08:00
Josh Wu
aa4c57d105 Improve Address type annotations
* Add missing annotations
* Declare address constants as ClassVar
2026-01-12 13:07:04 +08:00
Istemon
61a601e6e2 Return 'invalid handle' for malformed read by type request 2026-01-10 01:43:30 +00:00
Stryxion
05fd4fbfc6 fix: review 2026-01-09 08:46:31 +01:00
Stryxion
6aa9e0bdf7 feat: Add filtering options for usb probe 2026-01-08 14:54:58 +01:00
58 changed files with 2709 additions and 869 deletions

View File

@@ -27,9 +27,8 @@ from bumble.core import name_or_number
from bumble.hci import (
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_LE_READ_BUFFER_SIZE_V2_COMMAND,
HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND,
HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND,
HCI_LE_READ_MINIMUM_SUPPORTED_CONNECTION_INTERVAL_COMMAND,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_READ_BD_ADDR_COMMAND,
HCI_READ_BUFFER_SIZE_COMMAND,
@@ -37,9 +36,8 @@ from bumble.hci import (
HCI_Command,
HCI_LE_Read_Buffer_Size_Command,
HCI_LE_Read_Buffer_Size_V2_Command,
HCI_LE_Read_Maximum_Advertising_Data_Length_Command,
HCI_LE_Read_Maximum_Data_Length_Command,
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command,
HCI_LE_Read_Minimum_Supported_Connection_Interval_Command,
HCI_LE_Read_Suggested_Default_Data_Length_Command,
HCI_Read_BD_ADDR_Command,
HCI_Read_Buffer_Size_Command,
@@ -78,52 +76,61 @@ async def get_classic_info(host: Host) -> None:
async def get_le_info(host: Host) -> None:
print()
if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
response1 = await host.send_sync_command(
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command()
)
print(
color('LE Number Of Supported Advertising Sets:', 'yellow'),
response1.num_supported_advertising_sets,
'\n',
)
print(
color('LE Number Of Supported Advertising Sets:', 'yellow'),
host.number_of_supported_advertising_sets,
'\n',
)
if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
response2 = await host.send_sync_command(
HCI_LE_Read_Maximum_Advertising_Data_Length_Command()
)
print(
color('LE Maximum Advertising Data Length:', 'yellow'),
response2.max_advertising_data_length,
'\n',
)
print(
color('LE Maximum Advertising Data Length:', 'yellow'),
host.maximum_advertising_data_length,
'\n',
)
if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
response3 = await host.send_sync_command(
response1 = await host.send_sync_command(
HCI_LE_Read_Maximum_Data_Length_Command()
)
print(
color('Maximum Data Length:', 'yellow'),
color('LE Maximum Data Length:', 'yellow'),
(
f'tx:{response3.supported_max_tx_octets}/'
f'{response3.supported_max_tx_time}, '
f'rx:{response3.supported_max_rx_octets}/'
f'{response3.supported_max_rx_time}'
f'tx:{response1.supported_max_tx_octets}/'
f'{response1.supported_max_tx_time}, '
f'rx:{response1.supported_max_rx_octets}/'
f'{response1.supported_max_rx_time}'
),
'\n',
)
if host.supports_command(HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND):
response4 = await host.send_sync_command(
response2 = await host.send_sync_command(
HCI_LE_Read_Suggested_Default_Data_Length_Command()
)
print(
color('Suggested Default Data Length:', 'yellow'),
f'{response4.suggested_max_tx_octets}/'
f'{response4.suggested_max_tx_time}',
color('LE Suggested Default Data Length:', 'yellow'),
f'{response2.suggested_max_tx_octets}/'
f'{response2.suggested_max_tx_time}',
'\n',
)
if host.supports_command(HCI_LE_READ_MINIMUM_SUPPORTED_CONNECTION_INTERVAL_COMMAND):
response3 = await host.send_sync_command(
HCI_LE_Read_Minimum_Supported_Connection_Interval_Command()
)
print(
color('LE Minimum Supported Connection Interval:', 'yellow'),
f'{response3.minimum_supported_connection_interval * 125} µs',
)
for group in range(len(response3.group_min)):
print(
f' Group {group}: '
f'{response3.group_min[group] * 125} µs to '
f'{response3.group_max[group] * 125} µs '
'by increments of '
f'{response3.group_stride[group] * 125} µs',
'\n',
)
print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features:
print(f' {LeFeature(feature).name}')

View File

@@ -352,7 +352,7 @@ async def run(
await bridge.start()
# Wait until the source terminates
await hci_source.wait_for_termination()
await hci_source.terminated
@click.command()

View File

@@ -81,7 +81,9 @@ async def async_main():
response = hci.HCI_Command_Complete_Event(
num_hci_command_packets=1,
command_opcode=hci_packet.op_code,
return_parameters=bytes([hci.HCI_SUCCESS]),
return_parameters=hci.HCI_StatusReturnParameters(
status=hci.HCI_ErrorCode.SUCCESS
),
)
# Return a packet with 'respond to sender' set to True
return (bytes(response), True)

View File

@@ -268,7 +268,7 @@ async def run(device_config, hci_transport, bridge):
await bridge.start(device)
# Wait until the transport terminates
await hci_source.wait_for_termination()
await hci_source.terminated
# -----------------------------------------------------------------------------

View File

@@ -421,7 +421,7 @@ async def run(device_config, hci_transport, bridge):
await bridge.start(device)
# Wait until the transport terminates
await hci_source.wait_for_termination()
await hci_source.terminated
except core.ConnectionError as error:
print(color(f"!!! Bluetooth connection failed: {error}", "red"))
except Exception as error:

View File

@@ -22,7 +22,7 @@ import click
import bumble.logging
from bumble import data_types
from bumble.colors import color
from bumble.device import Advertisement, Device
from bumble.device import Advertisement, Device, DeviceConfiguration
from bumble.hci import HCI_LE_1M_PHY, HCI_LE_CODED_PHY, Address, HCI_Constant
from bumble.keys import JsonKeyStore
from bumble.smp import AddressResolver
@@ -144,8 +144,14 @@ async def scan(
device_config, hci_source, hci_sink
)
else:
device = Device.with_hci(
'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink
device = Device.from_config_with_hci(
DeviceConfiguration(
name='Bumble',
address=Address('F0:F1:F2:F3:F4:F5'),
keystore='JsonKeyStore',
),
hci_source,
hci_sink,
)
await device.power_on()
@@ -190,7 +196,7 @@ async def scan(
scanning_phys=scanning_phys,
)
await hci_source.wait_for_termination()
await hci_source.terminated
# -----------------------------------------------------------------------------

View File

@@ -726,7 +726,7 @@ class Speaker:
print("Waiting for connection...")
await self.advertise()
await hci_source.wait_for_termination()
await hci_source.terminated
for output in self.outputs:
await output.stop()

View File

@@ -26,6 +26,8 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from typing import Any
import click
import usb1
@@ -166,13 +168,16 @@ def is_bluetooth_hci(device):
# -----------------------------------------------------------------------------
@click.command()
@click.option('--verbose', is_flag=True, default=False, help='Print more details')
def main(verbose):
@click.option('--hci-only', is_flag=True, default=False, help='only show HCI device')
@click.option('--manufacturer', help='filter by manufacturer')
@click.option('--product', help='filter by product')
def main(verbose: bool, manufacturer: str, product: str, hci_only: bool):
bumble.logging.setup_basic_logging('WARNING')
load_libusb()
with usb1.USBContext() as context:
bluetooth_device_count = 0
devices = {}
devices: dict[tuple[Any, Any], list[str | None]] = {}
for device in context.getDeviceIterator(skip_on_error=True):
device_class = device.getDeviceClass()
@@ -234,6 +239,14 @@ def main(verbose):
f'{basic_transport_name}/{device_serial_number}'
)
# Filter
if product and device_product != product:
continue
if manufacturer and device_manufacturer != manufacturer:
continue
if not is_bluetooth_hci(device) and hci_only:
continue
# Print the results
print(
color(

View File

@@ -29,7 +29,7 @@ import enum
import functools
import inspect
import struct
from collections.abc import Awaitable, Callable
from collections.abc import Awaitable, Callable, Sequence
from typing import (
TYPE_CHECKING,
ClassVar,
@@ -72,34 +72,36 @@ ATT_PSM = 0x001F
EATT_PSM = 0x0027
class Opcode(hci.SpecableEnum):
ATT_ERROR_RESPONSE = 0x01
ATT_EXCHANGE_MTU_REQUEST = 0x02
ATT_EXCHANGE_MTU_RESPONSE = 0x03
ATT_FIND_INFORMATION_REQUEST = 0x04
ATT_FIND_INFORMATION_RESPONSE = 0x05
ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06
ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
ATT_READ_BY_TYPE_REQUEST = 0x08
ATT_READ_BY_TYPE_RESPONSE = 0x09
ATT_READ_REQUEST = 0x0A
ATT_READ_RESPONSE = 0x0B
ATT_READ_BLOB_REQUEST = 0x0C
ATT_READ_BLOB_RESPONSE = 0x0D
ATT_READ_MULTIPLE_REQUEST = 0x0E
ATT_READ_MULTIPLE_RESPONSE = 0x0F
ATT_READ_BY_GROUP_TYPE_REQUEST = 0x10
ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
ATT_WRITE_REQUEST = 0x12
ATT_WRITE_RESPONSE = 0x13
ATT_WRITE_COMMAND = 0x52
ATT_SIGNED_WRITE_COMMAND = 0xD2
ATT_PREPARE_WRITE_REQUEST = 0x16
ATT_PREPARE_WRITE_RESPONSE = 0x17
ATT_EXECUTE_WRITE_REQUEST = 0x18
ATT_EXECUTE_WRITE_RESPONSE = 0x19
ATT_HANDLE_VALUE_NOTIFICATION = 0x1B
ATT_HANDLE_VALUE_INDICATION = 0x1D
ATT_HANDLE_VALUE_CONFIRMATION = 0x1E
ATT_ERROR_RESPONSE = 0x01
ATT_EXCHANGE_MTU_REQUEST = 0x02
ATT_EXCHANGE_MTU_RESPONSE = 0x03
ATT_FIND_INFORMATION_REQUEST = 0x04
ATT_FIND_INFORMATION_RESPONSE = 0x05
ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06
ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
ATT_READ_BY_TYPE_REQUEST = 0x08
ATT_READ_BY_TYPE_RESPONSE = 0x09
ATT_READ_REQUEST = 0x0A
ATT_READ_RESPONSE = 0x0B
ATT_READ_BLOB_REQUEST = 0x0C
ATT_READ_BLOB_RESPONSE = 0x0D
ATT_READ_MULTIPLE_REQUEST = 0x0E
ATT_READ_MULTIPLE_RESPONSE = 0x0F
ATT_READ_BY_GROUP_TYPE_REQUEST = 0x10
ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
ATT_READ_MULTIPLE_VARIABLE_REQUEST = 0x20
ATT_READ_MULTIPLE_VARIABLE_RESPONSE = 0x21
ATT_WRITE_REQUEST = 0x12
ATT_WRITE_RESPONSE = 0x13
ATT_WRITE_COMMAND = 0x52
ATT_SIGNED_WRITE_COMMAND = 0xD2
ATT_PREPARE_WRITE_REQUEST = 0x16
ATT_PREPARE_WRITE_RESPONSE = 0x17
ATT_EXECUTE_WRITE_REQUEST = 0x18
ATT_EXECUTE_WRITE_RESPONSE = 0x19
ATT_HANDLE_VALUE_NOTIFICATION = 0x1B
ATT_HANDLE_VALUE_INDICATION = 0x1D
ATT_HANDLE_VALUE_CONFIRMATION = 0x1E
ATT_REQUESTS = [
Opcode.ATT_EXCHANGE_MTU_REQUEST,
@@ -110,9 +112,10 @@ ATT_REQUESTS = [
Opcode.ATT_READ_BLOB_REQUEST,
Opcode.ATT_READ_MULTIPLE_REQUEST,
Opcode.ATT_READ_BY_GROUP_TYPE_REQUEST,
Opcode.ATT_READ_MULTIPLE_VARIABLE_REQUEST,
Opcode.ATT_WRITE_REQUEST,
Opcode.ATT_PREPARE_WRITE_REQUEST,
Opcode.ATT_EXECUTE_WRITE_REQUEST
Opcode.ATT_EXECUTE_WRITE_REQUEST,
]
ATT_RESPONSES = [
@@ -125,9 +128,10 @@ ATT_RESPONSES = [
Opcode.ATT_READ_BLOB_RESPONSE,
Opcode.ATT_READ_MULTIPLE_RESPONSE,
Opcode.ATT_READ_BY_GROUP_TYPE_RESPONSE,
Opcode.ATT_READ_MULTIPLE_VARIABLE_RESPONSE,
Opcode.ATT_WRITE_RESPONSE,
Opcode.ATT_PREPARE_WRITE_RESPONSE,
Opcode.ATT_EXECUTE_WRITE_RESPONSE
Opcode.ATT_EXECUTE_WRITE_RESPONSE,
]
class ErrorCode(hci.SpecableEnum):
@@ -185,6 +189,18 @@ ATT_INSUFFICIENT_RESOURCES_ERROR = ErrorCode.INSUFFICIENT_RESOURCES
ATT_DEFAULT_MTU = 23
HANDLE_FIELD_SPEC = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'}
_SET_OF_HANDLES_METADATA = hci.metadata({
'parser': lambda data, offset: (
len(data),
[
struct.unpack_from('<H', data, i)[0]
for i in range(offset, len(data), 2)
],
),
'serializer': lambda handles: b''.join(
[struct.pack('<H', handle) for handle in handles]
),
})
# fmt: on
# pylint: enable=line-too-long
@@ -554,7 +570,7 @@ class ATT_Read_Multiple_Request(ATT_PDU):
See Bluetooth spec @ Vol 3, Part F - 3.4.4.7 Read Multiple Request
'''
set_of_handles: bytes = dataclasses.field(metadata=hci.metadata("*"))
set_of_handles: Sequence[int] = dataclasses.field(metadata=_SET_OF_HANDLES_METADATA)
# -----------------------------------------------------------------------------
@@ -635,6 +651,55 @@ class ATT_Read_By_Group_Type_Response(ATT_PDU):
return result
# -----------------------------------------------------------------------------
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_Multiple_Variable_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.11 Read Multiple Variable Request
'''
set_of_handles: Sequence[int] = dataclasses.field(metadata=_SET_OF_HANDLES_METADATA)
# -----------------------------------------------------------------------------
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_Multiple_Variable_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.12 Read Multiple Variable Response
'''
@classmethod
def _parse_length_value_tuples(
cls, data: bytes, offset: int
) -> tuple[int, list[tuple[int, bytes]]]:
length_value_tuple_list: list[tuple[int, bytes]] = []
while offset < len(data):
length = struct.unpack_from('<H', data, offset)[0]
length_value_tuple_list.append(
(length, data[offset + 2 : offset + 2 + length])
)
offset += 2 + length
return (len(data), length_value_tuple_list)
length_value_tuple_list: Sequence[tuple[int, bytes]] = dataclasses.field(
metadata=hci.metadata(
{
'parser': lambda data, offset: ATT_Read_Multiple_Variable_Response._parse_length_value_tuples(
data, offset
),
'serializer': lambda length_value_tuple_list: b''.join(
[
struct.pack('<H', length) + value
for length, value in length_value_tuple_list
]
),
}
)
)
# -----------------------------------------------------------------------------
@ATT_PDU.subclass
@dataclasses.dataclass

View File

@@ -235,7 +235,7 @@ class Protocol:
)
+ payload
)
self.l2cap_channel.send_pdu(pdu)
self.l2cap_channel.write(pdu)
def send_command(self, transaction_label: int, pid: int, payload: bytes) -> None:
logger.debug(

View File

@@ -268,7 +268,7 @@ class MediaPacketPump:
await self.clock.sleep(delay)
# Emit
rtp_channel.send_pdu(bytes(packet))
rtp_channel.write(bytes(packet))
logger.debug(
f'{color(">>> sending RTP packet:", "green")} {packet}'
)
@@ -1519,7 +1519,7 @@ class Protocol(utils.EventEmitter):
header = bytes([first_header_byte])
# Send one packet
self.l2cap_channel.send_pdu(header + payload[:max_fragment_size])
self.l2cap_channel.write(header + payload[:max_fragment_size])
# Prepare for the next packet
payload = payload[max_fragment_size:]
@@ -1829,7 +1829,7 @@ class Stream:
def send_media_packet(self, packet: MediaPacket) -> None:
assert self.rtp_channel
self.rtp_channel.send_pdu(bytes(packet))
self.rtp_channel.write(bytes(packet))
async def configure(self) -> None:
if self.state != State.IDLE:

View File

@@ -26,7 +26,7 @@ from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Sequen
from dataclasses import dataclass, field
from typing import ClassVar, SupportsBytes, TypeVar
from bumble import avc, avctp, core, hci, l2cap, utils
from bumble import avc, avctp, core, hci, l2cap, sdp, utils
from bumble.colors import color
from bumble.device import Connection, Device
from bumble.sdp import (
@@ -55,13 +55,15 @@ AVRCP_PID = 0x110E
AVRCP_BLUETOOTH_SIG_COMPANY_ID = 0x001958
_UINT64_BE_METADATA = {
'parser': lambda data, offset: (
offset + 8,
int.from_bytes(data[offset : offset + 8], byteorder='big'),
),
'serializer': lambda x: x.to_bytes(8, byteorder='big'),
}
_UINT64_BE_METADATA = hci.metadata(
{
'parser': lambda data, offset: (
offset + 8,
int.from_bytes(data[offset : offset + 8], byteorder='big'),
),
'serializer': lambda x: x.to_bytes(8, byteorder='big'),
}
)
class PduId(utils.OpenIntEnum):
@@ -92,7 +94,7 @@ class PduId(utils.OpenIntEnum):
class CharacterSetId(hci.SpecableEnum):
UTF_8 = 0x06
UTF_8 = 0x6A
class MediaAttributeId(hci.SpecableEnum):
@@ -192,82 +194,43 @@ class TargetFeatures(enum.IntFlag):
# -----------------------------------------------------------------------------
def make_controller_service_sdp_records(
service_record_handle: int,
avctp_version: tuple[int, int] = (1, 4),
avrcp_version: tuple[int, int] = (1, 6),
supported_features: int | ControllerFeatures = 1,
) -> list[ServiceAttribute]:
avctp_version_int = avctp_version[0] << 8 | avctp_version[1]
avrcp_version_int = avrcp_version[0] << 8 | avrcp_version[1]
@dataclass
class ControllerServiceSdpRecord:
service_record_handle: int
avctp_version: tuple[int, int] = (1, 4)
avrcp_version: tuple[int, int] = (1, 6)
supported_features: int | ControllerFeatures = ControllerFeatures(1)
attributes = [
ServiceAttribute(
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(service_record_handle),
),
ServiceAttribute(
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]),
),
ServiceAttribute(
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE),
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE),
]
),
),
ServiceAttribute(
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID),
DataElement.unsigned_integer_16(avctp.AVCTP_PSM),
]
),
DataElement.sequence(
[
DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID),
DataElement.unsigned_integer_16(avctp_version_int),
]
),
]
),
),
ServiceAttribute(
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE),
DataElement.unsigned_integer_16(avrcp_version_int),
]
),
]
),
),
ServiceAttribute(
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
DataElement.unsigned_integer_16(supported_features),
),
]
if supported_features & ControllerFeatures.SUPPORTS_BROWSING:
attributes.append(
def to_service_attributes(self) -> list[ServiceAttribute]:
avctp_version_int = self.avctp_version[0] << 8 | self.avctp_version[1]
avrcp_version_int = self.avrcp_version[0] << 8 | self.avrcp_version[1]
attributes = [
ServiceAttribute(
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(self.service_record_handle),
),
ServiceAttribute(
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]),
),
ServiceAttribute(
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE),
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE),
]
),
),
ServiceAttribute(
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID),
DataElement.unsigned_integer_16(
avctp.AVCTP_BROWSING_PSM
),
DataElement.unsigned_integer_16(avctp.AVCTP_PSM),
]
),
DataElement.sequence(
@@ -279,87 +242,130 @@ def make_controller_service_sdp_records(
]
),
),
)
return attributes
ServiceAttribute(
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE),
DataElement.unsigned_integer_16(avrcp_version_int),
]
),
]
),
),
ServiceAttribute(
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
DataElement.unsigned_integer_16(self.supported_features),
),
]
if self.supported_features & ControllerFeatures.SUPPORTS_BROWSING:
attributes.append(
ServiceAttribute(
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID),
DataElement.unsigned_integer_16(
avctp.AVCTP_BROWSING_PSM
),
]
),
DataElement.sequence(
[
DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID),
DataElement.unsigned_integer_16(avctp_version_int),
]
),
]
),
),
)
return attributes
@classmethod
async def find(cls, connection: Connection) -> list[ControllerServiceSdpRecord]:
async with sdp.Client(connection) as sdp_client:
search_result = await sdp_client.search_attributes(
uuids=[core.BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE],
attribute_ids=[
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
],
)
records: list[ControllerServiceSdpRecord] = []
for attribute_lists in search_result:
record = cls(0)
for attribute in attribute_lists:
if attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID:
record.service_record_handle = attribute.value.value
elif attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
# [[L2CAP, PSM], [AVCTP, version]]
record.avctp_version = (
attribute.value.value[1].value[1].value >> 8,
attribute.value.value[1].value[1].value & 0xFF,
)
elif (
attribute.id
== SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
):
# [[AV_REMOTE_CONTROL, version]]
record.avrcp_version = (
attribute.value.value[0].value[1].value >> 8,
attribute.value.value[0].value[1].value & 0xFF,
)
elif attribute.id == SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID:
record.supported_features = ControllerFeatures(
attribute.value.value
)
records.append(record)
return records
# -----------------------------------------------------------------------------
def make_target_service_sdp_records(
service_record_handle: int,
avctp_version: tuple[int, int] = (1, 4),
avrcp_version: tuple[int, int] = (1, 6),
supported_features: int | TargetFeatures = 0x23,
) -> list[ServiceAttribute]:
# TODO: support a way to compute the supported features from a feature list
avctp_version_int = avctp_version[0] << 8 | avctp_version[1]
avrcp_version_int = avrcp_version[0] << 8 | avrcp_version[1]
@dataclass
class TargetServiceSdpRecord:
service_record_handle: int
avctp_version: tuple[int, int] = (1, 4)
avrcp_version: tuple[int, int] = (1, 6)
supported_features: int | TargetFeatures = TargetFeatures(0x23)
attributes = [
ServiceAttribute(
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(service_record_handle),
),
ServiceAttribute(
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]),
),
ServiceAttribute(
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_TARGET_SERVICE),
]
),
),
ServiceAttribute(
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID),
DataElement.unsigned_integer_16(avctp.AVCTP_PSM),
]
),
DataElement.sequence(
[
DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID),
DataElement.unsigned_integer_16(avctp_version_int),
]
),
]
),
),
ServiceAttribute(
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE),
DataElement.unsigned_integer_16(avrcp_version_int),
]
),
]
),
),
ServiceAttribute(
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
DataElement.unsigned_integer_16(supported_features),
),
]
if supported_features & TargetFeatures.SUPPORTS_BROWSING:
attributes.append(
def to_service_attributes(self) -> list[ServiceAttribute]:
# TODO: support a way to compute the supported features from a feature list
avctp_version_int = self.avctp_version[0] << 8 | self.avctp_version[1]
avrcp_version_int = self.avrcp_version[0] << 8 | self.avrcp_version[1]
attributes = [
ServiceAttribute(
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(self.service_record_handle),
),
ServiceAttribute(
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]),
),
ServiceAttribute(
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_TARGET_SERVICE),
]
),
),
ServiceAttribute(
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID),
DataElement.unsigned_integer_16(
avctp.AVCTP_BROWSING_PSM
),
DataElement.unsigned_integer_16(avctp.AVCTP_PSM),
]
),
DataElement.sequence(
@@ -371,8 +377,90 @@ def make_target_service_sdp_records(
]
),
),
)
return attributes
ServiceAttribute(
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_AV_REMOTE_CONTROL_SERVICE),
DataElement.unsigned_integer_16(avrcp_version_int),
]
),
]
),
),
ServiceAttribute(
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
DataElement.unsigned_integer_16(self.supported_features),
),
]
if self.supported_features & TargetFeatures.SUPPORTS_BROWSING:
attributes.append(
ServiceAttribute(
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(core.BT_L2CAP_PROTOCOL_ID),
DataElement.unsigned_integer_16(
avctp.AVCTP_BROWSING_PSM
),
]
),
DataElement.sequence(
[
DataElement.uuid(core.BT_AVCTP_PROTOCOL_ID),
DataElement.unsigned_integer_16(avctp_version_int),
]
),
]
),
),
)
return attributes
@classmethod
async def find(cls, connection: Connection) -> list[TargetServiceSdpRecord]:
async with sdp.Client(connection) as sdp_client:
search_result = await sdp_client.search_attributes(
uuids=[core.BT_AV_REMOTE_CONTROL_TARGET_SERVICE],
attribute_ids=[
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
],
)
records: list[TargetServiceSdpRecord] = []
for attribute_lists in search_result:
record = cls(0)
for attribute in attribute_lists:
if attribute.id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID:
record.service_record_handle = attribute.value.value
elif attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
# [[L2CAP, PSM], [AVCTP, version]]
record.avctp_version = (
attribute.value.value[1].value[1].value >> 8,
attribute.value.value[1].value[1].value & 0xFF,
)
elif (
attribute.id
== SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
):
# [[AV_REMOTE_CONTROL, version]]
record.avrcp_version = (
attribute.value.value[0].value[1].value >> 8,
attribute.value.value[0].value[1].value & 0xFF,
)
elif attribute.id == SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID:
record.supported_features = TargetFeatures(
attribute.value.value
)
records.append(record)
return records
# -----------------------------------------------------------------------------
@@ -491,14 +579,12 @@ class BrowseableItem:
**hci.HCI_Object.dict_from_bytes(data, offset + 3, subclass.fields)
)
instance._payload = data[3:]
return offset + length, instance
return offset + length + 3, instance
def __bytes__(self) -> bytes:
if self._payload is None:
self._payload = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields)
return (
struct.pack('>BH', self.item_type, len(self._payload) + 3) + self._payload
)
return struct.pack('>BH', self.item_type, len(self._payload)) + self._payload
_Item = TypeVar('_Item', bound='BrowseableItem')
@@ -601,11 +687,11 @@ class MediaPlayerItem(BrowseableItem):
metadata=MajorPlayerType.type_metadata(1)
)
player_sub_type: PlayerSubType = field(
metadata=PlayerSubType.type_metadata(4, byteorder='big')
metadata=PlayerSubType.type_metadata(4, byteorder='little')
)
play_status: PlayStatus = field(metadata=PlayStatus.type_metadata(1))
feature_bitmask: Features = field(
metadata=Features.type_metadata(16, byteorder='big')
metadata=Features.type_metadata(16, byteorder='little')
)
character_set_id: CharacterSetId = field(
metadata=CharacterSetId.type_metadata(2, byteorder='big')
@@ -634,7 +720,7 @@ class FolderItem(BrowseableItem):
folder_uid: int = field(metadata=_UINT64_BE_METADATA)
folder_type: FolderType = field(metadata=FolderType.type_metadata(1))
is_playable: FolderType = field(metadata=Playable.type_metadata(1))
is_playable: Playable = field(metadata=Playable.type_metadata(1))
character_set_id: CharacterSetId = field(
metadata=CharacterSetId.type_metadata(2, byteorder='big')
)
@@ -876,7 +962,7 @@ class GetPlayStatusCommand(Command):
class GetElementAttributesCommand(Command):
pdu_id = PduId.GET_ELEMENT_ATTRIBUTES
identifier: int = field(metadata=hci.metadata(_UINT64_BE_METADATA))
identifier: int = field(metadata=_UINT64_BE_METADATA)
attribute_ids: Sequence[MediaAttributeId] = field(
metadata=MediaAttributeId.type_metadata(
4, list_begin=True, list_end=True, byteorder='big'
@@ -951,7 +1037,7 @@ class ChangePathCommand(Command):
uid_counter: int = field(metadata=hci.metadata('>2'))
direction: Direction = field(metadata=Direction.type_metadata(1))
folder_uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA))
folder_uid: int = field(metadata=_UINT64_BE_METADATA)
# -----------------------------------------------------------------------------
@@ -961,7 +1047,7 @@ class GetItemAttributesCommand(Command):
pdu_id = PduId.GET_ITEM_ATTRIBUTES
scope: Scope = field(metadata=Scope.type_metadata(1))
uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA))
uid: int = field(metadata=_UINT64_BE_METADATA)
uid_counter: int = field(metadata=hci.metadata('>2'))
start_item: int = field(metadata=hci.metadata('>4'))
end_item: int = field(metadata=hci.metadata('>4'))
@@ -999,7 +1085,7 @@ class PlayItemCommand(Command):
pdu_id = PduId.PLAY_ITEM
scope: Scope = field(metadata=Scope.type_metadata(1))
uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA))
uid: int = field(metadata=_UINT64_BE_METADATA)
uid_counter: int = field(metadata=hci.metadata('>2'))
@@ -1010,7 +1096,7 @@ class AddToNowPlayingCommand(Command):
pdu_id = PduId.ADD_TO_NOW_PLAYING
scope: Scope = field(metadata=Scope.type_metadata(1))
uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA))
uid: int = field(metadata=_UINT64_BE_METADATA)
uid_counter: int = field(metadata=hci.metadata('>2'))
@@ -1204,6 +1290,10 @@ class InformBatteryStatusOfCtResponse(Response):
@dataclass
class GetPlayStatusResponse(Response):
pdu_id = PduId.GET_PLAY_STATUS
# TG doesn't support Song Length or Position.
UNAVAILABLE = 0xFFFFFFFF
song_length: int = field(metadata=hci.metadata(">4"))
song_position: int = field(metadata=hci.metadata(">4"))
play_status: PlayStatus = field(metadata=PlayStatus.type_metadata(1))
@@ -1521,16 +1611,33 @@ class Delegate:
def __init__(self, status_code: StatusCode) -> None:
self.status_code = status_code
supported_events: list[EventId]
volume: int
class AvcError(Exception):
"""The delegate AVC method failed, with a specified status code."""
def __init__(self, supported_events: Iterable[EventId] = ()) -> None:
def __init__(self, status_code: avc.ResponseFrame.ResponseCode) -> None:
self.status_code = status_code
supported_events: list[EventId]
supported_company_ids: list[int]
volume: int
playback_status: PlayStatus
def __init__(
self,
supported_events: Iterable[EventId] = (),
supported_company_ids: Iterable[int] = (AVRCP_BLUETOOTH_SIG_COMPANY_ID,),
) -> None:
self.supported_company_ids = list(supported_company_ids)
self.supported_events = list(supported_events)
self.volume = 0
self.playback_status = PlayStatus.STOPPED
async def get_supported_events(self) -> list[EventId]:
return self.supported_events
async def get_supported_company_ids(self) -> list[int]:
return self.supported_company_ids
async def set_absolute_volume(self, volume: int) -> None:
"""
Set the absolute volume.
@@ -1543,6 +1650,19 @@ class Delegate:
async def get_absolute_volume(self) -> int:
return self.volume
async def on_key_event(
self,
key: avc.PassThroughFrame.OperationId,
pressed: bool,
data: bytes,
) -> None:
logger.debug(
"@@@ on_key_event: key=%s, pressed=%s, data=%s", key, pressed, data.hex()
)
async def get_playback_status(self) -> PlayStatus:
return self.playback_status
# TODO add other delegate methods
@@ -1756,6 +1876,19 @@ class Protocol(utils.EventEmitter):
if isinstance(capability, EventId)
)
async def get_supported_company_ids(self) -> list[int]:
"""Get the list of events supported by the connected peer."""
response_context = await self.send_avrcp_command(
avc.CommandFrame.CommandType.STATUS,
GetCapabilitiesCommand(GetCapabilitiesCommand.CapabilityId.COMPANY_ID),
)
response = self._check_response(response_context, GetCapabilitiesResponse)
return list(
int.from_bytes(capability, 'big')
for capability in response.capabilities
if isinstance(capability, bytes)
)
async def get_play_status(self) -> SongAndPlayStatus:
"""Get the play status of the connected peer."""
response_context = await self.send_avrcp_command(
@@ -2052,16 +2185,28 @@ class Protocol(utils.EventEmitter):
return
if isinstance(command, avc.PassThroughCommandFrame):
# TODO: delegate
response = avc.PassThroughResponseFrame(
avc.ResponseFrame.ResponseCode.ACCEPTED,
command.subunit_type,
command.subunit_id,
command.state_flag,
command.operation_id,
command.operation_data,
)
self.send_response(transaction_label, response)
async def dispatch_key_event() -> None:
try:
await self.delegate.on_key_event(
command.operation_id,
command.state_flag == avc.PassThroughFrame.StateFlag.PRESSED,
command.operation_data,
)
response_code = avc.ResponseFrame.ResponseCode.ACCEPTED
except Delegate.AvcError as error:
logger.exception("delegate method raised exception")
response_code = error.status_code
except Exception:
logger.exception("delegate method raised exception")
response_code = avc.ResponseFrame.ResponseCode.REJECTED
self.send_passthrough_response(
transaction_label=transaction_label,
command=command,
response_code=response_code,
)
utils.AsyncRunner.spawn(dispatch_key_event())
return
# TODO handle other types
@@ -2141,6 +2286,8 @@ class Protocol(utils.EventEmitter):
self._on_set_absolute_volume_command(transaction_label, command)
elif isinstance(command, RegisterNotificationCommand):
self._on_register_notification_command(transaction_label, command)
elif isinstance(command, GetPlayStatusCommand):
self._on_get_play_status_command(transaction_label, command)
else:
# Not supported.
# TODO: check that this is the right way to respond in this case.
@@ -2364,17 +2511,27 @@ class Protocol(utils.EventEmitter):
logger.debug(f"<<< AVRCP command PDU: {command}")
async def get_supported_events() -> None:
capabilities: Sequence[bytes | SupportsBytes]
if (
command.capability_id
!= GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED
== GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED
):
raise core.InvalidArgumentError()
supported_events = await self.delegate.get_supported_events()
capabilities = await self.delegate.get_supported_events()
elif (
command.capability_id == GetCapabilitiesCommand.CapabilityId.COMPANY_ID
):
company_ids = await self.delegate.get_supported_company_ids()
capabilities = [
company_id.to_bytes(3, 'big') for company_id in company_ids
]
else:
raise core.InvalidArgumentError(
f"Unsupported capability: {command.capability_id}"
)
self.send_avrcp_response(
transaction_label,
avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE,
GetCapabilitiesResponse(command.capability_id, supported_events),
GetCapabilitiesResponse(command.capability_id, capabilities),
)
self._delegate_command(transaction_label, command, get_supported_events())
@@ -2395,6 +2552,26 @@ class Protocol(utils.EventEmitter):
self._delegate_command(transaction_label, command, set_absolute_volume())
def _on_get_play_status_command(
self, transaction_label: int, command: GetPlayStatusCommand
) -> None:
logger.debug("<<< AVRCP command PDU: %s", command)
async def get_playback_status() -> None:
play_status: PlayStatus = await self.delegate.get_playback_status()
self.send_avrcp_response(
transaction_label,
avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE,
GetPlayStatusResponse(
# TODO: Delegate this.
song_length=GetPlayStatusResponse.UNAVAILABLE,
song_position=GetPlayStatusResponse.UNAVAILABLE,
play_status=play_status,
),
)
self._delegate_command(transaction_label, command, get_playback_status())
def _on_register_notification_command(
self, transaction_label: int, command: RegisterNotificationCommand
) -> None:
@@ -2410,28 +2587,27 @@ class Protocol(utils.EventEmitter):
)
return
response: Response
if command.event_id == EventId.VOLUME_CHANGED:
volume = await self.delegate.get_absolute_volume()
response = RegisterNotificationResponse(VolumeChangedEvent(volume))
self.send_avrcp_response(
transaction_label,
avc.ResponseFrame.ResponseCode.INTERIM,
response,
elif command.event_id == EventId.PLAYBACK_STATUS_CHANGED:
playback_status = await self.delegate.get_playback_status()
response = RegisterNotificationResponse(
PlaybackStatusChangedEvent(play_status=playback_status)
)
self._register_notification_listener(transaction_label, command)
elif command.event_id == EventId.NOW_PLAYING_CONTENT_CHANGED:
playback_status = await self.delegate.get_playback_status()
response = RegisterNotificationResponse(NowPlayingContentChangedEvent())
else:
logger.warning("Event supported but not handled %s", command.event_id)
return
if command.event_id == EventId.PLAYBACK_STATUS_CHANGED:
# TODO: testing only, use delegate
response = RegisterNotificationResponse(
PlaybackStatusChangedEvent(play_status=PlayStatus.PLAYING)
)
self.send_avrcp_response(
transaction_label,
avc.ResponseFrame.ResponseCode.INTERIM,
response,
)
self._register_notification_listener(transaction_label, command)
return
self.send_avrcp_response(
transaction_label,
avc.ResponseFrame.ResponseCode.INTERIM,
response,
)
self._register_notification_listener(transaction_label, command)
self._delegate_command(transaction_label, command, register_notification())

View File

@@ -37,7 +37,12 @@ class HCI_Bridge:
def on_packet(self, packet):
# Convert the packet bytes to an object
hci_packet = HCI_Packet.from_bytes(packet)
try:
hci_packet = HCI_Packet.from_bytes(packet)
except Exception:
logger.warning('forwarding unparsed packet as-is')
self.hci_sink.on_packet(packet)
return
# Filter the packet
if self.packet_filter is not None:
@@ -50,7 +55,10 @@ class HCI_Bridge:
return
# Analyze the packet
self.trace(hci_packet)
try:
self.trace(hci_packet)
except Exception:
logger.exception('Exception while tracing packet')
# Bridge the packet
self.hci_sink.on_packet(packet)

View File

@@ -1898,6 +1898,19 @@ class Controller:
'''
return bytes([hci.HCI_SUCCESS]) + self.le_features.value.to_bytes(8, 'little')
def on_hci_le_read_all_local_supported_features_command(
self, _command: hci.HCI_LE_Read_All_Local_Supported_Features_Command
) -> bytes | None:
'''
See Bluetooth spec Vol 4, Part E - 7.8.128 LE Read All Local Supported Features
Command
'''
return (
bytes([hci.HCI_SUCCESS])
+ bytes([0])
+ self.le_features.value.to_bytes(248, 'little')
)
def on_hci_le_set_random_address_command(
self, command: hci.HCI_LE_Set_Random_Address_Command
) -> bytes | None:

File diff suppressed because it is too large Load Diff

View File

@@ -663,10 +663,13 @@ class Driver(common.Driver):
async def read_device_info(self) -> dict[ValueType, Any]:
self.host.ready = True
response1 = await self.host.send_sync_command(
hci.HCI_Reset_Command(), check_status=False
)
if response1.status not in (hci.HCI_UNKNOWN_HCI_COMMAND_ERROR, hci.HCI_SUCCESS):
response1 = await self.host.send_sync_command_raw(hci.HCI_Reset_Command())
if not isinstance(
response1.return_parameters, hci.HCI_StatusReturnParameters
) or response1.return_parameters.status not in (
hci.HCI_UNKNOWN_HCI_COMMAND_ERROR,
hci.HCI_SUCCESS,
):
# When the controller is in operational mode, the response is a
# successful response.
# When the controller is in bootloader mode,
@@ -676,13 +679,18 @@ class Driver(common.Driver):
raise DriverError("unexpected HCI response")
# Read the firmware version.
response2 = await self.host.send_sync_command(
HCI_Intel_Read_Version_Command(param0=0xFF), check_status=False
response2 = await self.host.send_sync_command_raw(
HCI_Intel_Read_Version_Command(param0=0xFF)
)
if response2.status != 0: # type: ignore
if (
not isinstance(
response2.return_parameters, HCI_Intel_Read_Version_ReturnParameters
)
or response2.return_parameters.status != 0
):
raise DriverError("HCI_Intel_Read_Version_Command error")
tlvs = _parse_tlv(response2.tlv) # type: ignore
tlvs = _parse_tlv(response2.return_parameters.tlv) # type: ignore
# Convert the list to a dict. That's Ok here because we only expect each type
# to appear just once.

View File

@@ -129,6 +129,7 @@ RTK_USB_PRODUCTS = {
(0x2357, 0x0604),
(0x2550, 0x8761),
(0x2B89, 0x8761),
(0x2C0A, 0x8761),
(0x7392, 0xC611),
# Realtek 8761CUV
(0x0B05, 0x1BF6),
@@ -533,11 +534,13 @@ class Driver(common.Driver):
@staticmethod
async def get_loaded_firmware_version(host: Host) -> int | None:
response1 = await host.send_sync_command(
HCI_RTK_Read_ROM_Version_Command(), check_status=False
)
if response1.status != hci.HCI_SUCCESS:
response1 = await host.send_sync_command_raw(HCI_RTK_Read_ROM_Version_Command())
if (
not isinstance(
response1.return_parameters, HCI_RTK_Read_ROM_Version_ReturnParameters
)
or response1.return_parameters.status != hci.HCI_SUCCESS
):
return None
response2 = await host.send_sync_command(
@@ -558,13 +561,20 @@ class Driver(common.Driver):
await host.send_sync_command(hci.HCI_Reset_Command())
host.ready = True
command = hci.HCI_Read_Local_Version_Information_Command()
response = await host.send_sync_command(command, check_status=False)
if response.status != hci.HCI_SUCCESS:
response = await host.send_sync_command_raw(
hci.HCI_Read_Local_Version_Information_Command()
)
if (
not isinstance(
response.return_parameters,
hci.HCI_Read_Local_Version_Information_ReturnParameters,
)
or response.return_parameters.status != hci.HCI_SUCCESS
):
logger.error("failed to probe local version information")
return None
local_version = response
local_version = response.return_parameters
logger.debug(
f"looking for a driver: 0x{local_version.lmp_subversion:04X} "
@@ -640,15 +650,21 @@ class Driver(common.Driver):
# TODO: load the firmware
async def download_for_rtl8723b(self):
async def download_for_rtl8723b(self) -> int | None:
if self.driver_info.has_rom_version:
response1 = await self.host.send_sync_command(
HCI_RTK_Read_ROM_Version_Command(), check_status=False
response1 = await self.host.send_sync_command_raw(
HCI_RTK_Read_ROM_Version_Command()
)
if response1.status != hci.HCI_SUCCESS:
if (
not isinstance(
response1.return_parameters,
HCI_RTK_Read_ROM_Version_ReturnParameters,
)
or response1.return_parameters.status != hci.HCI_SUCCESS
):
logger.warning("can't get ROM version")
return None
rom_version = response1.version
rom_version = response1.return_parameters.version
logger.debug(f"ROM version before download: {rom_version:04X}")
else:
rom_version = 0
@@ -690,13 +706,18 @@ class Driver(common.Driver):
logger.debug("download complete!")
# Read the version again
response2 = await self.host.send_sync_command(
HCI_RTK_Read_ROM_Version_Command(), check_status=False
response2 = await self.host.send_sync_command_raw(
HCI_RTK_Read_ROM_Version_Command()
)
if response2.status != hci.HCI_SUCCESS:
if (
not isinstance(
response2.return_parameters, HCI_RTK_Read_ROM_Version_ReturnParameters
)
or response2.return_parameters.status != hci.HCI_SUCCESS
):
logger.warning("can't get ROM version")
else:
rom_version = response2.version
rom_version = response2.return_parameters.version
logger.debug(f"ROM version after download: {rom_version:02X}")
return firmware.version

View File

@@ -29,7 +29,7 @@ import functools
import logging
import struct
from collections.abc import Iterable, Sequence
from typing import TypeVar
from typing import ClassVar, TypeVar
from bumble.att import Attribute, AttributeValue, AttributeValueV2
from bumble.colors import color
@@ -403,7 +403,7 @@ class TemplateService(Service):
to expose their UUID as a class property
'''
UUID: UUID
UUID: ClassVar[UUID]
def __init__(
self,

View File

@@ -34,11 +34,14 @@ from datetime import datetime
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Generic,
TypeVar,
overload,
)
from typing_extensions import Self
from bumble import att, core, l2cap, utils
from bumble.colors import color
from bumble.core import UUID, InvalidStateError
@@ -249,10 +252,10 @@ class ProfileServiceProxy:
Base class for profile-specific service proxies
'''
SERVICE_CLASS: type[TemplateService]
SERVICE_CLASS: ClassVar[type[TemplateService]]
@classmethod
def from_client(cls, client: Client) -> ProfileServiceProxy | None:
def from_client(cls, client: Client) -> Self | None:
return ServiceProxy.from_client(cls, client, cls.SERVICE_CLASS.UUID)
@@ -285,8 +288,6 @@ class Client:
self._bearer_id = (
f'[0x{bearer.connection.handle:04X}|CID=0x{bearer.source_cid:04X}]'
)
# Fill the mtu.
bearer.on_att_mtu_update(att.ATT_DEFAULT_MTU)
self.connection = bearer.connection
else:
bearer.on(bearer.EVENT_DISCONNECTION, self.on_disconnection)

View File

@@ -115,7 +115,6 @@ class Server(utils.EventEmitter):
channel.connection.handle,
channel.source_cid,
)
channel.att_mtu = att.ATT_DEFAULT_MTU
channel.sink = lambda pdu: self.on_gatt_pdu(
channel, att.ATT_PDU.from_bytes(pdu)
)
@@ -777,6 +776,18 @@ class Server(utils.EventEmitter):
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
if (
request.starting_handle == 0x0000
or request.starting_handle > request.ending_handle
):
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.starting_handle,
error_code=att.ATT_INVALID_HANDLE_ERROR,
)
self.send_response(bearer, response)
return
attributes: list[tuple[int, bytes]] = []
for attribute in (
attribute
@@ -977,6 +988,94 @@ class Server(utils.EventEmitter):
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_read_multiple_request(
self, bearer: att.Bearer, request: att.ATT_Read_Multiple_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.7 Read Multiple Request.
'''
response: att.ATT_PDU
pdu_space_available = bearer.att_mtu - 1
values: list[bytes] = []
for handle in request.set_of_handles:
if not (attribute := self.get_attribute(handle)):
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=handle,
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
self.send_response(bearer, response)
return
# No need to catch permission errors here, since these attributes
# must all be world-readable
attribute_value = await attribute.read_value(bearer)
# Check the attribute value size
max_attribute_size = min(bearer.att_mtu - 1, 251)
if len(attribute_value) > max_attribute_size:
# We need to truncate
attribute_value = attribute_value[:max_attribute_size]
# Check if there is enough space
entry_size = len(attribute_value)
if pdu_space_available < entry_size:
break
# Add the attribute to the list
values.append(attribute_value)
pdu_space_available -= entry_size
response = att.ATT_Read_Multiple_Response(set_of_values=b''.join(values))
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_read_multiple_variable_request(
self, bearer: att.Bearer, request: att.ATT_Read_Multiple_Variable_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.11 Read Multiple Variable Request.
'''
response: att.ATT_PDU
pdu_space_available = bearer.att_mtu - 1
length_value_tuple_list: list[tuple[int, bytes]] = []
for handle in request.set_of_handles:
if not (attribute := self.get_attribute(handle)):
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=handle,
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
self.send_response(bearer, response)
return
# No need to catch permission errors here, since these attributes
# must all be world-readable
attribute_value = await attribute.read_value(bearer)
length = len(attribute_value)
# Check the attribute value size
max_attribute_size = min(bearer.att_mtu - 3, 251)
if len(attribute_value) > max_attribute_size:
# We need to truncate
attribute_value = attribute_value[:max_attribute_size]
# Check if there is enough space
entry_size = 2 + len(attribute_value)
# Add the attribute to the list
length_value_tuple_list.append((length, attribute_value))
pdu_space_available -= entry_size
if pdu_space_available <= 0:
break
response = att.ATT_Read_Multiple_Variable_Response(
length_value_tuple_list=length_value_tuple_list
)
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_write_request(
self, bearer: att.Bearer, request: att.ATT_Write_Request

View File

@@ -398,8 +398,9 @@ HCI_LE_CS_SUBEVENT_RESULT_EVENT = 0x31
HCI_LE_CS_SUBEVENT_RESULT_CONTINUE_EVENT = 0x32
HCI_LE_CS_TEST_END_COMPLETE_EVENT = 0x33
HCI_LE_MONITORED_ADVERTISERS_REPORT_EVENT = 0x34
HCI_LE_FRAME_SPACE_UPDATE_EVENT = 0x35
HCI_LE_FRAME_SPACE_UPDATE_COMPLETE_EVENT = 0x35
HCI_LE_UTP_RECEIVE_EVENT = 0x36
HCI_LE_CONNECTION_RATE_CHANGE_EVENT = 0x37
# HCI Command
@@ -736,6 +737,12 @@ HCI_LE_CLEAR_MONITORED_ADVERTISERS_LIST_COMMAND = hci_c
HCI_LE_READ_MONITORED_ADVERTISERS_LIST_SIZE_COMMAND = hci_command_op_code(0x08, 0x009B)
HCI_LE_ENABLE_MONITORING_ADVERTISERS_COMMAND = hci_command_op_code(0x08, 0x009C)
HCI_LE_FRAME_SPACE_UPDATE_COMMAND = hci_command_op_code(0x08, 0x009D)
HCI_LE_SET_RESOLVABLE_PRIVATE_ADDRESS_TIMEOUT_V2_COMMAND = hci_command_op_code(0x08, 0x009E)
HCI_LE_ENABLE_UTP_OTA_MODE_COMMAND = hci_command_op_code(0x08, 0x009F)
HCI_LE_UTP_SEND_COMMAND = hci_command_op_code(0x08, 0x00A0)
HCI_LE_CONNECTION_RATE_REQUEST_COMMAND = hci_command_op_code(0x08, 0x00A1)
HCI_LE_SET_DEFAULT_RATE_PARAMETERS_COMMAND = hci_command_op_code(0x08, 0x00A2)
HCI_LE_READ_MINIMUM_SUPPORTED_CONNECTION_INTERVAL_COMMAND = hci_command_op_code(0x08, 0x00A3)
# HCI Error Codes
@@ -1398,6 +1405,12 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_LE_CLEAR_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+7),
HCI_LE_READ_MONITORED_ADVERTISERS_LIST_SIZE_COMMAND : 1 << (48*8+0),
HCI_LE_FRAME_SPACE_UPDATE_COMMAND : 1 << (48*8+1),
HCI_LE_SET_RESOLVABLE_PRIVATE_ADDRESS_TIMEOUT_V2_COMMAND : 1 << (48*8+2),
HCI_LE_ENABLE_UTP_OTA_MODE_COMMAND : 1 << (48*8+3),
HCI_LE_UTP_SEND_COMMAND : 1 << (48*8+4),
HCI_LE_CONNECTION_RATE_REQUEST_COMMAND : 1 << (48*8+5),
HCI_LE_SET_DEFAULT_RATE_PARAMETERS_COMMAND : 1 << (48*8+6),
HCI_LE_READ_MINIMUM_SUPPORTED_CONNECTION_INTERVAL_COMMAND : 1 << (48*8+7)
}
# LE Supported Features
@@ -1455,6 +1468,14 @@ class LeFeature(SpecableEnum):
LL_EXTENDED_FEATURE_SET = 63
MONITORING_ADVERTISERS = 64
FRAME_SPACE_UPDATE = 65
UTP_OTA_MODE = 66
UTP_HCI_MODE = 67
LL_OTA_UTP_IND_MAXIMUM_LENGTH_0 = 68
LL_OTA_UTP_IND_MAXIMUM_LENGTH_1 = 69
SHORTER_CONNECTION_INTERVALS = 72
SHORTER_CONNECTION_INTERVALS_HOST_SUPPORT = 73
LE_FLUSHABLE_ACL_DATA = 74
class LeFeatureMask(utils.CompatibleIntFlag):
LE_ENCRYPTION = 1 << LeFeature.LE_ENCRYPTION
@@ -1509,6 +1530,13 @@ class LeFeatureMask(utils.CompatibleIntFlag):
LL_EXTENDED_FEATURE_SET = 1 << LeFeature.LL_EXTENDED_FEATURE_SET
MONITORING_ADVERTISERS = 1 << LeFeature.MONITORING_ADVERTISERS
FRAME_SPACE_UPDATE = 1 << LeFeature.FRAME_SPACE_UPDATE
UTP_OTA_MODE = 1 << LeFeature.UTP_OTA_MODE
UTP_HCI_MODE = 1 << LeFeature.UTP_HCI_MODE
LL_OTA_UTP_IND_MAXIMUM_LENGTH_0 = 1 << LeFeature.LL_OTA_UTP_IND_MAXIMUM_LENGTH_0
LL_OTA_UTP_IND_MAXIMUM_LENGTH_1 = 1 << LeFeature.LL_OTA_UTP_IND_MAXIMUM_LENGTH_1
SHORTER_CONNECTION_INTERVALS = 1 << LeFeature.SHORTER_CONNECTION_INTERVALS
SHORTER_CONNECTION_INTERVALS_HOST_SUPPORT = 1 << LeFeature.SHORTER_CONNECTION_INTERVALS_HOST_SUPPORT
LE_FLUSHABLE_ACL_DATA = 1 << LeFeature.LE_FLUSHABLE_ACL_DATA
class LmpFeature(SpecableEnum):
# Page 0 (Legacy LMP features)
@@ -2165,9 +2193,9 @@ class Address:
RANDOM_IDENTITY_ADDRESS = AddressType.RANDOM_IDENTITY
# Type declarations
NIL: Address
ANY: Address
ANY_RANDOM: Address
NIL: ClassVar[Address]
ANY: ClassVar[Address]
ANY_RANDOM: ClassVar[Address]
# pylint: disable-next=unnecessary-lambda
ADDRESS_TYPE_SPEC = {'size': 1, 'mapper': lambda x: Address.address_type_name(x)}
@@ -2279,38 +2307,38 @@ class Address:
self.address_type = address_type
def clone(self):
def clone(self) -> Address:
return Address(self.address_bytes, self.address_type)
@property
def is_public(self):
def is_public(self) -> bool:
return self.address_type in (
self.PUBLIC_DEVICE_ADDRESS,
self.PUBLIC_IDENTITY_ADDRESS,
)
@property
def is_random(self):
def is_random(self) -> bool:
return not self.is_public
@property
def is_resolved(self):
def is_resolved(self) -> bool:
return self.address_type in (
self.PUBLIC_IDENTITY_ADDRESS,
self.RANDOM_IDENTITY_ADDRESS,
)
@property
def is_resolvable(self):
def is_resolvable(self) -> bool:
return self.address_type == self.RANDOM_DEVICE_ADDRESS and (
self.address_bytes[5] >> 6 == 1
)
@property
def is_static(self):
def is_static(self) -> bool:
return self.is_random and (self.address_bytes[5] >> 6 == 3)
def to_string(self, with_type_qualifier=True):
def to_string(self, with_type_qualifier: bool = True) -> str:
'''
String representation of the address, MSB first, with an optional type
qualifier.
@@ -2320,23 +2348,23 @@ class Address:
return result
return result + '/P'
def __bytes__(self):
def __bytes__(self) -> bytes:
return self.address_bytes
def __hash__(self):
def __hash__(self) -> int:
return hash(self.address_bytes)
def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
return (
isinstance(other, Address)
and self.address_bytes == other.address_bytes
and self.is_public == other.is_public
)
def __str__(self):
def __str__(self) -> str:
return self.to_string()
def __repr__(self):
def __repr__(self) -> str:
return f'Address({self.to_string(False)}/{self.address_type_name(self.address_type)})'
@@ -2375,30 +2403,34 @@ class HCI_Packet:
Abstract Base class for HCI packets
'''
hci_packet_type: ClassVar[int]
hci_packet_type: int
@staticmethod
def from_bytes(packet: bytes) -> HCI_Packet:
packet_type = packet[0]
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_Packet:
try:
packet_type = packet[0]
if packet_type == HCI_COMMAND_PACKET:
return HCI_Command.from_bytes(packet)
if packet_type == HCI_COMMAND_PACKET:
return HCI_Command.from_bytes(packet)
if packet_type == HCI_ACL_DATA_PACKET:
return HCI_AclDataPacket.from_bytes(packet)
if packet_type == HCI_ACL_DATA_PACKET:
return HCI_AclDataPacket.from_bytes(packet)
if packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
return HCI_SynchronousDataPacket.from_bytes(packet)
if packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
return HCI_SynchronousDataPacket.from_bytes(packet)
if packet_type == HCI_EVENT_PACKET:
return HCI_Event.from_bytes(packet)
if packet_type == HCI_EVENT_PACKET:
return HCI_Event.from_bytes(packet)
if packet_type == HCI_ISO_DATA_PACKET:
return HCI_IsoDataPacket.from_bytes(packet)
if packet_type == HCI_ISO_DATA_PACKET:
return HCI_IsoDataPacket.from_bytes(packet)
return HCI_CustomPacket(packet)
return HCI_CustomPacket(packet)
except Exception as e:
logger.error(f'error parsing HCI packet [{packet.hex()}]: {e}')
raise
def __init__(self, name):
def __init__(self, name: str) -> None:
self.name = name
def __bytes__(self) -> bytes:
@@ -2410,7 +2442,7 @@ class HCI_Packet:
# -----------------------------------------------------------------------------
class HCI_CustomPacket(HCI_Packet):
def __init__(self, payload):
def __init__(self, payload: bytes) -> None:
super().__init__('HCI_CUSTOM_PACKET')
self.hci_packet_type = payload[0]
self.payload = payload
@@ -2569,6 +2601,21 @@ class HCI_GenericReturnParameters(HCI_ReturnParameters):
class HCI_StatusReturnParameters(HCI_ReturnParameters):
status: HCI_ErrorCode = field(metadata=HCI_ErrorCode.type_metadata(1))
@classmethod
def from_parameters(cls, parameters: bytes) -> Self | HCI_StatusReturnParameters:
status = HCI_ErrorCode(parameters[0])
if status != HCI_ErrorCode.SUCCESS:
# Don't parse further, just return the status.
return HCI_StatusReturnParameters(status=status)
return cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields))
@dataclasses.dataclass
class HCI_GenericStatusReturnParameters(HCI_StatusReturnParameters):
data: bytes = field(metadata=metadata('*'))
@dataclasses.dataclass
class HCI_StatusAndAddressReturnParameters(HCI_StatusReturnParameters):
@@ -3746,7 +3793,7 @@ class HCI_Write_Extended_Inquiry_Response_Command(
'''
fec_required: int = field(metadata=metadata(1))
extended_inquiry_response: int = field(
extended_inquiry_response: bytes = field(
metadata=metadata({'size': 240, 'serializer': lambda x: padded_bytes(x, 240)})
)
@@ -5796,7 +5843,25 @@ class HCI_LE_Subrate_Request_Command(HCI_AsyncCommand):
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HHCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters(
class HCI_LE_Read_All_Local_Supported_Features_ReturnParameters(
HCI_StatusReturnParameters
):
max_page: int = field(metadata=metadata(1))
le_features: bytes = field(metadata=metadata(248))
@HCI_SyncCommand.sync_command(HCI_LE_Read_All_Local_Supported_Features_ReturnParameters)
class HCI_LE_Read_All_Local_Supported_Features_Command(
HCI_SyncCommand[HCI_LE_Read_All_Local_Supported_Features_ReturnParameters]
):
'''
See Bluetooth spec @ 7.8.128 LE Read All Local Supported Features Command
'''
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters(
HCI_StatusReturnParameters
):
num_config_supported: int = field(metadata=metadata(1))
@@ -5808,7 +5873,7 @@ class HHCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters(
rtt_capability: int = field(metadata=metadata(1))
rtt_aa_only_n: int = field(metadata=metadata(1))
rtt_sounding_n: int = field(metadata=metadata(1))
rtt_random_payload_n: int = field(metadata=metadata(1))
rtt_random_sequence_n: int = field(metadata=metadata(1))
nadm_sounding_capability: int = field(metadata=metadata(2))
nadm_random_capability: int = field(metadata=metadata(2))
cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC))
@@ -5822,11 +5887,11 @@ class HHCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters(
@HCI_SyncCommand.sync_command(
HHCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters
HCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters
)
@dataclasses.dataclass
class HCI_LE_CS_Read_Local_Supported_Capabilities_Command(
HCI_SyncCommand[HHCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters]
HCI_SyncCommand[HCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters]
):
'''
See Bluetooth spec @ 7.8.130 LE CS Read Local Supported Capabilities command
@@ -5864,7 +5929,7 @@ class HCI_LE_CS_Write_Cached_Remote_Supported_Capabilities_Command(
rtt_capability: int = field(metadata=metadata(1))
rtt_aa_only_n: int = field(metadata=metadata(1))
rtt_sounding_n: int = field(metadata=metadata(1))
rtt_random_payload_n: int = field(metadata=metadata(1))
rtt_random_sequence_n: int = field(metadata=metadata(1))
nadm_sounding_capability: int = field(metadata=metadata(2))
nadm_random_capability: int = field(metadata=metadata(2))
cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC))
@@ -6073,6 +6138,92 @@ class HCI_LE_CS_Test_End_Command(HCI_AsyncCommand):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
class HCI_LE_Frame_Space_Update_Command(HCI_AsyncCommand):
'''
See Bluetooth spec @ 7.8.151 LE Frame Space Update command
'''
class SpacingType(SpecableFlag):
T_IFS_ACL_CP = 1 << 0
T_IFS_ACL_PC = 1 << 1
T_MCES = 1 << 2
T_IFS_CIS = 1 << 3
T_MSS_CIS = 1 << 4
connection_handle: int = field(metadata=metadata(2))
frame_space_min: int = field(metadata=metadata(2))
frame_space_max: int = field(metadata=metadata(2))
phys: int = field(metadata=PhyBit.type_metadata(1))
spacing_types: int = field(metadata=SpacingType.type_metadata(2))
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
class HCI_LE_Connection_Rate_Request_Command(HCI_AsyncCommand):
'''
See Bluetooth spec @ 7.8.154 LE Connection Rate Request command
'''
connection_handle: int = field(metadata=metadata(2))
connection_interval_min: int = field(metadata=metadata(2))
connection_interval_max: int = field(metadata=metadata(2))
subrate_min: int = field(metadata=metadata(2))
subrate_max: int = field(metadata=metadata(2))
max_latency: int = field(metadata=metadata(2))
continuation_number: int = field(metadata=metadata(2))
supervision_timeout: int = field(metadata=metadata(2))
min_ce_length: int = field(metadata=metadata(2))
max_ce_length: int = field(metadata=metadata(2))
# -----------------------------------------------------------------------------
@HCI_SyncCommand.sync_command(HCI_StatusReturnParameters)
@dataclasses.dataclass
class HCI_LE_Set_Default_Rate_Parameters_Command(
HCI_SyncCommand[HCI_StatusReturnParameters]
):
'''
See Bluetooth spec @ 7.8.155 LE Set Default Rate Parameters command
'''
connection_interval_min: int = field(metadata=metadata(2))
connection_interval_max: int = field(metadata=metadata(2))
subrate_min: int = field(metadata=metadata(2))
subrate_max: int = field(metadata=metadata(2))
max_latency: int = field(metadata=metadata(2))
continuation_number: int = field(metadata=metadata(2))
supervision_timeout: int = field(metadata=metadata(2))
min_ce_length: int = field(metadata=metadata(2))
max_ce_length: int = field(metadata=metadata(2))
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_LE_Read_Minimum_Supported_Connection_Interval_ReturnParameters(
HCI_StatusReturnParameters
):
minimum_supported_connection_interval: int = field(metadata=metadata(1))
group_min: Sequence[int] = field(metadata=metadata(2, list_begin=True))
group_max: Sequence[int] = field(metadata=metadata(2))
group_stride: Sequence[int] = field(metadata=metadata(2, list_end=True))
@HCI_SyncCommand.sync_command(
HCI_LE_Read_Minimum_Supported_Connection_Interval_ReturnParameters
)
@dataclasses.dataclass
class HCI_LE_Read_Minimum_Supported_Connection_Interval_Command(
HCI_SyncCommand[HCI_LE_Read_Minimum_Supported_Connection_Interval_ReturnParameters]
):
'''
See Bluetooth spec @ 7.8.156 LE Read Minimum Supported Connection Interval command
'''
# -----------------------------------------------------------------------------
# HCI Events
# -----------------------------------------------------------------------------
@@ -6986,7 +7137,7 @@ class HCI_LE_CS_Read_Remote_Supported_Capabilities_Complete_Event(HCI_LE_Meta_Ev
rtt_capability: int = field(metadata=metadata(1))
rtt_aa_only_n: int = field(metadata=metadata(1))
rtt_sounding_n: int = field(metadata=metadata(1))
rtt_random_payload_n: int = field(metadata=metadata(1))
rtt_random_sequence_n: int = field(metadata=metadata(1))
nadm_sounding_capability: int = field(metadata=metadata(2))
nadm_random_capability: int = field(metadata=metadata(2))
cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC))
@@ -7142,6 +7293,23 @@ class HCI_LE_CS_Test_End_Complete_Event(HCI_LE_Meta_Event):
status: int = field(metadata=metadata(STATUS_SPEC))
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event
@dataclasses.dataclass
class HCI_LE_Connection_Rate_Change_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.50 LE Connection Rate Change event
'''
status: int = field(metadata=metadata(STATUS_SPEC))
connection_handle: int = field(metadata=metadata(2))
connection_interval: int = field(metadata=metadata(2))
subrate_factor: int = field(metadata=metadata(2))
peripheral_latency: int = field(metadata=metadata(2))
continuation_number: int = field(metadata=metadata(2))
supervision_timeout: int = field(metadata=metadata(2))
# -----------------------------------------------------------------------------
@HCI_Event.event
@dataclasses.dataclass
@@ -7345,6 +7513,7 @@ class HCI_Command_Complete_Event(HCI_Event, Generic[_RP]):
def from_parameters(cls, parameters: bytes) -> Self:
event = cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields))
event.parameters = parameters
return_parameters_bytes = parameters[3:]
# Find the class for the matching command.
subclass = HCI_Command.command_classes.get(event.command_opcode)
@@ -7357,16 +7526,16 @@ class HCI_Command_Complete_Event(HCI_Event, Generic[_RP]):
'HCI Command Complete event with opcode for a class that is not'
' an HCI_SyncCommand subclass: '
f'opcode={event.command_opcode:#04x}, '
f'type={type(subclass).__name__}'
f'type={subclass.__name__}'
)
event.return_parameters = HCI_GenericReturnParameters(
data=event.return_parameters # type: ignore[arg-type]
data=return_parameters_bytes
) # type: ignore[assignment]
return event
# Parse the return parameters bytes into an object.
event.return_parameters = subclass.parse_return_parameters(
event.return_parameters # type: ignore[arg-type]
return_parameters_bytes
) # type: ignore[assignment]
return event
@@ -7805,6 +7974,7 @@ class HCI_Vendor_Event(HCI_Event):
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_AclDataPacket(HCI_Packet):
'''
See Bluetooth spec @ 5.4.2 HCI ACL Data Packets
@@ -7812,8 +7982,14 @@ class HCI_AclDataPacket(HCI_Packet):
hci_packet_type = HCI_ACL_DATA_PACKET
@staticmethod
def from_bytes(packet: bytes) -> HCI_AclDataPacket:
connection_handle: int
pb_flag: int
bc_flag: int
data_total_length: int
data: bytes
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_AclDataPacket:
# Read the header
h, data_total_length = struct.unpack_from('<HH', packet, 1)
connection_handle = h & 0xFFF
@@ -7822,25 +7998,22 @@ class HCI_AclDataPacket(HCI_Packet):
data = packet[5:]
if len(data) != data_total_length:
raise InvalidPacketError('invalid packet length')
return HCI_AclDataPacket(
connection_handle, pb_flag, bc_flag, data_total_length, data
return cls(
connection_handle=connection_handle,
pb_flag=pb_flag,
bc_flag=bc_flag,
data_total_length=data_total_length,
data=data,
)
def __bytes__(self):
def __bytes__(self) -> bytes:
h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle
return (
struct.pack('<BHH', HCI_ACL_DATA_PACKET, h, self.data_total_length)
+ self.data
)
def __init__(self, connection_handle, pb_flag, bc_flag, data_total_length, data):
self.connection_handle = connection_handle
self.pb_flag = pb_flag
self.bc_flag = bc_flag
self.data_total_length = data_total_length
self.data = data
def __str__(self):
def __str__(self) -> str:
return (
f'{color("ACL", "blue")}: '
f'handle=0x{self.connection_handle:04x}, '
@@ -7851,6 +8024,7 @@ class HCI_AclDataPacket(HCI_Packet):
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_SynchronousDataPacket(HCI_Packet):
'''
See Bluetooth spec @ 5.4.3 HCI SCO Data Packets
@@ -7858,8 +8032,13 @@ class HCI_SynchronousDataPacket(HCI_Packet):
hci_packet_type = HCI_SYNCHRONOUS_DATA_PACKET
@staticmethod
def from_bytes(packet: bytes) -> HCI_SynchronousDataPacket:
connection_handle: int
packet_status: int
data_total_length: int
data: bytes
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_SynchronousDataPacket:
# Read the header
h, data_total_length = struct.unpack_from('<HB', packet, 1)
connection_handle = h & 0xFFF
@@ -7869,8 +8048,11 @@ class HCI_SynchronousDataPacket(HCI_Packet):
raise InvalidPacketError(
f'invalid packet length {len(data)} != {data_total_length}'
)
return HCI_SynchronousDataPacket(
connection_handle, packet_status, data_total_length, data
return cls(
connection_handle=connection_handle,
packet_status=packet_status,
data_total_length=data_total_length,
data=data,
)
def __bytes__(self) -> bytes:
@@ -7880,18 +8062,6 @@ class HCI_SynchronousDataPacket(HCI_Packet):
+ self.data
)
def __init__(
self,
connection_handle: int,
packet_status: int,
data_total_length: int,
data: bytes,
) -> None:
self.connection_handle = connection_handle
self.packet_status = packet_status
self.data_total_length = data_total_length
self.data = data
def __str__(self) -> str:
return (
f'{color("SCO", "blue")}: '
@@ -7909,7 +8079,7 @@ class HCI_IsoDataPacket(HCI_Packet):
See Bluetooth spec @ 5.4.5 HCI ISO Data Packets
'''
hci_packet_type: ClassVar[int] = HCI_ISO_DATA_PACKET
hci_packet_type = HCI_ISO_DATA_PACKET
connection_handle: int
data_total_length: int

View File

@@ -312,11 +312,11 @@ class HID(ABC, utils.EventEmitter):
def send_pdu_on_ctrl(self, msg: bytes) -> None:
assert self.l2cap_ctrl_channel
self.l2cap_ctrl_channel.send_pdu(msg)
self.l2cap_ctrl_channel.write(msg)
def send_pdu_on_intr(self, msg: bytes) -> None:
assert self.l2cap_intr_channel
self.l2cap_intr_channel.send_pdu(msg)
self.l2cap_intr_channel.write(msg)
def send_data(self, data: bytes) -> None:
if self.role == HID.Role.HOST:

View File

@@ -21,7 +21,6 @@ import asyncio
import collections
import dataclasses
import logging
import struct
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Any, TypeVar, cast, overload
@@ -271,14 +270,19 @@ class Host(utils.EventEmitter):
self.sco_links = {} # SCO links, by connection handle
self.bigs = {} # BIG Handle to BIS Handles
self.pending_command: hci.HCI_SyncCommand | hci.HCI_AsyncCommand | None = None
self.pending_response: asyncio.Future[Any] | None = None
self.pending_response: (
asyncio.Future[
hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event
]
| None
) = None
self.number_of_supported_advertising_sets = 0
self.maximum_advertising_data_length = 31
self.local_version: (
hci.HCI_Read_Local_Version_Information_ReturnParameters | None
) = None
self.local_supported_commands = 0
self.local_le_features = 0
self.local_le_features = hci.LeFeatureMask(0) # LE features
self.local_lmp_features = hci.LmpFeatureMask(0) # Classic LMP features
self.suggested_max_tx_octets = 251 # Max allowed
self.suggested_max_tx_time = 2120 # Max allowed
@@ -348,17 +352,26 @@ class Host(utils.EventEmitter):
response1.supported_commands, 'little'
)
if self.supports_command(hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response2 = await self.send_sync_command(
hci.HCI_LE_Read_Local_Supported_Features_Command()
)
self.local_le_features = struct.unpack('<Q', response2.le_features)[0]
if self.supports_command(hci.HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
self.local_version = await self.send_sync_command(
hci.HCI_Read_Local_Version_Information_Command()
)
if self.supports_command(hci.HCI_LE_READ_ALL_LOCAL_SUPPORTED_FEATURES_COMMAND):
response2 = await self.send_sync_command(
hci.HCI_LE_Read_All_Local_Supported_Features_Command()
)
self.local_le_features = hci.LeFeatureMask(
int.from_bytes(response2.le_features, 'little')
)
elif self.supports_command(hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response3 = await self.send_sync_command(
hci.HCI_LE_Read_Local_Supported_Features_Command()
)
self.local_le_features = hci.LeFeatureMask(
int.from_bytes(response3.le_features, 'little')
)
if self.supports_command(hci.HCI_READ_LOCAL_EXTENDED_FEATURES_COMMAND):
max_page_number = 0
page_number = 0
@@ -375,7 +388,6 @@ class Host(utils.EventEmitter):
max_page_number = response4.maximum_page_number
page_number += 1
self.local_lmp_features = hci.LmpFeatureMask(lmp_features)
elif self.supports_command(hci.HCI_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response5 = await self.send_sync_command(
hci.HCI_Read_Local_Supported_Features_Command()
@@ -494,12 +506,17 @@ class Host(utils.EventEmitter):
hci.HCI_LE_TRANSMIT_POWER_REPORTING_EVENT,
hci.HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_SUBRATE_CHANGE_EVENT,
hci.HCI_LE_READ_ALL_REMOTE_FEATURES_COMPLETE_EVENT,
hci.HCI_LE_CS_READ_REMOTE_SUPPORTED_CAPABILITIES_COMPLETE_EVENT,
hci.HCI_LE_CS_PROCEDURE_ENABLE_COMPLETE_EVENT,
hci.HCI_LE_CS_SECURITY_ENABLE_COMPLETE_EVENT,
hci.HCI_LE_CS_CONFIG_COMPLETE_EVENT,
hci.HCI_LE_CS_SUBEVENT_RESULT_EVENT,
hci.HCI_LE_CS_SUBEVENT_RESULT_CONTINUE_EVENT,
hci.HCI_LE_MONITORED_ADVERTISERS_REPORT_EVENT,
hci.HCI_LE_FRAME_SPACE_UPDATE_COMPLETE_EVENT,
hci.HCI_LE_UTP_RECEIVE_EVENT,
hci.HCI_LE_CONNECTION_RATE_CHANGE_EVENT,
]
)
@@ -599,22 +616,28 @@ class Host(utils.EventEmitter):
if self.supports_command(
hci.HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND
):
response10 = await self.send_sync_command(
hci.HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command()
)
self.number_of_supported_advertising_sets = (
response10.num_supported_advertising_sets
)
try:
response10 = await self.send_sync_command(
hci.HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command()
)
self.number_of_supported_advertising_sets = (
response10.num_supported_advertising_sets
)
except hci.HCI_Error:
logger.warning('Failed to read number of supported advertising sets')
if self.supports_command(
hci.HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND
):
response11 = await self.send_sync_command(
hci.HCI_LE_Read_Maximum_Advertising_Data_Length_Command()
)
self.maximum_advertising_data_length = (
response11.max_advertising_data_length
)
try:
response11 = await self.send_sync_command(
hci.HCI_LE_Read_Maximum_Advertising_Data_Length_Command()
)
self.maximum_advertising_data_length = (
response11.max_advertising_data_length
)
except hci.HCI_Error:
logger.warning('Failed to read maximum advertising data length')
@property
def controller(self) -> TransportSink | None:
@@ -646,25 +669,35 @@ class Host(utils.EventEmitter):
response_timeout: float | None = None,
) -> hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event:
# Wait until we can send (only one pending command at a time)
async with self.command_semaphore:
assert self.pending_command is None
assert self.pending_response is None
await self.command_semaphore.acquire()
# Create a future value to hold the eventual response
self.pending_response = asyncio.get_running_loop().create_future()
self.pending_command = command
# Create a future value to hold the eventual response
assert self.pending_command is None
assert self.pending_response is None
self.pending_response = asyncio.get_running_loop().create_future()
self.pending_command = command
try:
self.send_hci_packet(command)
return await asyncio.wait_for(
self.pending_response, timeout=response_timeout
)
except Exception:
logger.exception(color("!!! Exception while sending command:", "red"))
raise
finally:
self.pending_command = None
self.pending_response = None
response: (
hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event | None
) = None
try:
self.send_hci_packet(command)
response = await asyncio.wait_for(
self.pending_response, timeout=response_timeout
)
return response
except Exception:
logger.exception(color("!!! Exception while sending command:", "red"))
raise
finally:
self.pending_command = None
self.pending_response = None
if (
response is not None
and response.num_hci_command_packets
and self.command_semaphore.locked()
):
self.command_semaphore.release()
@overload
async def send_command(
@@ -717,30 +750,56 @@ class Host(utils.EventEmitter):
return response
async def send_sync_command(
self, command: hci.HCI_SyncCommand[_RP], response_timeout: float | None = None
) -> _RP:
response = await self.send_sync_command_raw(command, response_timeout)
return_parameters = response.return_parameters
# Check the return parameters's status
if isinstance(return_parameters, hci.HCI_StatusReturnParameters):
status = return_parameters.status
elif isinstance(return_parameters, hci.HCI_GenericReturnParameters):
# if the payload has at least one byte, assume the first byte is the status
if not return_parameters.data:
raise RuntimeError('no status byte in return parameters')
status = hci.HCI_ErrorCode(return_parameters.data[0])
else:
raise RuntimeError(
f'unexpected return parameters type ({type(return_parameters)})'
)
if status != hci.HCI_ErrorCode.SUCCESS:
logger.warning(
f'{command.name} failed ' f'({hci.HCI_Constant.error_name(status)})'
)
raise hci.HCI_Error(status)
return return_parameters
async def send_sync_command_raw(
self,
command: hci.HCI_SyncCommand[_RP],
check_status: bool = True,
response_timeout: float | None = None,
) -> _RP:
) -> hci.HCI_Command_Complete_Event[_RP]:
response = await self._send_command(command, response_timeout)
# For unknown HCI commands, some controllers return Command Status instead of
# Command Complete.
if (
isinstance(response, hci.HCI_Command_Status_Event)
and response.status == hci.HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR
):
return hci.HCI_Command_Complete_Event(
num_hci_command_packets=response.num_hci_command_packets,
command_opcode=command.op_code,
return_parameters=hci.HCI_StatusReturnParameters(
status=hci.HCI_ErrorCode(response.status)
), # type: ignore
)
# Check that the response is of the expected type
assert isinstance(response, hci.HCI_Command_Complete_Event)
return_parameters: _RP = response.return_parameters
assert isinstance(return_parameters, command.return_parameters_class)
# Check the return parameters if required
if check_status:
if isinstance(return_parameters, hci.HCI_StatusReturnParameters):
status = return_parameters.status
if status != hci.HCI_SUCCESS:
logger.warning(
f'{command.name} failed '
f'({hci.HCI_Constant.error_name(status)})'
)
raise hci.HCI_Error(status)
return return_parameters
return response
async def send_async_command(
self,
@@ -750,19 +809,25 @@ class Host(utils.EventEmitter):
) -> hci.HCI_ErrorCode:
response = await self._send_command(command, response_timeout)
# Check that the response is of the expected type
assert isinstance(response, hci.HCI_Command_Status_Event)
# For unknown HCI commands, some controllers return Command Complete instead of
# Command Status.
if isinstance(response, hci.HCI_Command_Complete_Event):
# Assume the first byte of the return parameters is the status
if (
status := hci.HCI_ErrorCode(response.parameters[3])
) != hci.HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR:
logger.warning(f'unexpected return paramerers status {status}')
else:
assert isinstance(response, hci.HCI_Command_Status_Event)
status = hci.HCI_ErrorCode(response.status)
# Check the return parameters if required
status = response.status
# Check the status if required
if check_status:
if status != hci.HCI_CommandStatus.PENDING:
logger.warning(
f'{command.name} failed ' f'({hci.HCI_Constant.error_name(status)})'
)
logger.warning(f'{command.name} failed ' f'({status.name})')
raise hci.HCI_Error(status)
return hci.HCI_ErrorCode(status)
return status
@utils.deprecated("Use utils.AsyncRunner.spawn() instead.")
def send_command_sync(self, command: hci.HCI_AsyncCommand) -> None:
@@ -791,10 +856,22 @@ class Host(utils.EventEmitter):
data=pdu,
)
logger.debug(
'>>> ACL packet enqueue: (Handle=0x%04X) %s', connection_handle, pdu
'>>> ACL packet enqueue: (handle=0x%04X) %s',
connection_handle,
pdu.hex(),
)
packet_queue.enqueue(acl_packet, connection_handle)
def send_sco_sdu(self, connection_handle: int, sdu: bytes) -> None:
self.send_hci_packet(
hci.HCI_SynchronousDataPacket(
connection_handle=connection_handle,
packet_status=0,
data_total_length=len(sdu),
data=sdu,
)
)
def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None:
self.send_acl_sdu(connection_handle, bytes(L2CAP_PDU(cid, pdu)))
@@ -879,16 +956,18 @@ class Host(utils.EventEmitter):
if self.local_supported_commands & mask
)
def supports_le_features(self, feature: hci.LeFeatureMask) -> bool:
return (self.local_le_features & feature) == feature
def supports_le_features(self, features: hci.LeFeatureMask) -> bool:
return (self.local_le_features & features) == features
def supports_lmp_features(self, feature: hci.LmpFeatureMask) -> bool:
return self.local_lmp_features & (feature) == feature
def supports_lmp_features(self, features: hci.LmpFeatureMask) -> bool:
return self.local_lmp_features & (features) == features
@property
def supported_le_features(self):
def supported_le_features(self) -> list[hci.LeFeature]:
return [
feature for feature in range(64) if self.local_le_features & (1 << feature)
feature
for feature in hci.LeFeature
if self.local_le_features & (1 << feature)
]
# Packet Sink protocol (packets coming from the controller via HCI)
@@ -977,6 +1056,8 @@ class Host(utils.EventEmitter):
self.pending_response.set_result(event)
else:
logger.warning('!!! no pending response future to set')
if event.num_hci_command_packets and self.command_semaphore.locked():
self.command_semaphore.release()
############################################################
# HCI handlers
@@ -988,7 +1069,13 @@ class Host(utils.EventEmitter):
if event.command_opcode == 0:
# This is used just for the Num_HCI_Command_Packets field, not related to
# an actual command
logger.debug('no-command event')
logger.debug('no-command event for flow control')
# Release the command semaphore if needed
if event.num_hci_command_packets and self.command_semaphore.locked():
logger.debug('command complete event releasing semaphore')
self.command_semaphore.release()
return
return self.on_command_processed(event)
@@ -1169,7 +1256,7 @@ class Host(utils.EventEmitter):
self, event: hci.HCI_LE_Connection_Update_Complete_Event
):
if (connection := self.connections.get(event.connection_handle)) is None:
logger.warning('!!! CONNECTION PARAMETERS UPDATE COMPLETE: unknown handle')
logger.warning('!!! CONNECTION UPDATE COMPLETE: unknown handle')
return
# Notify the client
@@ -1186,6 +1273,29 @@ class Host(utils.EventEmitter):
'connection_parameters_update_failure', connection.handle, event.status
)
def on_hci_le_connection_rate_change_event(
self, event: hci.HCI_LE_Connection_Rate_Change_Event
):
if (connection := self.connections.get(event.connection_handle)) is None:
logger.warning('!!! CONNECTION RATE CHANGE: unknown handle')
return
# Notify the client
if event.status == hci.HCI_SUCCESS:
self.emit(
'le_connection_rate_change',
connection.handle,
event.connection_interval,
event.subrate_factor,
event.peripheral_latency,
event.continuation_number,
event.supervision_timeout,
)
else:
self.emit(
'le_connection_rate_change_failure', connection.handle, event.status
)
def on_hci_le_phy_update_complete_event(
self, event: hci.HCI_LE_PHY_Update_Complete_Event
):
@@ -1745,12 +1855,13 @@ class Host(utils.EventEmitter):
self.emit(
'le_remote_features_failure', event.connection_handle, event.status
)
else:
self.emit(
'le_remote_features',
event.connection_handle,
int.from_bytes(event.le_features, 'little'),
)
return
self.emit(
'le_remote_features',
event.connection_handle,
hci.LeFeatureMask(int.from_bytes(event.le_features, 'little')),
)
def on_hci_le_cs_read_remote_supported_capabilities_complete_event(
self, event: hci.HCI_LE_CS_Read_Remote_Supported_Capabilities_Complete_Event
@@ -1783,6 +1894,12 @@ class Host(utils.EventEmitter):
self.emit('cs_subevent_result_continue', event)
def on_hci_le_subrate_change_event(self, event: hci.HCI_LE_Subrate_Change_Event):
if event.status != hci.HCI_SUCCESS:
self.emit(
'le_subrate_change_failure', event.connection_handle, event.status
)
return
self.emit(
'le_subrate_change',
event.connection_handle,

View File

@@ -1647,7 +1647,9 @@ class LeCreditBasedChannel(utils.EventEmitter):
self.connection_result = None
self.disconnection_result = None
self.drained = asyncio.Event()
self.att_mtu = 0 # Filled by GATT client or server later.
# Core Specification Vol 3, Part G, 5.3.1 ATT_MTU
# ATT_MTU shall be set to the minimum of the MTU field values of the two devices.
self.att_mtu = min(mtu, peer_mtu)
self.drained.set()
@@ -2340,8 +2342,8 @@ class ChannelManager:
cid,
L2CAP_Connection_Response(
identifier=request.identifier,
destination_cid=request.source_cid,
source_cid=0,
destination_cid=0,
source_cid=request.source_cid,
result=L2CAP_Connection_Response.Result.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
status=0x0000,
),
@@ -2353,7 +2355,12 @@ class ChannelManager:
f'creating server channel with cid={source_cid} for psm {request.psm}'
)
channel = ClassicChannel(
self, connection, cid, request.psm, source_cid, server.spec
manager=self,
connection=connection,
signaling_cid=cid,
psm=request.psm,
source_cid=source_cid,
spec=server.spec,
)
connection_channels[source_cid] = channel
@@ -2370,8 +2377,8 @@ class ChannelManager:
cid,
L2CAP_Connection_Response(
identifier=request.identifier,
destination_cid=request.source_cid,
source_cid=0,
destination_cid=0,
source_cid=request.source_cid,
result=L2CAP_Connection_Response.Result.CONNECTION_REFUSED_PSM_NOT_SUPPORTED,
status=0x0000,
),

View File

@@ -278,7 +278,7 @@ class L2CAPService(L2CAPServicer):
if not l2cap_channel:
return SendResponse(error=COMMAND_NOT_UNDERSTOOD)
if isinstance(l2cap_channel, ClassicChannel):
l2cap_channel.send_pdu(request.data)
l2cap_channel.write(request.data)
else:
l2cap_channel.write(request.data)
return SendResponse(success=empty_pb2.Empty())

View File

@@ -800,7 +800,7 @@ class Multiplexer(utils.EventEmitter):
def send_frame(self, frame: RFCOMM_Frame) -> None:
logger.debug(f'>>> Multiplexer sending {frame}')
self.l2cap_channel.send_pdu(frame)
self.l2cap_channel.write(bytes(frame))
def on_pdu(self, pdu: bytes) -> None:
frame = RFCOMM_Frame.from_bytes(pdu)

View File

@@ -847,7 +847,7 @@ class Client:
self.pending_request = request
try:
self.channel.send_pdu(bytes(request))
self.channel.write(bytes(request))
return await self.pending_response
finally:
self.pending_request = None
@@ -1061,7 +1061,7 @@ class Server:
def send_response(self, response):
logger.debug(f'{color(">>> Sending SDP Response", "blue")}: {response}')
self.channel.send_pdu(response)
self.channel.write(response)
def match_services(self, search_pattern: DataElement) -> dict[int, Service]:
# Find the services for which the attributes in the pattern is a subset of the

View File

@@ -27,7 +27,7 @@ from __future__ import annotations
import asyncio
import enum
import logging
from collections.abc import Awaitable, Callable
from collections.abc import Awaitable, Callable, Sequence
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, ClassVar, TypeVar, cast
@@ -507,10 +507,15 @@ def smp_auth_req(bonding: bool, mitm: bool, sc: bool, keypress: bool, ct2: bool)
# -----------------------------------------------------------------------------
class AddressResolver:
def __init__(self, resolving_keys):
def __init__(self, resolving_keys: Sequence[tuple[bytes, Address]]) -> None:
self.resolving_keys = resolving_keys
def resolve(self, address):
def can_resolve_to(self, address: Address) -> bool:
return any(
resolved_address == address for _, resolved_address in self.resolving_keys
)
def resolve(self, address: Address) -> Address | None:
address_bytes = bytes(address)
hash_part = address_bytes[0:3]
prand = address_bytes[3:6]

View File

@@ -110,6 +110,53 @@ class BtSnooper(Snooper):
)
# -----------------------------------------------------------------------------
class PcapSnooper(Snooper):
"""
Snooper that saves or streames HCI packets using the PCAP format.
"""
PCAP_MAGIC = 0xA1B2C3D4
DLT_BLUETOOTH_HCI_H4_WITH_PHDR = 201
def __init__(self, output: BinaryIO):
self.output = output
# Write the header
self.output.write(
struct.pack(
"<IHHIIII",
self.PCAP_MAGIC,
2, # Major PCAP Version
4, # Minor PCAP Version
0, # Reserved 1
0, # Reserved 2
65535, # SnapLen
# FCS and f are set to 0 implicitly by the next line
self.DLT_BLUETOOTH_HCI_H4_WITH_PHDR, # The DLT in this PCAP
)
)
def snoop(self, hci_packet: bytes, direction: Snooper.Direction):
now = datetime.datetime.now(datetime.timezone.utc)
sec = int(now.timestamp())
usec = now.microsecond
# Emit the record
self.output.write(
struct.pack(
"<IIII",
sec, # Timestamp (Seconds)
usec, # Timestamp (Microseconds)
len(hci_packet) + 4,
len(hci_packet) + 4, # +4 because of the addtional direction info...
)
+ struct.pack(">I", int(direction)) # ...thats being added here
+ hci_packet
)
self.output.flush() # flush after every packet for live logging
# -----------------------------------------------------------------------------
_SNOOPER_INSTANCE_COUNT = 0
@@ -140,9 +187,38 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
pid: the current process ID.
instance: the instance ID in the current process.
pcapsnoop
The syntax for the type-specific arguments for this type is:
<io-type>:<io-type-specific-arguments>
Supported I/O types are:
file
The type-specific arguments for this I/O type is a string that is converted
to a file path using the python `str.format()` string formatting. The log
records will be written to that file if it can be opened/created.
The keyword args that may be referenced by the string pattern are:
now: the value of `datetime.now()`
utcnow: the value of `datetime.now(tz=datetime.timezone.utc)`
pid: the current process ID.
instance: the instance ID in the current process.
pipe
The type-specific arguments for this I/O type is a string that is converted
to a path using the python `str.format()` string formatting. The log
records will be written to the named pipe referenced by this path
if it can be opened. The keyword args that may be referenced by the
string pattern are:
now: the value of `datetime.now()`
utcnow: the value of `datetime.now(tz=datetime.timezone.utc)`
pid: the current process ID.
instance: the instance ID in the current process.
Examples:
btsnoop:file:my_btsnoop.log
btsnoop:file:/tmp/bumble_{now:%Y-%m-%d-%H:%M:%S}_{pid}.log
pcapsnoop:pipe:/tmp/bumble-extcap
"""
if ':' not in spec:
@@ -150,6 +226,8 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
snooper_type, snooper_args = spec.split(':', maxsplit=1)
global _SNOOPER_INSTANCE_COUNT
if snooper_type == 'btsnoop':
if ':' not in snooper_args:
raise core.InvalidArgumentError('I/O type for btsnoop snooper type missing')
@@ -157,7 +235,6 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
io_type, io_name = snooper_args.split(':', maxsplit=1)
if io_type == 'file':
# Process the file name string pattern.
global _SNOOPER_INSTANCE_COUNT
file_path = io_name.format(
now=datetime.datetime.now(),
utcnow=datetime.datetime.now(tz=datetime.timezone.utc),
@@ -173,6 +250,39 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
_SNOOPER_INSTANCE_COUNT -= 1
return
elif snooper_type == 'pcapsnoop':
if ':' not in snooper_args:
raise core.InvalidArgumentError(
'I/O type for pcapsnoop snooper type missing'
)
io_type, io_name = snooper_args.split(':', maxsplit=1)
if io_type in {'pipe', 'file'}:
# Process the file name string pattern.
file_path = io_name.format(
now=datetime.datetime.now(),
utcnow=datetime.datetime.now(tz=datetime.timezone.utc),
pid=os.getpid(),
instance=_SNOOPER_INSTANCE_COUNT,
)
# Open a file or pipe
logger.debug(f'PCAP file: {file_path}')
# Pipes we have to open with unbuffered binary I/O
# so we pass ``buffering`` for pipes but not for files
pcap_file: BinaryIO
if io_type == 'pipe':
pcap_file = open(file_path, 'wb', buffering=0)
else:
pcap_file = open(file_path, 'wb')
with pcap_file:
_SNOOPER_INSTANCE_COUNT += 1
yield PcapSnooper(pcap_file)
_SNOOPER_INSTANCE_COUNT -= 1
return
raise core.InvalidArgumentError(f'I/O type {io_type} not supported')
raise core.InvalidArgumentError(f'snooper type {snooper_type} not found')

View File

@@ -194,7 +194,7 @@ async def open_android_netsim_controller_transport(
# We only accept BLUETOOTH
if request.initial_info.chip.kind != ChipKind.BLUETOOTH:
logger.warning('Unsupported chip type')
logger.debug('Request for unsupported chip type')
error = PacketResponse(error='Unsupported chip type')
await self.context.write(error)
# return

View File

@@ -42,8 +42,7 @@ response = await host.send_sync_command(
handle_type=HCI_Write_Tx_Power_Level_Command.TX_POWER_HANDLE_TYPE_ADV,
connection_handle=0,
tx_power_level=-4,
),
check_status=False
)
)
if response.status == HCI_SUCCESS:

View File

@@ -65,7 +65,7 @@ async def main() -> None:
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -161,7 +161,7 @@ async def main() -> None:
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -181,7 +181,7 @@ async def main() -> None:
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -70,7 +70,7 @@ async def main() -> None:
await device.power_on()
await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -25,7 +25,7 @@ import sys
import websockets.asyncio.server
import bumble.logging
from bumble import a2dp, avc, avdtp, avrcp, utils
from bumble import a2dp, avc, avdtp, avrcp, sdp, utils
from bumble.core import PhysicalTransport
from bumble.device import Device
from bumble.transport import open_transport
@@ -34,7 +34,7 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def sdp_records():
def sdp_records() -> dict[int, list[sdp.ServiceAttribute]]:
a2dp_sink_service_record_handle = 0x00010001
avrcp_controller_service_record_handle = 0x00010002
avrcp_target_service_record_handle = 0x00010003
@@ -43,17 +43,17 @@ def sdp_records():
a2dp_sink_service_record_handle: a2dp.make_audio_sink_service_sdp_records(
a2dp_sink_service_record_handle
),
avrcp_controller_service_record_handle: avrcp.make_controller_service_sdp_records(
avrcp_controller_service_record_handle: avrcp.ControllerServiceSdpRecord(
avrcp_controller_service_record_handle
),
avrcp_target_service_record_handle: avrcp.make_target_service_sdp_records(
avrcp_controller_service_record_handle
),
).to_service_attributes(),
avrcp_target_service_record_handle: avrcp.TargetServiceSdpRecord(
avrcp_target_service_record_handle
).to_service_attributes(),
}
# -----------------------------------------------------------------------------
def codec_capabilities():
def codec_capabilities() -> avdtp.MediaCodecCapabilities:
return avdtp.MediaCodecCapabilities(
media_type=avdtp.AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=a2dp.A2DP_SBC_CODEC_TYPE,
@@ -81,20 +81,22 @@ def codec_capabilities():
# -----------------------------------------------------------------------------
def on_avdtp_connection(server):
def on_avdtp_connection(server: avdtp.Protocol) -> None:
# Add a sink endpoint to the server
sink = server.add_sink(codec_capabilities())
sink.on('rtp_packet', on_rtp_packet)
sink.on(sink.EVENT_RTP_PACKET, on_rtp_packet)
# -----------------------------------------------------------------------------
def on_rtp_packet(packet):
def on_rtp_packet(packet: avdtp.MediaPacket) -> None:
print(f'RTP: {packet}')
# -----------------------------------------------------------------------------
def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketServer):
async def get_supported_events():
def on_avrcp_start(
avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketServer
) -> None:
async def get_supported_events() -> None:
events = await avrcp_protocol.get_supported_events()
print("SUPPORTED EVENTS:", events)
websocket_server.send_message(
@@ -130,14 +132,14 @@ def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketSe
utils.AsyncRunner.spawn(get_supported_events())
async def monitor_track_changed():
async def monitor_track_changed() -> None:
async for identifier in avrcp_protocol.monitor_track_changed():
print("TRACK CHANGED:", identifier.hex())
websocket_server.send_message(
{"type": "track-changed", "params": {"identifier": identifier.hex()}}
)
async def monitor_playback_status():
async def monitor_playback_status() -> None:
async for playback_status in avrcp_protocol.monitor_playback_status():
print("PLAYBACK STATUS CHANGED:", playback_status.name)
websocket_server.send_message(
@@ -147,7 +149,7 @@ def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketSe
}
)
async def monitor_playback_position():
async def monitor_playback_position() -> None:
async for playback_position in avrcp_protocol.monitor_playback_position(
playback_interval=1
):
@@ -159,7 +161,7 @@ def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketSe
}
)
async def monitor_player_application_settings():
async def monitor_player_application_settings() -> None:
async for settings in avrcp_protocol.monitor_player_application_settings():
print("PLAYER APPLICATION SETTINGS:", settings)
settings_as_dict = [
@@ -173,14 +175,14 @@ def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketSe
}
)
async def monitor_available_players():
async def monitor_available_players() -> None:
async for _ in avrcp_protocol.monitor_available_players():
print("AVAILABLE PLAYERS CHANGED")
websocket_server.send_message(
{"type": "available-players-changed", "params": {}}
)
async def monitor_addressed_player():
async def monitor_addressed_player() -> None:
async for player in avrcp_protocol.monitor_addressed_player():
print("ADDRESSED PLAYER CHANGED")
websocket_server.send_message(
@@ -195,7 +197,7 @@ def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketSe
}
)
async def monitor_uids():
async def monitor_uids() -> None:
async for uid_counter in avrcp_protocol.monitor_uids():
print("UIDS CHANGED")
websocket_server.send_message(
@@ -207,7 +209,7 @@ def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketSe
}
)
async def monitor_volume():
async def monitor_volume() -> None:
async for volume in avrcp_protocol.monitor_volume():
print("VOLUME CHANGED:", volume)
websocket_server.send_message(
@@ -360,7 +362,7 @@ async def main() -> None:
# Create a listener to wait for AVDTP connections
listener = avdtp.Listener(avdtp.Listener.create_registrar(device))
listener.on('connection', on_avdtp_connection)
listener.on(listener.EVENT_CONNECTION, on_avdtp_connection)
avrcp_delegate = Delegate()
avrcp_protocol = avrcp.Protocol(avrcp_delegate)

View File

@@ -112,7 +112,7 @@ async def main() -> None:
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -73,7 +73,7 @@ async def main() -> None:
await device.power_on()
await device.start_discovery()
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -57,7 +57,7 @@ async def main() -> None:
print(f'!!! Encryption failed: {error}')
return
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -0,0 +1,201 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
from collections.abc import Callable
import bumble.logging
from bumble.core import BaseError
from bumble.device import Connection, Device
from bumble.hci import Address, LeFeatureMask
from bumble.transport import open_transport
# -----------------------------------------------------------------------------
DEFAULT_CENTRAL_ADDRESS = Address("F0:F0:F0:F0:F0:F0")
DEFAULT_PERIPHERAL_ADDRESS = Address("F1:F1:F1:F1:F1:F1")
# -----------------------------------------------------------------------------
async def run_as_central(
device: Device,
scenario: Callable | None,
) -> None:
# Connect to the peripheral
print(f'=== Connecting to {DEFAULT_PERIPHERAL_ADDRESS}...')
connection = await device.connect(DEFAULT_PERIPHERAL_ADDRESS)
print("=== Connected")
if scenario is not None:
await asyncio.sleep(1)
await scenario(connection)
await asyncio.get_running_loop().create_future()
async def run_as_peripheral(device: Device, scenario: Callable | None) -> None:
# Wait for a connection from the central
print(f'=== Advertising as {DEFAULT_PERIPHERAL_ADDRESS}...')
await device.start_advertising(auto_restart=True)
async def on_connection(connection: Connection) -> None:
assert scenario is not None
await asyncio.sleep(1)
await scenario(connection)
if scenario is not None:
device.on(Device.EVENT_CONNECTION, on_connection)
await asyncio.get_running_loop().create_future()
async def change_parameters(
connection: Connection,
parameter_request_procedure_supported: bool,
subrating_supported: bool,
shorter_connection_intervals_supported: bool,
) -> None:
if parameter_request_procedure_supported:
try:
print(">>> update_parameters(7.5, 200, 0, 4000)")
await connection.update_parameters(7.5, 200, 0, 4000)
await asyncio.sleep(3)
except BaseError as error:
print(f"Error: {error}")
if subrating_supported:
try:
print(">>> update_subrate(1, 2, 2, 1, 4000)")
await connection.update_subrate(1, 2, 2, 1, 4000)
await asyncio.sleep(3)
except BaseError as error:
print(f"Error: {error}")
if shorter_connection_intervals_supported:
try:
print(
">>> update_parameters_with_subrate(7.5, 200, 1, 1, 0, 0, 4000, 5, 1000)"
)
await connection.update_parameters_with_subrate(
7.5, 200, 1, 1, 0, 0, 4000, 5, 1000
)
await asyncio.sleep(3)
except BaseError as error:
print(f"Error: {error}")
try:
print(
">>> update_parameters_with_subrate(0.750, 5, 1, 1, 0, 0, 4000, 0.125, 1000)"
)
await connection.update_parameters_with_subrate(
0.750, 5, 1, 1, 0, 0, 4000, 0.125, 1000
)
await asyncio.sleep(3)
except BaseError as error:
print(f"Error: {error}")
print(">>> done")
def on_connection(connection: Connection) -> None:
print(f"+++ Connection established: {connection}")
def on_le_remote_features_change() -> None:
print(f'... LE Remote Features change: {connection.peer_le_features.name}')
connection.on(
connection.EVENT_LE_REMOTE_FEATURES_CHANGE, on_le_remote_features_change
)
def on_connection_parameters_change() -> None:
print(f'... LE Connection Parameters change: {connection.parameters}')
connection.on(
connection.EVENT_CONNECTION_PARAMETERS_UPDATE, on_connection_parameters_change
)
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_connection_updates.py <transport-spec> '
'central|peripheral initiator|responder'
)
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')
role = sys.argv[2]
direction = sys.argv[3]
device = Device.with_hci(
role,
(
DEFAULT_CENTRAL_ADDRESS
if role == "central"
else DEFAULT_PERIPHERAL_ADDRESS
),
hci_transport.source,
hci_transport.sink,
)
device.le_subrate_enabled = True
device.le_shorter_connection_intervals_enabled = True
await device.power_on()
parameter_request_procedure_supported = device.supports_le_features(
LeFeatureMask.CONNECTION_PARAMETERS_REQUEST_PROCEDURE
)
print(
"Parameters Request Procedure supported: "
f"{parameter_request_procedure_supported}"
)
subrating_supported = device.supports_le_features(
LeFeatureMask.CONNECTION_SUBRATING
)
print(f"Subrating supported: {subrating_supported}")
shorter_connection_intervals_supported = device.supports_le_features(
LeFeatureMask.SHORTER_CONNECTION_INTERVALS
)
print(
"Shorter Connection Intervals supported: "
f"{shorter_connection_intervals_supported}"
)
device.on(Device.EVENT_CONNECTION, on_connection)
async def run(connection: Connection) -> None:
await change_parameters(
connection,
parameter_request_procedure_supported,
subrating_supported,
shorter_connection_intervals_supported,
)
scenario = run if direction == "initiator" else None
if role == "central":
await run_as_central(device, scenario)
else:
await run_as_peripheral(device, scenario)
# -----------------------------------------------------------------------------
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())

View File

@@ -101,7 +101,7 @@ async def main() -> None:
await device.start_advertising()
await device.start_scanning()
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -48,7 +48,7 @@ async def main() -> None:
await device.power_on()
await device.start_scanning()
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -147,7 +147,7 @@ async def main() -> None:
else:
await device.start_advertising(auto_restart=True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -99,7 +99,7 @@ async def main() -> None:
else:
await device.start_advertising(auto_restart=True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -422,7 +422,7 @@ async def main() -> None:
# Setup a server
await server(device)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -100,13 +100,9 @@ def on_sco_packet(packet: hci.HCI_SynchronousDataPacket):
if source_file and (pcm_data := source_file.read(packet.data_total_length)):
assert ag_protocol
host = ag_protocol.dlc.multiplexer.l2cap_channel.connection.device.host
host.send_hci_packet(
hci.HCI_SynchronousDataPacket(
connection_handle=packet.connection_handle,
packet_status=0,
data_total_length=len(pcm_data),
data=pcm_data,
)
host.send_sco_sdu(
connection_handle=packet.connection_handle,
sdu=pcm_data,
)

View File

@@ -167,7 +167,7 @@ async def main() -> None:
await websockets.asyncio.server.serve(serve, 'localhost', 8989)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -735,7 +735,7 @@ async def main() -> None:
print("Executing in Web mode")
await keyboard_device(hid_device)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -556,7 +556,7 @@ async def main() -> None:
# Interrupt Channel
await hid_host.connect_interrupt_channel()
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -227,7 +227,7 @@ async def main() -> None:
tcp_port = int(sys.argv[5])
asyncio.create_task(tcp_server(tcp_port, session))
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -153,7 +153,7 @@ async def main() -> None:
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -75,7 +75,7 @@ async def main() -> None:
await device.power_on()
await device.start_scanning(filter_duplicates=filter_duplicates)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -15,15 +15,20 @@ dependencies = [
"aiohttp ~= 3.8; platform_system!='Emscripten'",
"appdirs >= 1.4; platform_system!='Emscripten'",
"click >= 8.1.3; platform_system!='Emscripten'",
"cryptography >= 44.0.3; platform_system!='Emscripten'",
"cryptography >= 44.0.3; platform_system!='Emscripten' and platform_system!='Android'",
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the
# versions available on PyPI. Relax the version requirement since it's better than being
# completely unable to import the package in case of version mismatch.
"cryptography >= 44.0.3; platform_system=='Emscripten'",
# Android wheels for cryptography are not yet available on PyPI, so chaquopy uses
# the builds from https://chaquo.com/pypi-13.1/cryptography/. But these are not regually
# updated. Relax the version requirement since it's better than being completely unable
# to import the package in case of version mismatch.
"cryptography >= 42.0.8; platform_system=='Android'",
"grpcio >= 1.62.1; platform_system!='Emscripten'",
"humanize >= 4.6.0; platform_system!='Emscripten'",
"libusb1 >= 2.0.1; platform_system!='Emscripten'",
"libusb-package == 1.0.26.1; platform_system!='Emscripten'",
"libusb-package == 1.0.26.1; platform_system!='Emscripten' and platform_system!='Android'",
"platformdirs >= 3.10.0; platform_system!='Emscripten'",
"prompt_toolkit >= 3.0.16; platform_system!='Emscripten'",
"prettytable >= 3.6.0; platform_system!='Emscripten'",

View File

@@ -17,6 +17,7 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import struct
from collections.abc import Sequence
@@ -233,7 +234,21 @@ def test_event(event: avrcp.Event):
feature_bitmask=avrcp.MediaPlayerItem.Features.ADD_TO_NOW_PLAYING,
character_set_id=avrcp.CharacterSetId.UTF_8,
displayable_name="Woo",
)
),
avrcp.FolderItem(
folder_uid=1,
folder_type=avrcp.FolderItem.FolderType.ALBUMS,
is_playable=avrcp.FolderItem.Playable.PLAYABLE,
character_set_id=avrcp.CharacterSetId.UTF_8,
displayable_name="Album",
),
avrcp.MediaElementItem(
media_element_uid=1,
media_type=avrcp.MediaElementItem.MediaType.AUDIO,
character_set_id=avrcp.CharacterSetId.UTF_8,
displayable_name="Song",
attribute_value_entry_list=[],
),
],
),
avrcp.ChangePathResponse(
@@ -408,6 +423,47 @@ def test_passthrough_commands():
assert bytes(parsed) == play_pressed_bytes
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_find_sdp_records():
two_devices = await TwoDevices.create_with_avdtp()
# Add SDP records to device 1
controller_record = avrcp.ControllerServiceSdpRecord(
service_record_handle=0x10001,
avctp_version=(1, 4),
avrcp_version=(1, 6),
supported_features=(
avrcp.ControllerFeatures.CATEGORY_1
| avrcp.ControllerFeatures.SUPPORTS_BROWSING
),
)
target_record = avrcp.TargetServiceSdpRecord(
service_record_handle=0x10002,
avctp_version=(1, 4),
avrcp_version=(1, 6),
supported_features=(
avrcp.TargetFeatures.CATEGORY_1 | avrcp.TargetFeatures.SUPPORTS_BROWSING
),
)
two_devices.devices[1].sdp_service_records = {
0x10001: controller_record.to_service_attributes(),
0x10002: target_record.to_service_attributes(),
}
# Find records from device 0
controller_records = await avrcp.ControllerServiceSdpRecord.find(
two_devices.connections[0]
)
assert len(controller_records) == 1
assert controller_records[0] == controller_record
target_records = await avrcp.TargetServiceSdpRecord.find(two_devices.connections[0])
assert len(target_records) == 1
assert target_records[0] == target_record
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_supported_events():
@@ -422,6 +478,163 @@ async def test_get_supported_events():
assert supported_events == [avrcp.EventId.VOLUME_CHANGED]
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_passthrough_key_event():
two_devices = await TwoDevices.create_with_avdtp()
q = asyncio.Queue[tuple[avc.PassThroughFrame.OperationId, bool, bytes]]()
class Delegate(avrcp.Delegate):
async def on_key_event(
self, key: avc.PassThroughFrame.OperationId, pressed: bool, data: bytes
) -> None:
q.put_nowait((key, pressed, data))
two_devices.protocols[1].delegate = Delegate()
for key, pressed in [
(avc.PassThroughFrame.OperationId.PLAY, True),
(avc.PassThroughFrame.OperationId.PLAY, False),
(avc.PassThroughFrame.OperationId.PAUSE, True),
(avc.PassThroughFrame.OperationId.PAUSE, False),
]:
await two_devices.protocols[0].send_key_event(key, pressed)
assert (await q.get()) == (key, pressed, b'')
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_passthrough_key_event_rejected():
two_devices = await TwoDevices.create_with_avdtp()
class Delegate(avrcp.Delegate):
async def on_key_event(
self, key: avc.PassThroughFrame.OperationId, pressed: bool, data: bytes
) -> None:
raise avrcp.Delegate.AvcError(avc.ResponseFrame.ResponseCode.REJECTED)
two_devices.protocols[1].delegate = Delegate()
response = await two_devices.protocols[0].send_key_event(
avc.PassThroughFrame.OperationId.PLAY, True
)
assert response.response == avc.ResponseFrame.ResponseCode.REJECTED
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_passthrough_key_event_exception():
two_devices = await TwoDevices.create_with_avdtp()
class Delegate(avrcp.Delegate):
async def on_key_event(
self, key: avc.PassThroughFrame.OperationId, pressed: bool, data: bytes
) -> None:
raise Exception()
two_devices.protocols[1].delegate = Delegate()
response = await two_devices.protocols[0].send_key_event(
avc.PassThroughFrame.OperationId.PLAY, True
)
assert response.response == avc.ResponseFrame.ResponseCode.REJECTED
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_set_volume():
two_devices = await TwoDevices.create_with_avdtp()
for volume in range(avrcp.SetAbsoluteVolumeCommand.MAXIMUM_VOLUME + 1):
response = await two_devices.protocols[1].send_avrcp_command(
avc.CommandFrame.CommandType.CONTROL, avrcp.SetAbsoluteVolumeCommand(volume)
)
assert isinstance(response.response, avrcp.SetAbsoluteVolumeResponse)
assert response.response.volume == volume
assert two_devices.protocols[0].delegate.volume == volume
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_playback_status():
two_devices = await TwoDevices.create_with_avdtp()
for status in avrcp.PlayStatus:
two_devices.protocols[0].delegate.playback_status = status
response = await two_devices.protocols[1].get_play_status()
assert response.play_status == status
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_supported_company_ids():
two_devices = await TwoDevices.create_with_avdtp()
for status in avrcp.PlayStatus:
two_devices.protocols[0].delegate = avrcp.Delegate(
supported_company_ids=[avrcp.AVRCP_BLUETOOTH_SIG_COMPANY_ID]
)
supported_company_ids = await two_devices.protocols[
1
].get_supported_company_ids()
assert supported_company_ids == [avrcp.AVRCP_BLUETOOTH_SIG_COMPANY_ID]
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_monitor_volume():
two_devices = await TwoDevices.create_with_avdtp()
two_devices.protocols[1].delegate = avrcp.Delegate([avrcp.EventId.VOLUME_CHANGED])
volume_iter = two_devices.protocols[0].monitor_volume()
for volume in range(avrcp.SetAbsoluteVolumeCommand.MAXIMUM_VOLUME + 1):
# Interim
two_devices.protocols[1].delegate.volume = 0
assert (await anext(volume_iter)) == 0
# Changed
two_devices.protocols[1].notify_volume_changed(volume)
assert (await anext(volume_iter)) == volume
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_monitor_playback_status():
two_devices = await TwoDevices.create_with_avdtp()
two_devices.protocols[1].delegate = avrcp.Delegate(
[avrcp.EventId.PLAYBACK_STATUS_CHANGED]
)
playback_status_iter = two_devices.protocols[0].monitor_playback_status()
for playback_status in avrcp.PlayStatus:
# Interim
two_devices.protocols[1].delegate.playback_status = avrcp.PlayStatus.STOPPED
assert (await anext(playback_status_iter)) == avrcp.PlayStatus.STOPPED
# Changed
two_devices.protocols[1].notify_playback_status_changed(playback_status)
assert (await anext(playback_status_iter)) == playback_status
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_monitor_now_playing_content():
two_devices = await TwoDevices.create_with_avdtp()
two_devices.protocols[1].delegate = avrcp.Delegate(
[avrcp.EventId.NOW_PLAYING_CONTENT_CHANGED]
)
now_playing_iter = two_devices.protocols[0].monitor_now_playing_content()
for _ in range(2):
# Interim
await anext(now_playing_iter)
# Changed
two_devices.protocols[1].notify_now_playing_content_changed()
await anext(now_playing_iter)
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_frame_parser()

View File

@@ -619,7 +619,9 @@ async def test_le_request_subrate():
def on_le_subrate_change():
q.put_nowait(lambda: None)
devices.connections[0].on(Connection.EVENT_LE_SUBRATE_CHANGE, on_le_subrate_change)
devices.connections[0].on(
Connection.EVENT_CONNECTION_PARAMETERS_UPDATE, on_le_subrate_change
)
await devices[0].send_command(
hci.HCI_LE_Subrate_Request_Command(

View File

@@ -28,7 +28,7 @@ from unittest.mock import ANY, AsyncMock, Mock
import pytest
from typing_extensions import Self
from bumble import gatt_client, l2cap
from bumble import att, gatt_client, l2cap
from bumble.att import (
ATT_ATTRIBUTE_NOT_FOUND_ERROR,
ATT_PDU,
@@ -1638,6 +1638,104 @@ async def test_eatt_connection_failure():
await gatt_client.Client.connect_eatt(devices.connections[0])
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_multiple() -> None:
devices = await TwoDevices.create_with_connection()
characteristic1 = Characteristic(
'0001', Characteristic.Properties.READ, Characteristic.READABLE, b'1234'
)
characteristic2 = Characteristic(
'0002',
Characteristic.Properties.READ,
Characteristic.READABLE,
b'5678',
)
service = Service('0000', [characteristic1, characteristic2])
devices[1].add_service(service)
client = devices.connections[0].gatt_client
server = devices[1].gatt_server
await client.discover_services()
characteristics = await client.discover_characteristics(
[characteristic1.uuid, characteristic2.uuid], None
)
response = await client.send_request(
att.ATT_Read_Multiple_Request(
set_of_handles=[c.handle for c in characteristics]
)
)
assert isinstance(response, att.ATT_Read_Multiple_Response)
assert response.set_of_values == b'12345678'
response = await client.send_request(
att.ATT_Read_Multiple_Request(
set_of_handles=[
next(
handle
for handle in range(0x0001, 0xFFFF)
if not server.get_attribute(handle)
)
]
)
)
assert isinstance(response, att.ATT_Error_Response)
assert response.error_code == att.ATT_ATTRIBUTE_NOT_FOUND_ERROR
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_multiple_variable() -> None:
devices = await TwoDevices.create_with_connection()
characteristic1 = Characteristic(
'0001', Characteristic.Properties.READ, Characteristic.READABLE, b'1234'
)
characteristic2 = Characteristic(
'0002',
Characteristic.Properties.READ,
Characteristic.READABLE,
b'99',
)
service = Service('0000', [characteristic1, characteristic2])
devices[1].add_service(service)
client = devices.connections[0].gatt_client
server = devices[1].gatt_server
await client.discover_services()
characteristics = await client.discover_characteristics(
[characteristic1.uuid, characteristic2.uuid], None
)
response = await client.send_request(
att.ATT_Read_Multiple_Variable_Request(
set_of_handles=[c.handle for c in characteristics]
)
)
assert isinstance(response, att.ATT_Read_Multiple_Variable_Response)
assert response.length_value_tuple_list == [(4, b'1234'), (2, b'99')]
response = await client.send_request(
att.ATT_Read_Multiple_Variable_Request(
set_of_handles=[
next(
handle
for handle in range(0x0001, 0xFFFF)
if not server.get_attribute(handle)
)
]
)
)
assert isinstance(response, att.ATT_Error_Response)
assert response.error_code == att.ATT_ATTRIBUTE_NOT_FOUND_ERROR
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())

View File

@@ -218,9 +218,9 @@ def test_return_parameters() -> None:
assert isinstance(params.status, utils.OpenIntEnum)
params = hci.HCI_Read_BD_ADDR_Command.parse_return_parameters(
bytes.fromhex('3C001122334455')
bytes.fromhex('00001122334455')
)
assert params.status == hci.HCI_ErrorCode.ADVERTISING_TIMEOUT_ERROR
assert params.status == hci.HCI_ErrorCode.SUCCESS
assert isinstance(params.status, utils.OpenIntEnum)
assert isinstance(params.bd_addr, hci.Address)
@@ -232,6 +232,14 @@ def test_return_parameters() -> None:
assert len(params.local_name) == 248
assert hci.map_null_terminated_utf8_string(params.local_name) == 'hello'
# Some return parameters may be shorter than the full length
# (for Command Complete events with errors)
params = hci.HCI_Read_BD_ADDR_Command.parse_return_parameters(
bytes.fromhex('010011223344')
)
assert isinstance(params, hci.HCI_StatusReturnParameters)
assert params.status == hci.HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR
# -----------------------------------------------------------------------------
def test_HCI_Command():

View File

@@ -26,9 +26,14 @@ from bumble.controller import Controller
from bumble.hci import (
HCI_AclDataPacket,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_CommandStatus,
HCI_Disconnect_Command,
HCI_Error,
HCI_ErrorCode,
HCI_Event,
HCI_GenericReturnParameters,
HCI_LE_Terminate_BIG_Command,
HCI_Reset_Command,
HCI_StatusReturnParameters,
)
@@ -195,6 +200,7 @@ async def test_send_sync_command() -> None:
)
host = Host(source, sink)
host.ready = True
# Sync command with success
response1 = await host.send_sync_command(HCI_Reset_Command())
@@ -212,6 +218,61 @@ async def test_send_sync_command() -> None:
assert excinfo.value.error_code == error_response.return_parameters.status
# Sync command with error status should not raise when `check_status` is False
response2 = await host.send_sync_command(HCI_Reset_Command(), check_status=False)
assert response2.status == HCI_ErrorCode.COMMAND_DISALLOWED_ERROR
# Sync command with raw result
response2 = await host.send_sync_command_raw(HCI_Reset_Command())
assert response2.return_parameters.status == HCI_ErrorCode.COMMAND_DISALLOWED_ERROR
# Sync command with a command that's not an HCI_SyncCommand
# (here, for convenience, we use an HCI_AsyncCommand instance)
command = HCI_Disconnect_Command(connection_handle=0x1234, reason=0x13)
sink.response = HCI_Command_Complete_Event(
1,
command.op_code,
HCI_GenericReturnParameters(data=bytes.fromhex("00112233")),
)
response3 = await host.send_sync_command_raw(command) # type: ignore
assert isinstance(response3.return_parameters, HCI_GenericReturnParameters)
@pytest.mark.asyncio
async def test_send_async_command() -> None:
source = Source()
sink = Sink(
source,
HCI_Command_Status_Event(
HCI_CommandStatus.PENDING,
1,
HCI_Reset_Command.op_code,
),
)
host = Host(source, sink)
host.ready = True
# Normal pending status
response = await host.send_async_command(
HCI_LE_Terminate_BIG_Command(big_handle=0, reason=0)
)
assert response == HCI_CommandStatus.PENDING
# Unknown HCI command result returned as a Command Status
sink.response = HCI_Command_Status_Event(
HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR,
1,
HCI_LE_Terminate_BIG_Command.op_code,
)
response = await host.send_async_command(
HCI_LE_Terminate_BIG_Command(big_handle=0, reason=0), check_status=False
)
assert response == HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR
# Unknown HCI command result returned as a Command Complete
sink.response = HCI_Command_Complete_Event(
1,
HCI_LE_Terminate_BIG_Command.op_code,
HCI_StatusReturnParameters(HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR),
)
response = await host.send_async_command(
HCI_LE_Terminate_BIG_Command(big_handle=0, reason=0), check_status=False
)
assert response == HCI_ErrorCode.UNKNOWN_HCI_COMMAND_ERROR