mirror of
https://github.com/google/bumble.git
synced 2026-06-22 10:44:18 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 7523118581 |
+76
-107
@@ -34,11 +34,7 @@ from bumble.hci import (
|
||||
HCI_READ_BD_ADDR_COMMAND,
|
||||
HCI_READ_BUFFER_SIZE_COMMAND,
|
||||
HCI_READ_LOCAL_NAME_COMMAND,
|
||||
HCI_SUCCESS,
|
||||
CodecID,
|
||||
HCI_Command,
|
||||
HCI_Command_Complete_Event,
|
||||
HCI_Command_Status_Event,
|
||||
HCI_LE_Read_Buffer_Size_Command,
|
||||
HCI_LE_Read_Buffer_Size_V2_Command,
|
||||
HCI_LE_Read_Maximum_Advertising_Data_Length_Command,
|
||||
@@ -59,34 +55,23 @@ from bumble.host import Host
|
||||
from bumble.transport import open_transport
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def command_succeeded(response):
|
||||
if isinstance(response, HCI_Command_Status_Event):
|
||||
return response.status == HCI_SUCCESS
|
||||
if isinstance(response, HCI_Command_Complete_Event):
|
||||
return response.return_parameters.status == HCI_SUCCESS
|
||||
return False
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def get_classic_info(host: Host) -> None:
|
||||
if host.supports_command(HCI_READ_BD_ADDR_COMMAND):
|
||||
response = await host.send_command(HCI_Read_BD_ADDR_Command())
|
||||
if command_succeeded(response):
|
||||
print()
|
||||
print(
|
||||
color('Public Address:', 'yellow'),
|
||||
response.return_parameters.bd_addr.to_string(False),
|
||||
)
|
||||
response1 = await host.send_sync_command(HCI_Read_BD_ADDR_Command())
|
||||
print()
|
||||
print(
|
||||
color('Public Address:', 'yellow'),
|
||||
response1.bd_addr.to_string(False),
|
||||
)
|
||||
|
||||
if host.supports_command(HCI_READ_LOCAL_NAME_COMMAND):
|
||||
response = await host.send_command(HCI_Read_Local_Name_Command())
|
||||
if command_succeeded(response):
|
||||
print()
|
||||
print(
|
||||
color('Local Name:', 'yellow'),
|
||||
map_null_terminated_utf8_string(response.return_parameters.local_name),
|
||||
)
|
||||
response2 = await host.send_sync_command(HCI_Read_Local_Name_Command())
|
||||
print()
|
||||
print(
|
||||
color('Local Name:', 'yellow'),
|
||||
map_null_terminated_utf8_string(response2.local_name),
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -94,52 +79,50 @@ async def get_le_info(host: Host) -> None:
|
||||
print()
|
||||
|
||||
if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
|
||||
response = await host.send_command(
|
||||
response1 = await host.send_sync_command(
|
||||
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command()
|
||||
)
|
||||
if command_succeeded(response):
|
||||
print(
|
||||
color('LE Number Of Supported Advertising Sets:', 'yellow'),
|
||||
response.return_parameters.num_supported_advertising_sets,
|
||||
'\n',
|
||||
)
|
||||
print(
|
||||
color('LE Number Of Supported Advertising Sets:', 'yellow'),
|
||||
response1.num_supported_advertising_sets,
|
||||
'\n',
|
||||
)
|
||||
|
||||
if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
|
||||
response = await host.send_command(
|
||||
response2 = await host.send_sync_command(
|
||||
HCI_LE_Read_Maximum_Advertising_Data_Length_Command()
|
||||
)
|
||||
if command_succeeded(response):
|
||||
print(
|
||||
color('LE Maximum Advertising Data Length:', 'yellow'),
|
||||
response.return_parameters.max_advertising_data_length,
|
||||
'\n',
|
||||
)
|
||||
print(
|
||||
color('LE Maximum Advertising Data Length:', 'yellow'),
|
||||
response2.max_advertising_data_length,
|
||||
'\n',
|
||||
)
|
||||
|
||||
if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
|
||||
response = await host.send_command(HCI_LE_Read_Maximum_Data_Length_Command())
|
||||
if command_succeeded(response):
|
||||
print(
|
||||
color('Maximum Data Length:', 'yellow'),
|
||||
(
|
||||
f'tx:{response.return_parameters.supported_max_tx_octets}/'
|
||||
f'{response.return_parameters.supported_max_tx_time}, '
|
||||
f'rx:{response.return_parameters.supported_max_rx_octets}/'
|
||||
f'{response.return_parameters.supported_max_rx_time}'
|
||||
),
|
||||
'\n',
|
||||
)
|
||||
response3 = await host.send_sync_command(
|
||||
HCI_LE_Read_Maximum_Data_Length_Command()
|
||||
)
|
||||
print(
|
||||
color('Maximum Data Length:', 'yellow'),
|
||||
(
|
||||
f'tx:{response3.supported_max_tx_octets}/'
|
||||
f'{response3.supported_max_tx_time}, '
|
||||
f'rx:{response3.supported_max_rx_octets}/'
|
||||
f'{response3.supported_max_rx_time}'
|
||||
),
|
||||
'\n',
|
||||
)
|
||||
|
||||
if host.supports_command(HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND):
|
||||
response = await host.send_command(
|
||||
response4 = await host.send_sync_command(
|
||||
HCI_LE_Read_Suggested_Default_Data_Length_Command()
|
||||
)
|
||||
if command_succeeded(response):
|
||||
print(
|
||||
color('Suggested Default Data Length:', 'yellow'),
|
||||
f'{response.return_parameters.suggested_max_tx_octets}/'
|
||||
f'{response.return_parameters.suggested_max_tx_time}',
|
||||
'\n',
|
||||
)
|
||||
print(
|
||||
color('Suggested Default Data Length:', 'yellow'),
|
||||
f'{response4.suggested_max_tx_octets}/'
|
||||
f'{response4.suggested_max_tx_time}',
|
||||
'\n',
|
||||
)
|
||||
|
||||
print(color('LE Features:', 'yellow'))
|
||||
for feature in host.supported_le_features:
|
||||
@@ -151,37 +134,31 @@ async def get_flow_control_info(host: Host) -> None:
|
||||
print()
|
||||
|
||||
if host.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
|
||||
response = await host.send_command(
|
||||
HCI_Read_Buffer_Size_Command(), check_result=True
|
||||
)
|
||||
response1 = await host.send_sync_command(HCI_Read_Buffer_Size_Command())
|
||||
print(
|
||||
color('ACL Flow Control:', 'yellow'),
|
||||
f'{response.return_parameters.hc_total_num_acl_data_packets} '
|
||||
f'packets of size {response.return_parameters.hc_acl_data_packet_length}',
|
||||
f'{response1.hc_total_num_acl_data_packets} '
|
||||
f'packets of size {response1.hc_acl_data_packet_length}',
|
||||
)
|
||||
|
||||
if host.supports_command(HCI_LE_READ_BUFFER_SIZE_V2_COMMAND):
|
||||
response = await host.send_command(
|
||||
HCI_LE_Read_Buffer_Size_V2_Command(), check_result=True
|
||||
)
|
||||
response2 = await host.send_sync_command(HCI_LE_Read_Buffer_Size_V2_Command())
|
||||
print(
|
||||
color('LE ACL Flow Control:', 'yellow'),
|
||||
f'{response.return_parameters.total_num_le_acl_data_packets} '
|
||||
f'packets of size {response.return_parameters.le_acl_data_packet_length}',
|
||||
f'{response2.total_num_le_acl_data_packets} '
|
||||
f'packets of size {response2.le_acl_data_packet_length}',
|
||||
)
|
||||
print(
|
||||
color('LE ISO Flow Control:', 'yellow'),
|
||||
f'{response.return_parameters.total_num_iso_data_packets} '
|
||||
f'packets of size {response.return_parameters.iso_data_packet_length}',
|
||||
f'{response2.total_num_iso_data_packets} '
|
||||
f'packets of size {response2.iso_data_packet_length}',
|
||||
)
|
||||
elif host.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
|
||||
response = await host.send_command(
|
||||
HCI_LE_Read_Buffer_Size_Command(), check_result=True
|
||||
)
|
||||
response3 = await host.send_sync_command(HCI_LE_Read_Buffer_Size_Command())
|
||||
print(
|
||||
color('LE ACL Flow Control:', 'yellow'),
|
||||
f'{response.return_parameters.total_num_le_acl_data_packets} '
|
||||
f'packets of size {response.return_parameters.le_acl_data_packet_length}',
|
||||
f'{response3.total_num_le_acl_data_packets} '
|
||||
f'packets of size {response3.le_acl_data_packet_length}',
|
||||
)
|
||||
|
||||
|
||||
@@ -190,52 +167,44 @@ async def get_codecs_info(host: Host) -> None:
|
||||
print()
|
||||
|
||||
if host.supports_command(HCI_Read_Local_Supported_Codecs_V2_Command.op_code):
|
||||
response = await host.send_command(
|
||||
HCI_Read_Local_Supported_Codecs_V2_Command(), check_result=True
|
||||
response1 = await host.send_sync_command(
|
||||
HCI_Read_Local_Supported_Codecs_V2_Command()
|
||||
)
|
||||
print(color('Codecs:', 'yellow'))
|
||||
|
||||
for codec_id, transport in zip(
|
||||
response.return_parameters.standard_codec_ids,
|
||||
response.return_parameters.standard_codec_transports,
|
||||
response1.standard_codec_ids,
|
||||
response1.standard_codec_transports,
|
||||
):
|
||||
transport_name = HCI_Read_Local_Supported_Codecs_V2_Command.Transport(
|
||||
transport
|
||||
).name
|
||||
codec_name = CodecID(codec_id).name
|
||||
print(f' {codec_name} - {transport_name}')
|
||||
print(f' {codec_id.name} - {transport.name}')
|
||||
|
||||
for codec_id, transport in zip(
|
||||
response.return_parameters.vendor_specific_codec_ids,
|
||||
response.return_parameters.vendor_specific_codec_transports,
|
||||
for vendor_codec_id, vendor_transport in zip(
|
||||
response1.vendor_specific_codec_ids,
|
||||
response1.vendor_specific_codec_transports,
|
||||
):
|
||||
transport_name = HCI_Read_Local_Supported_Codecs_V2_Command.Transport(
|
||||
transport
|
||||
).name
|
||||
company = name_or_number(COMPANY_IDENTIFIERS, codec_id >> 16)
|
||||
print(f' {company} / {codec_id & 0xFFFF} - {transport_name}')
|
||||
company = name_or_number(COMPANY_IDENTIFIERS, vendor_codec_id >> 16)
|
||||
print(f' {company} / {vendor_codec_id & 0xFFFF} - {vendor_transport.name}')
|
||||
|
||||
if not response.return_parameters.standard_codec_ids:
|
||||
if not response1.standard_codec_ids:
|
||||
print(' No standard codecs')
|
||||
if not response.return_parameters.vendor_specific_codec_ids:
|
||||
if not response1.vendor_specific_codec_ids:
|
||||
print(' No Vendor-specific codecs')
|
||||
|
||||
if host.supports_command(HCI_Read_Local_Supported_Codecs_Command.op_code):
|
||||
response = await host.send_command(
|
||||
HCI_Read_Local_Supported_Codecs_Command(), check_result=True
|
||||
response2 = await host.send_sync_command(
|
||||
HCI_Read_Local_Supported_Codecs_Command()
|
||||
)
|
||||
print(color('Codecs (BR/EDR):', 'yellow'))
|
||||
for codec_id in response.return_parameters.standard_codec_ids:
|
||||
codec_name = CodecID(codec_id).name
|
||||
print(f' {codec_name}')
|
||||
for codec_id in response2.standard_codec_ids:
|
||||
print(f' {codec_id.name}')
|
||||
|
||||
for codec_id in response.return_parameters.vendor_specific_codec_ids:
|
||||
company = name_or_number(COMPANY_IDENTIFIERS, codec_id >> 16)
|
||||
print(f' {company} / {codec_id & 0xFFFF}')
|
||||
for vendor_codec_id in response2.vendor_specific_codec_ids:
|
||||
company = name_or_number(COMPANY_IDENTIFIERS, vendor_codec_id >> 16)
|
||||
print(f' {company} / {vendor_codec_id & 0xFFFF}')
|
||||
|
||||
if not response.return_parameters.standard_codec_ids:
|
||||
if not response2.standard_codec_ids:
|
||||
print(' No standard codecs')
|
||||
if not response.return_parameters.vendor_specific_codec_ids:
|
||||
if not response2.vendor_specific_codec_ids:
|
||||
print(' No Vendor-specific codecs')
|
||||
|
||||
|
||||
|
||||
@@ -85,7 +85,7 @@ class Loopback:
|
||||
print(color('@@@ Received last packet', 'green'))
|
||||
self.done.set()
|
||||
|
||||
async def run(self):
|
||||
async def run(self) -> None:
|
||||
"""Run a loopback throughput test"""
|
||||
print(color('>>> Connecting to HCI...', 'green'))
|
||||
async with await open_transport(self.transport) as (
|
||||
@@ -100,11 +100,15 @@ class Loopback:
|
||||
# make sure data can fit in one l2cap pdu
|
||||
l2cap_header_size = 4
|
||||
|
||||
max_packet_size = (
|
||||
packet_queue = (
|
||||
host.acl_packet_queue
|
||||
if host.acl_packet_queue
|
||||
else host.le_acl_packet_queue
|
||||
).max_packet_size - l2cap_header_size
|
||||
)
|
||||
if packet_queue is None:
|
||||
print(color('!!! No packet queue', 'red'))
|
||||
return
|
||||
max_packet_size = packet_queue.max_packet_size - l2cap_header_size
|
||||
if self.packet_size > max_packet_size:
|
||||
print(
|
||||
color(
|
||||
@@ -128,20 +132,18 @@ class Loopback:
|
||||
loopback_mode = LoopbackMode.LOCAL
|
||||
|
||||
print(color('### Setting loopback mode', 'blue'))
|
||||
await host.send_command(
|
||||
await host.send_sync_command(
|
||||
HCI_Write_Loopback_Mode_Command(loopback_mode=LoopbackMode.LOCAL),
|
||||
check_result=True,
|
||||
)
|
||||
|
||||
print(color('### Checking loopback mode', 'blue'))
|
||||
response = await host.send_command(
|
||||
HCI_Read_Loopback_Mode_Command(), check_result=True
|
||||
)
|
||||
if response.return_parameters.loopback_mode != loopback_mode:
|
||||
response = await host.send_sync_command(HCI_Read_Loopback_Mode_Command())
|
||||
if response.loopback_mode != loopback_mode:
|
||||
print(color('!!! Loopback mode mismatch', 'red'))
|
||||
return
|
||||
|
||||
await self.connection_event.wait()
|
||||
assert self.connection_handle is not None
|
||||
print(color('### Connected', 'cyan'))
|
||||
|
||||
print(color('=== Start sending', 'magenta'))
|
||||
|
||||
+2
-15
@@ -26,8 +26,6 @@
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
from typing import Any
|
||||
|
||||
import click
|
||||
import usb1
|
||||
|
||||
@@ -168,16 +166,13 @@ def is_bluetooth_hci(device):
|
||||
# -----------------------------------------------------------------------------
|
||||
@click.command()
|
||||
@click.option('--verbose', is_flag=True, default=False, help='Print more details')
|
||||
@click.option('--hci-only', is_flag=True, default=False, help='only show HCI device')
|
||||
@click.option('--manufacturer', help='filter by manufacturer')
|
||||
@click.option('--product', help='filter by product')
|
||||
def main(verbose: bool, manufacturer: str, product: str, hci_only: bool):
|
||||
def main(verbose):
|
||||
bumble.logging.setup_basic_logging('WARNING')
|
||||
|
||||
load_libusb()
|
||||
with usb1.USBContext() as context:
|
||||
bluetooth_device_count = 0
|
||||
devices: dict[tuple[Any, Any], list[str | None]] = {}
|
||||
devices = {}
|
||||
|
||||
for device in context.getDeviceIterator(skip_on_error=True):
|
||||
device_class = device.getDeviceClass()
|
||||
@@ -239,14 +234,6 @@ def main(verbose: bool, manufacturer: str, product: str, hci_only: bool):
|
||||
f'{basic_transport_name}/{device_serial_number}'
|
||||
)
|
||||
|
||||
# Filter
|
||||
if product and device_product != product:
|
||||
continue
|
||||
if manufacturer and device_manufacturer != manufacturer:
|
||||
continue
|
||||
if not is_bluetooth_hci(device) and hci_only:
|
||||
continue
|
||||
|
||||
# Print the results
|
||||
print(
|
||||
color(
|
||||
|
||||
+32
-97
@@ -29,7 +29,7 @@ import enum
|
||||
import functools
|
||||
import inspect
|
||||
import struct
|
||||
from collections.abc import Awaitable, Callable, Sequence
|
||||
from collections.abc import Awaitable, Callable
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
ClassVar,
|
||||
@@ -72,36 +72,34 @@ ATT_PSM = 0x001F
|
||||
EATT_PSM = 0x0027
|
||||
|
||||
class Opcode(hci.SpecableEnum):
|
||||
ATT_ERROR_RESPONSE = 0x01
|
||||
ATT_EXCHANGE_MTU_REQUEST = 0x02
|
||||
ATT_EXCHANGE_MTU_RESPONSE = 0x03
|
||||
ATT_FIND_INFORMATION_REQUEST = 0x04
|
||||
ATT_FIND_INFORMATION_RESPONSE = 0x05
|
||||
ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06
|
||||
ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
|
||||
ATT_READ_BY_TYPE_REQUEST = 0x08
|
||||
ATT_READ_BY_TYPE_RESPONSE = 0x09
|
||||
ATT_READ_REQUEST = 0x0A
|
||||
ATT_READ_RESPONSE = 0x0B
|
||||
ATT_READ_BLOB_REQUEST = 0x0C
|
||||
ATT_READ_BLOB_RESPONSE = 0x0D
|
||||
ATT_READ_MULTIPLE_REQUEST = 0x0E
|
||||
ATT_READ_MULTIPLE_RESPONSE = 0x0F
|
||||
ATT_READ_BY_GROUP_TYPE_REQUEST = 0x10
|
||||
ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
|
||||
ATT_READ_MULTIPLE_VARIABLE_REQUEST = 0x20
|
||||
ATT_READ_MULTIPLE_VARIABLE_RESPONSE = 0x21
|
||||
ATT_WRITE_REQUEST = 0x12
|
||||
ATT_WRITE_RESPONSE = 0x13
|
||||
ATT_WRITE_COMMAND = 0x52
|
||||
ATT_SIGNED_WRITE_COMMAND = 0xD2
|
||||
ATT_PREPARE_WRITE_REQUEST = 0x16
|
||||
ATT_PREPARE_WRITE_RESPONSE = 0x17
|
||||
ATT_EXECUTE_WRITE_REQUEST = 0x18
|
||||
ATT_EXECUTE_WRITE_RESPONSE = 0x19
|
||||
ATT_HANDLE_VALUE_NOTIFICATION = 0x1B
|
||||
ATT_HANDLE_VALUE_INDICATION = 0x1D
|
||||
ATT_HANDLE_VALUE_CONFIRMATION = 0x1E
|
||||
ATT_ERROR_RESPONSE = 0x01
|
||||
ATT_EXCHANGE_MTU_REQUEST = 0x02
|
||||
ATT_EXCHANGE_MTU_RESPONSE = 0x03
|
||||
ATT_FIND_INFORMATION_REQUEST = 0x04
|
||||
ATT_FIND_INFORMATION_RESPONSE = 0x05
|
||||
ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06
|
||||
ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
|
||||
ATT_READ_BY_TYPE_REQUEST = 0x08
|
||||
ATT_READ_BY_TYPE_RESPONSE = 0x09
|
||||
ATT_READ_REQUEST = 0x0A
|
||||
ATT_READ_RESPONSE = 0x0B
|
||||
ATT_READ_BLOB_REQUEST = 0x0C
|
||||
ATT_READ_BLOB_RESPONSE = 0x0D
|
||||
ATT_READ_MULTIPLE_REQUEST = 0x0E
|
||||
ATT_READ_MULTIPLE_RESPONSE = 0x0F
|
||||
ATT_READ_BY_GROUP_TYPE_REQUEST = 0x10
|
||||
ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
|
||||
ATT_WRITE_REQUEST = 0x12
|
||||
ATT_WRITE_RESPONSE = 0x13
|
||||
ATT_WRITE_COMMAND = 0x52
|
||||
ATT_SIGNED_WRITE_COMMAND = 0xD2
|
||||
ATT_PREPARE_WRITE_REQUEST = 0x16
|
||||
ATT_PREPARE_WRITE_RESPONSE = 0x17
|
||||
ATT_EXECUTE_WRITE_REQUEST = 0x18
|
||||
ATT_EXECUTE_WRITE_RESPONSE = 0x19
|
||||
ATT_HANDLE_VALUE_NOTIFICATION = 0x1B
|
||||
ATT_HANDLE_VALUE_INDICATION = 0x1D
|
||||
ATT_HANDLE_VALUE_CONFIRMATION = 0x1E
|
||||
|
||||
ATT_REQUESTS = [
|
||||
Opcode.ATT_EXCHANGE_MTU_REQUEST,
|
||||
@@ -112,10 +110,9 @@ ATT_REQUESTS = [
|
||||
Opcode.ATT_READ_BLOB_REQUEST,
|
||||
Opcode.ATT_READ_MULTIPLE_REQUEST,
|
||||
Opcode.ATT_READ_BY_GROUP_TYPE_REQUEST,
|
||||
Opcode.ATT_READ_MULTIPLE_VARIABLE_REQUEST,
|
||||
Opcode.ATT_WRITE_REQUEST,
|
||||
Opcode.ATT_PREPARE_WRITE_REQUEST,
|
||||
Opcode.ATT_EXECUTE_WRITE_REQUEST,
|
||||
Opcode.ATT_EXECUTE_WRITE_REQUEST
|
||||
]
|
||||
|
||||
ATT_RESPONSES = [
|
||||
@@ -128,10 +125,9 @@ ATT_RESPONSES = [
|
||||
Opcode.ATT_READ_BLOB_RESPONSE,
|
||||
Opcode.ATT_READ_MULTIPLE_RESPONSE,
|
||||
Opcode.ATT_READ_BY_GROUP_TYPE_RESPONSE,
|
||||
Opcode.ATT_READ_MULTIPLE_VARIABLE_RESPONSE,
|
||||
Opcode.ATT_WRITE_RESPONSE,
|
||||
Opcode.ATT_PREPARE_WRITE_RESPONSE,
|
||||
Opcode.ATT_EXECUTE_WRITE_RESPONSE,
|
||||
Opcode.ATT_EXECUTE_WRITE_RESPONSE
|
||||
]
|
||||
|
||||
class ErrorCode(hci.SpecableEnum):
|
||||
@@ -189,18 +185,6 @@ ATT_INSUFFICIENT_RESOURCES_ERROR = ErrorCode.INSUFFICIENT_RESOURCES
|
||||
ATT_DEFAULT_MTU = 23
|
||||
|
||||
HANDLE_FIELD_SPEC = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'}
|
||||
_SET_OF_HANDLES_METADATA = hci.metadata({
|
||||
'parser': lambda data, offset: (
|
||||
len(data),
|
||||
[
|
||||
struct.unpack_from('<H', data, i)[0]
|
||||
for i in range(offset, len(data), 2)
|
||||
],
|
||||
),
|
||||
'serializer': lambda handles: b''.join(
|
||||
[struct.pack('<H', handle) for handle in handles]
|
||||
),
|
||||
})
|
||||
|
||||
# fmt: on
|
||||
# pylint: enable=line-too-long
|
||||
@@ -570,7 +554,7 @@ class ATT_Read_Multiple_Request(ATT_PDU):
|
||||
See Bluetooth spec @ Vol 3, Part F - 3.4.4.7 Read Multiple Request
|
||||
'''
|
||||
|
||||
set_of_handles: Sequence[int] = dataclasses.field(metadata=_SET_OF_HANDLES_METADATA)
|
||||
set_of_handles: bytes = dataclasses.field(metadata=hci.metadata("*"))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -651,55 +635,6 @@ class ATT_Read_By_Group_Type_Response(ATT_PDU):
|
||||
return result
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@ATT_PDU.subclass
|
||||
@dataclasses.dataclass
|
||||
class ATT_Read_Multiple_Variable_Request(ATT_PDU):
|
||||
'''
|
||||
See Bluetooth spec @ Vol 3, Part F - 3.4.4.11 Read Multiple Variable Request
|
||||
'''
|
||||
|
||||
set_of_handles: Sequence[int] = dataclasses.field(metadata=_SET_OF_HANDLES_METADATA)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@ATT_PDU.subclass
|
||||
@dataclasses.dataclass
|
||||
class ATT_Read_Multiple_Variable_Response(ATT_PDU):
|
||||
'''
|
||||
See Bluetooth spec @ Vol 3, Part F - 3.4.4.12 Read Multiple Variable Response
|
||||
'''
|
||||
|
||||
@classmethod
|
||||
def _parse_length_value_tuples(
|
||||
cls, data: bytes, offset: int
|
||||
) -> tuple[int, list[tuple[int, bytes]]]:
|
||||
length_value_tuple_list: list[tuple[int, bytes]] = []
|
||||
while offset < len(data):
|
||||
length = struct.unpack_from('<H', data, offset)[0]
|
||||
length_value_tuple_list.append(
|
||||
(length, data[offset + 2 : offset + 2 + length])
|
||||
)
|
||||
offset += 2 + length
|
||||
return (len(data), length_value_tuple_list)
|
||||
|
||||
length_value_tuple_list: Sequence[tuple[int, bytes]] = dataclasses.field(
|
||||
metadata=hci.metadata(
|
||||
{
|
||||
'parser': lambda data, offset: ATT_Read_Multiple_Variable_Response._parse_length_value_tuples(
|
||||
data, offset
|
||||
),
|
||||
'serializer': lambda length_value_tuple_list: b''.join(
|
||||
[
|
||||
struct.pack('<H', length) + value
|
||||
for length, value in length_value_tuple_list
|
||||
]
|
||||
),
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@ATT_PDU.subclass
|
||||
@dataclasses.dataclass
|
||||
|
||||
+1
-1
@@ -235,7 +235,7 @@ class Protocol:
|
||||
)
|
||||
+ payload
|
||||
)
|
||||
self.l2cap_channel.write(pdu)
|
||||
self.l2cap_channel.send_pdu(pdu)
|
||||
|
||||
def send_command(self, transaction_label: int, pid: int, payload: bytes) -> None:
|
||||
logger.debug(
|
||||
|
||||
+3
-3
@@ -268,7 +268,7 @@ class MediaPacketPump:
|
||||
await self.clock.sleep(delay)
|
||||
|
||||
# Emit
|
||||
rtp_channel.write(bytes(packet))
|
||||
rtp_channel.send_pdu(bytes(packet))
|
||||
logger.debug(
|
||||
f'{color(">>> sending RTP packet:", "green")} {packet}'
|
||||
)
|
||||
@@ -1519,7 +1519,7 @@ class Protocol(utils.EventEmitter):
|
||||
header = bytes([first_header_byte])
|
||||
|
||||
# Send one packet
|
||||
self.l2cap_channel.write(header + payload[:max_fragment_size])
|
||||
self.l2cap_channel.send_pdu(header + payload[:max_fragment_size])
|
||||
|
||||
# Prepare for the next packet
|
||||
payload = payload[max_fragment_size:]
|
||||
@@ -1829,7 +1829,7 @@ class Stream:
|
||||
|
||||
def send_media_packet(self, packet: MediaPacket) -> None:
|
||||
assert self.rtp_channel
|
||||
self.rtp_channel.write(bytes(packet))
|
||||
self.rtp_channel.send_pdu(bytes(packet))
|
||||
|
||||
async def configure(self) -> None:
|
||||
if self.state != State.IDLE:
|
||||
|
||||
@@ -421,7 +421,7 @@ class Controller:
|
||||
hci.HCI_Command_Complete_Event(
|
||||
num_hci_command_packets=1,
|
||||
command_opcode=command.op_code,
|
||||
return_parameters=result,
|
||||
return_parameters=hci.HCI_GenericReturnParameters(data=result),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
+1
-1
@@ -923,7 +923,7 @@ class DeviceClass:
|
||||
# pylint: enable=line-too-long
|
||||
|
||||
@staticmethod
|
||||
def split_class_of_device(class_of_device):
|
||||
def split_class_of_device(class_of_device: int) -> tuple[int, int, int]:
|
||||
# Split the bit fields of the composite class of device value into:
|
||||
# (service_classes, major_device_class, minor_device_class)
|
||||
return (
|
||||
|
||||
+311
-313
File diff suppressed because it is too large
Load Diff
+41
-43
@@ -89,51 +89,54 @@ HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND = hci.hci_vendor_command_op_code(0x000E)
|
||||
hci.HCI_Command.register_commands(globals())
|
||||
|
||||
|
||||
@hci.HCI_Command.command
|
||||
@dataclasses.dataclass
|
||||
class HCI_Intel_Read_Version_Command(hci.HCI_Command):
|
||||
class HCI_Intel_Read_Version_ReturnParameters(hci.HCI_StatusReturnParameters):
|
||||
tlv: bytes = hci.field(metadata=hci.metadata('*'))
|
||||
|
||||
|
||||
@hci.HCI_SyncCommand.sync_command(HCI_Intel_Read_Version_ReturnParameters)
|
||||
@dataclasses.dataclass
|
||||
class HCI_Intel_Read_Version_Command(
|
||||
hci.HCI_SyncCommand[HCI_Intel_Read_Version_ReturnParameters]
|
||||
):
|
||||
param0: int = dataclasses.field(metadata=hci.metadata(1))
|
||||
|
||||
return_parameters_fields = [
|
||||
("status", hci.STATUS_SPEC),
|
||||
("tlv", "*"),
|
||||
]
|
||||
|
||||
|
||||
@hci.HCI_Command.command
|
||||
@hci.HCI_SyncCommand.sync_command(hci.HCI_StatusReturnParameters)
|
||||
@dataclasses.dataclass
|
||||
class Hci_Intel_Secure_Send_Command(hci.HCI_Command):
|
||||
class Hci_Intel_Secure_Send_Command(
|
||||
hci.HCI_SyncCommand[hci.HCI_StatusReturnParameters]
|
||||
):
|
||||
data_type: int = dataclasses.field(metadata=hci.metadata(1))
|
||||
data: bytes = dataclasses.field(metadata=hci.metadata("*"))
|
||||
|
||||
return_parameters_fields = [
|
||||
("status", 1),
|
||||
]
|
||||
|
||||
|
||||
@hci.HCI_Command.command
|
||||
@dataclasses.dataclass
|
||||
class HCI_Intel_Reset_Command(hci.HCI_Command):
|
||||
class HCI_Intel_Reset_ReturnParameters(hci.HCI_ReturnParameters):
|
||||
data: bytes = hci.field(metadata=hci.metadata('*'))
|
||||
|
||||
|
||||
@hci.HCI_SyncCommand.sync_command(HCI_Intel_Reset_ReturnParameters)
|
||||
@dataclasses.dataclass
|
||||
class HCI_Intel_Reset_Command(hci.HCI_SyncCommand[HCI_Intel_Reset_ReturnParameters]):
|
||||
reset_type: int = dataclasses.field(metadata=hci.metadata(1))
|
||||
patch_enable: int = dataclasses.field(metadata=hci.metadata(1))
|
||||
ddc_reload: int = dataclasses.field(metadata=hci.metadata(1))
|
||||
boot_option: int = dataclasses.field(metadata=hci.metadata(1))
|
||||
boot_address: int = dataclasses.field(metadata=hci.metadata(4))
|
||||
|
||||
return_parameters_fields = [
|
||||
("data", "*"),
|
||||
]
|
||||
|
||||
|
||||
@hci.HCI_Command.command
|
||||
@dataclasses.dataclass
|
||||
class Hci_Intel_Write_Device_Config_Command(hci.HCI_Command):
|
||||
data: bytes = dataclasses.field(metadata=hci.metadata("*"))
|
||||
class HCI_Intel_Write_Device_Config_ReturnParameters(hci.HCI_StatusReturnParameters):
|
||||
params: bytes = hci.field(metadata=hci.metadata('*'))
|
||||
|
||||
return_parameters_fields = [
|
||||
("status", hci.STATUS_SPEC),
|
||||
("params", "*"),
|
||||
]
|
||||
|
||||
@hci.HCI_SyncCommand.sync_command(HCI_Intel_Write_Device_Config_ReturnParameters)
|
||||
@dataclasses.dataclass
|
||||
class HCI_Intel_Write_Device_Config_Command(
|
||||
hci.HCI_SyncCommand[HCI_Intel_Write_Device_Config_ReturnParameters]
|
||||
):
|
||||
data: bytes = dataclasses.field(metadata=hci.metadata("*"))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -402,7 +405,7 @@ class Driver(common.Driver):
|
||||
self.host.on_hci_event_packet(event)
|
||||
return
|
||||
|
||||
if not event.return_parameters == hci.HCI_SUCCESS:
|
||||
if not event.return_parameters.status == hci.HCI_SUCCESS:
|
||||
raise DriverError("HCI_Command_Complete_Event error")
|
||||
|
||||
if self.max_in_flight_firmware_load_commands != event.num_hci_command_packets:
|
||||
@@ -641,8 +644,8 @@ class Driver(common.Driver):
|
||||
while ddc_data:
|
||||
ddc_len = 1 + ddc_data[0]
|
||||
ddc_payload = ddc_data[:ddc_len]
|
||||
await self.host.send_command(
|
||||
Hci_Intel_Write_Device_Config_Command(data=ddc_payload)
|
||||
await self.host.send_sync_command(
|
||||
HCI_Intel_Write_Device_Config_Command(data=ddc_payload)
|
||||
)
|
||||
ddc_data = ddc_data[ddc_len:]
|
||||
|
||||
@@ -660,31 +663,26 @@ class Driver(common.Driver):
|
||||
|
||||
async def read_device_info(self) -> dict[ValueType, Any]:
|
||||
self.host.ready = True
|
||||
response = await self.host.send_command(hci.HCI_Reset_Command())
|
||||
if not (
|
||||
isinstance(response, hci.HCI_Command_Complete_Event)
|
||||
and response.return_parameters
|
||||
in (hci.HCI_UNKNOWN_HCI_COMMAND_ERROR, hci.HCI_SUCCESS)
|
||||
):
|
||||
response1 = await self.host.send_sync_command(
|
||||
hci.HCI_Reset_Command(), check_status=False
|
||||
)
|
||||
if response1.status not in (hci.HCI_UNKNOWN_HCI_COMMAND_ERROR, hci.HCI_SUCCESS):
|
||||
# When the controller is in operational mode, the response is a
|
||||
# successful response.
|
||||
# When the controller is in bootloader mode,
|
||||
# HCI_UNKNOWN_HCI_COMMAND_ERROR is the expected response. Anything
|
||||
# else is a failure.
|
||||
logger.warning(f"unexpected response: {response}")
|
||||
logger.warning(f"unexpected response: {response1}")
|
||||
raise DriverError("unexpected HCI response")
|
||||
|
||||
# Read the firmware version.
|
||||
response = await self.host.send_command(
|
||||
HCI_Intel_Read_Version_Command(param0=0xFF)
|
||||
response2 = await self.host.send_sync_command(
|
||||
HCI_Intel_Read_Version_Command(param0=0xFF), check_status=False
|
||||
)
|
||||
if not isinstance(response, hci.HCI_Command_Complete_Event):
|
||||
raise DriverError("unexpected HCI response")
|
||||
|
||||
if response.return_parameters.status != 0: # type: ignore
|
||||
if response2.status != 0: # type: ignore
|
||||
raise DriverError("HCI_Intel_Read_Version_Command error")
|
||||
|
||||
tlvs = _parse_tlv(response.return_parameters.tlv) # type: ignore
|
||||
tlvs = _parse_tlv(response2.tlv) # type: ignore
|
||||
|
||||
# Convert the list to a dict. That's Ok here because we only expect each type
|
||||
# to appear just once.
|
||||
|
||||
+53
-38
@@ -16,6 +16,7 @@ Support for Realtek USB dongles.
|
||||
Based on various online bits of information, including the Linux kernel.
|
||||
(see `drivers/bluetooth/btrtl.c`)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import enum
|
||||
@@ -31,10 +32,14 @@ import weakref
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
from dataclasses import dataclass, field
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from bumble import core, hci
|
||||
from bumble.drivers import common
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bumble.host import Host
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Logging
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -188,23 +193,36 @@ HCI_RTK_DROP_FIRMWARE_COMMAND = hci.hci_vendor_command_op_code(0x66)
|
||||
hci.HCI_Command.register_commands(globals())
|
||||
|
||||
|
||||
@hci.HCI_Command.command
|
||||
@dataclass
|
||||
class HCI_RTK_Read_ROM_Version_Command(hci.HCI_Command):
|
||||
return_parameters_fields = [("status", hci.STATUS_SPEC), ("version", 1)]
|
||||
class HCI_RTK_Read_ROM_Version_ReturnParameters(hci.HCI_StatusReturnParameters):
|
||||
version: int = field(metadata=hci.metadata(1))
|
||||
|
||||
|
||||
@hci.HCI_Command.command
|
||||
@hci.HCI_SyncCommand.sync_command(HCI_RTK_Read_ROM_Version_ReturnParameters)
|
||||
@dataclass
|
||||
class HCI_RTK_Download_Command(hci.HCI_Command):
|
||||
class HCI_RTK_Read_ROM_Version_Command(
|
||||
hci.HCI_SyncCommand[HCI_RTK_Read_ROM_Version_ReturnParameters]
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class HCI_RTK_Download_ReturnParameters(hci.HCI_StatusReturnParameters):
|
||||
index: int = field(metadata=hci.metadata(1))
|
||||
|
||||
|
||||
@hci.HCI_SyncCommand.sync_command(HCI_RTK_Download_ReturnParameters)
|
||||
@dataclass
|
||||
class HCI_RTK_Download_Command(hci.HCI_SyncCommand[HCI_RTK_Download_ReturnParameters]):
|
||||
index: int = field(metadata=hci.metadata(1))
|
||||
payload: bytes = field(metadata=hci.metadata(RTK_FRAGMENT_LENGTH))
|
||||
return_parameters_fields = [("status", hci.STATUS_SPEC), ("index", 1)]
|
||||
|
||||
|
||||
@hci.HCI_Command.command
|
||||
@hci.HCI_SyncCommand.sync_command(hci.HCI_GenericReturnParameters)
|
||||
@dataclass
|
||||
class HCI_RTK_Drop_Firmware_Command(hci.HCI_Command):
|
||||
class HCI_RTK_Drop_Firmware_Command(
|
||||
hci.HCI_SyncCommand[hci.HCI_GenericReturnParameters]
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
@@ -490,7 +508,7 @@ class Driver(common.Driver):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def check(host):
|
||||
def check(host: Host) -> bool:
|
||||
if not host.hci_metadata:
|
||||
logger.debug("USB metadata not found")
|
||||
return False
|
||||
@@ -514,41 +532,39 @@ class Driver(common.Driver):
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
async def get_loaded_firmware_version(host):
|
||||
response = await host.send_command(HCI_RTK_Read_ROM_Version_Command())
|
||||
async def get_loaded_firmware_version(host: Host) -> int | None:
|
||||
response1 = await host.send_sync_command(
|
||||
HCI_RTK_Read_ROM_Version_Command(), check_status=False
|
||||
)
|
||||
|
||||
if response.return_parameters.status != hci.HCI_SUCCESS:
|
||||
if response1.status != hci.HCI_SUCCESS:
|
||||
return None
|
||||
|
||||
response = await host.send_command(
|
||||
hci.HCI_Read_Local_Version_Information_Command(), check_result=True
|
||||
)
|
||||
return (
|
||||
response.return_parameters.hci_subversion << 16
|
||||
| response.return_parameters.lmp_subversion
|
||||
response2 = await host.send_sync_command(
|
||||
hci.HCI_Read_Local_Version_Information_Command()
|
||||
)
|
||||
return response2.hci_subversion << 16 | response2.lmp_subversion
|
||||
|
||||
@classmethod
|
||||
async def driver_info_for_host(cls, host):
|
||||
async def driver_info_for_host(cls, host: Host) -> DriverInfo | None:
|
||||
try:
|
||||
await host.send_command(
|
||||
await host.send_sync_command(
|
||||
hci.HCI_Reset_Command(),
|
||||
check_result=True,
|
||||
response_timeout=cls.POST_RESET_DELAY,
|
||||
)
|
||||
host.ready = True # Needed to let the host know the controller is ready.
|
||||
except asyncio.exceptions.TimeoutError:
|
||||
logger.warning("timeout waiting for hci reset, retrying")
|
||||
await host.send_command(hci.HCI_Reset_Command(), check_result=True)
|
||||
await host.send_sync_command(hci.HCI_Reset_Command())
|
||||
host.ready = True
|
||||
|
||||
command = hci.HCI_Read_Local_Version_Information_Command()
|
||||
response = await host.send_command(command, check_result=True)
|
||||
if response.command_opcode != command.op_code:
|
||||
response = await host.send_sync_command(command, check_status=False)
|
||||
if response.status != hci.HCI_SUCCESS:
|
||||
logger.error("failed to probe local version information")
|
||||
return None
|
||||
|
||||
local_version = response.return_parameters
|
||||
local_version = response
|
||||
|
||||
logger.debug(
|
||||
f"looking for a driver: 0x{local_version.lmp_subversion:04X} "
|
||||
@@ -569,7 +585,7 @@ class Driver(common.Driver):
|
||||
return driver_info
|
||||
|
||||
@classmethod
|
||||
async def for_host(cls, host, force=False):
|
||||
async def for_host(cls, host: Host, force: bool = False):
|
||||
# Check that a driver is needed for this host
|
||||
if not force and not cls.check(host):
|
||||
return None
|
||||
@@ -626,13 +642,13 @@ class Driver(common.Driver):
|
||||
|
||||
async def download_for_rtl8723b(self):
|
||||
if self.driver_info.has_rom_version:
|
||||
response = await self.host.send_command(
|
||||
HCI_RTK_Read_ROM_Version_Command(), check_result=True
|
||||
response1 = await self.host.send_sync_command(
|
||||
HCI_RTK_Read_ROM_Version_Command(), check_status=False
|
||||
)
|
||||
if response.return_parameters.status != hci.HCI_SUCCESS:
|
||||
if response1.status != hci.HCI_SUCCESS:
|
||||
logger.warning("can't get ROM version")
|
||||
return None
|
||||
rom_version = response.return_parameters.version
|
||||
rom_version = response1.version
|
||||
logger.debug(f"ROM version before download: {rom_version:04X}")
|
||||
else:
|
||||
rom_version = 0
|
||||
@@ -667,21 +683,20 @@ class Driver(common.Driver):
|
||||
fragment_offset = fragment_index * RTK_FRAGMENT_LENGTH
|
||||
fragment = payload[fragment_offset : fragment_offset + RTK_FRAGMENT_LENGTH]
|
||||
logger.debug(f"downloading fragment {fragment_index}")
|
||||
await self.host.send_command(
|
||||
HCI_RTK_Download_Command(index=download_index, payload=fragment),
|
||||
check_result=True,
|
||||
await self.host.send_sync_command(
|
||||
HCI_RTK_Download_Command(index=download_index, payload=fragment)
|
||||
)
|
||||
|
||||
logger.debug("download complete!")
|
||||
|
||||
# Read the version again
|
||||
response = await self.host.send_command(
|
||||
HCI_RTK_Read_ROM_Version_Command(), check_result=True
|
||||
response2 = await self.host.send_sync_command(
|
||||
HCI_RTK_Read_ROM_Version_Command(), check_status=False
|
||||
)
|
||||
if response.return_parameters.status != hci.HCI_SUCCESS:
|
||||
if response2.status != hci.HCI_SUCCESS:
|
||||
logger.warning("can't get ROM version")
|
||||
else:
|
||||
rom_version = response.return_parameters.version
|
||||
rom_version = response2.version
|
||||
logger.debug(f"ROM version after download: {rom_version:02X}")
|
||||
|
||||
return firmware.version
|
||||
@@ -703,7 +718,7 @@ class Driver(common.Driver):
|
||||
|
||||
async def init_controller(self):
|
||||
await self.download_firmware()
|
||||
await self.host.send_command(hci.HCI_Reset_Command(), check_result=True)
|
||||
await self.host.send_sync_command(hci.HCI_Reset_Command())
|
||||
logger.info(f"loaded FW image {self.driver_info.fw_name}")
|
||||
|
||||
|
||||
|
||||
+2
-2
@@ -29,7 +29,7 @@ import functools
|
||||
import logging
|
||||
import struct
|
||||
from collections.abc import Iterable, Sequence
|
||||
from typing import ClassVar, TypeVar
|
||||
from typing import TypeVar
|
||||
|
||||
from bumble.att import Attribute, AttributeValue, AttributeValueV2
|
||||
from bumble.colors import color
|
||||
@@ -403,7 +403,7 @@ class TemplateService(Service):
|
||||
to expose their UUID as a class property
|
||||
'''
|
||||
|
||||
UUID: ClassVar[UUID]
|
||||
UUID: UUID
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
||||
@@ -34,14 +34,11 @@ from datetime import datetime
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
ClassVar,
|
||||
Generic,
|
||||
TypeVar,
|
||||
overload,
|
||||
)
|
||||
|
||||
from typing_extensions import Self
|
||||
|
||||
from bumble import att, core, l2cap, utils
|
||||
from bumble.colors import color
|
||||
from bumble.core import UUID, InvalidStateError
|
||||
@@ -252,10 +249,10 @@ class ProfileServiceProxy:
|
||||
Base class for profile-specific service proxies
|
||||
'''
|
||||
|
||||
SERVICE_CLASS: ClassVar[type[TemplateService]]
|
||||
SERVICE_CLASS: type[TemplateService]
|
||||
|
||||
@classmethod
|
||||
def from_client(cls, client: Client) -> Self | None:
|
||||
def from_client(cls, client: Client) -> ProfileServiceProxy | None:
|
||||
return ServiceProxy.from_client(cls, client, cls.SERVICE_CLASS.UUID)
|
||||
|
||||
|
||||
@@ -288,6 +285,8 @@ class Client:
|
||||
self._bearer_id = (
|
||||
f'[0x{bearer.connection.handle:04X}|CID=0x{bearer.source_cid:04X}]'
|
||||
)
|
||||
# Fill the mtu.
|
||||
bearer.on_att_mtu_update(att.ATT_DEFAULT_MTU)
|
||||
self.connection = bearer.connection
|
||||
else:
|
||||
bearer.on(bearer.EVENT_DISCONNECTION, self.on_disconnection)
|
||||
|
||||
+1
-100
@@ -115,6 +115,7 @@ class Server(utils.EventEmitter):
|
||||
channel.connection.handle,
|
||||
channel.source_cid,
|
||||
)
|
||||
channel.att_mtu = att.ATT_DEFAULT_MTU
|
||||
channel.sink = lambda pdu: self.on_gatt_pdu(
|
||||
channel, att.ATT_PDU.from_bytes(pdu)
|
||||
)
|
||||
@@ -776,18 +777,6 @@ class Server(utils.EventEmitter):
|
||||
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
|
||||
)
|
||||
|
||||
if (
|
||||
request.starting_handle == 0x0000
|
||||
or request.starting_handle > request.ending_handle
|
||||
):
|
||||
response = att.ATT_Error_Response(
|
||||
request_opcode_in_error=request.op_code,
|
||||
attribute_handle_in_error=request.starting_handle,
|
||||
error_code=att.ATT_INVALID_HANDLE_ERROR,
|
||||
)
|
||||
self.send_response(bearer, response)
|
||||
return
|
||||
|
||||
attributes: list[tuple[int, bytes]] = []
|
||||
for attribute in (
|
||||
attribute
|
||||
@@ -988,94 +977,6 @@ class Server(utils.EventEmitter):
|
||||
|
||||
self.send_response(bearer, response)
|
||||
|
||||
@utils.AsyncRunner.run_in_task()
|
||||
async def on_att_read_multiple_request(
|
||||
self, bearer: att.Bearer, request: att.ATT_Read_Multiple_Request
|
||||
):
|
||||
'''
|
||||
See Bluetooth spec Vol 3, Part F - 3.4.4.7 Read Multiple Request.
|
||||
'''
|
||||
response: att.ATT_PDU
|
||||
|
||||
pdu_space_available = bearer.att_mtu - 1
|
||||
values: list[bytes] = []
|
||||
|
||||
for handle in request.set_of_handles:
|
||||
if not (attribute := self.get_attribute(handle)):
|
||||
response = att.ATT_Error_Response(
|
||||
request_opcode_in_error=request.op_code,
|
||||
attribute_handle_in_error=handle,
|
||||
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
|
||||
)
|
||||
self.send_response(bearer, response)
|
||||
return
|
||||
# No need to catch permission errors here, since these attributes
|
||||
# must all be world-readable
|
||||
attribute_value = await attribute.read_value(bearer)
|
||||
# Check the attribute value size
|
||||
max_attribute_size = min(bearer.att_mtu - 1, 251)
|
||||
if len(attribute_value) > max_attribute_size:
|
||||
# We need to truncate
|
||||
attribute_value = attribute_value[:max_attribute_size]
|
||||
|
||||
# Check if there is enough space
|
||||
entry_size = len(attribute_value)
|
||||
if pdu_space_available < entry_size:
|
||||
break
|
||||
|
||||
# Add the attribute to the list
|
||||
values.append(attribute_value)
|
||||
pdu_space_available -= entry_size
|
||||
|
||||
response = att.ATT_Read_Multiple_Response(set_of_values=b''.join(values))
|
||||
self.send_response(bearer, response)
|
||||
|
||||
@utils.AsyncRunner.run_in_task()
|
||||
async def on_att_read_multiple_variable_request(
|
||||
self, bearer: att.Bearer, request: att.ATT_Read_Multiple_Variable_Request
|
||||
):
|
||||
'''
|
||||
See Bluetooth spec Vol 3, Part F - 3.4.4.11 Read Multiple Variable Request.
|
||||
'''
|
||||
response: att.ATT_PDU
|
||||
|
||||
pdu_space_available = bearer.att_mtu - 1
|
||||
length_value_tuple_list: list[tuple[int, bytes]] = []
|
||||
|
||||
for handle in request.set_of_handles:
|
||||
if not (attribute := self.get_attribute(handle)):
|
||||
response = att.ATT_Error_Response(
|
||||
request_opcode_in_error=request.op_code,
|
||||
attribute_handle_in_error=handle,
|
||||
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
|
||||
)
|
||||
self.send_response(bearer, response)
|
||||
return
|
||||
# No need to catch permission errors here, since these attributes
|
||||
# must all be world-readable
|
||||
attribute_value = await attribute.read_value(bearer)
|
||||
length = len(attribute_value)
|
||||
# Check the attribute value size
|
||||
max_attribute_size = min(bearer.att_mtu - 3, 251)
|
||||
if len(attribute_value) > max_attribute_size:
|
||||
# We need to truncate
|
||||
attribute_value = attribute_value[:max_attribute_size]
|
||||
|
||||
# Check if there is enough space
|
||||
entry_size = 2 + len(attribute_value)
|
||||
|
||||
# Add the attribute to the list
|
||||
length_value_tuple_list.append((length, attribute_value))
|
||||
pdu_space_available -= entry_size
|
||||
|
||||
if pdu_space_available <= 0:
|
||||
break
|
||||
|
||||
response = att.ATT_Read_Multiple_Variable_Response(
|
||||
length_value_tuple_list=length_value_tuple_list
|
||||
)
|
||||
self.send_response(bearer, response)
|
||||
|
||||
@utils.AsyncRunner.run_in_task()
|
||||
async def on_att_write_request(
|
||||
self, bearer: att.Bearer, request: att.ATT_Write_Request
|
||||
|
||||
+1181
-829
File diff suppressed because it is too large
Load Diff
+2
-2
@@ -312,11 +312,11 @@ class HID(ABC, utils.EventEmitter):
|
||||
|
||||
def send_pdu_on_ctrl(self, msg: bytes) -> None:
|
||||
assert self.l2cap_ctrl_channel
|
||||
self.l2cap_ctrl_channel.write(msg)
|
||||
self.l2cap_ctrl_channel.send_pdu(msg)
|
||||
|
||||
def send_pdu_on_intr(self, msg: bytes) -> None:
|
||||
assert self.l2cap_intr_channel
|
||||
self.l2cap_intr_channel.write(msg)
|
||||
self.l2cap_intr_channel.send_pdu(msg)
|
||||
|
||||
def send_data(self, data: bytes) -> None:
|
||||
if self.role == HID.Role.HOST:
|
||||
|
||||
+179
-124
@@ -23,11 +23,15 @@ import dataclasses
|
||||
import logging
|
||||
import struct
|
||||
from collections.abc import Awaitable, Callable
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
from typing import TYPE_CHECKING, Any, TypeVar, cast, overload
|
||||
|
||||
from bumble import drivers, hci, utils
|
||||
from bumble.colors import color
|
||||
from bumble.core import ConnectionPHY, InvalidStateError, PhysicalTransport
|
||||
from bumble.core import (
|
||||
ConnectionPHY,
|
||||
InvalidStateError,
|
||||
PhysicalTransport,
|
||||
)
|
||||
from bumble.l2cap import L2CAP_PDU
|
||||
from bumble.snoop import Snooper
|
||||
from bumble.transport.common import TransportLostError
|
||||
@@ -35,7 +39,6 @@ from bumble.transport.common import TransportLostError
|
||||
if TYPE_CHECKING:
|
||||
from bumble.transport.common import TransportSink, TransportSource
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Logging
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -236,6 +239,9 @@ class IsoLink:
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
_RP = TypeVar('_RP', bound=hci.HCI_ReturnParameters)
|
||||
|
||||
|
||||
class Host(utils.EventEmitter):
|
||||
connections: dict[int, Connection]
|
||||
cis_links: dict[int, IsoLink]
|
||||
@@ -264,11 +270,13 @@ class Host(utils.EventEmitter):
|
||||
self.bis_links = {} # BIS links, by connection handle
|
||||
self.sco_links = {} # SCO links, by connection handle
|
||||
self.bigs = {} # BIG Handle to BIS Handles
|
||||
self.pending_command = None
|
||||
self.pending_command: hci.HCI_SyncCommand | hci.HCI_AsyncCommand | None = None
|
||||
self.pending_response: asyncio.Future[Any] | None = None
|
||||
self.number_of_supported_advertising_sets = 0
|
||||
self.maximum_advertising_data_length = 31
|
||||
self.local_version = None
|
||||
self.local_version: (
|
||||
hci.HCI_Read_Local_Version_Information_ReturnParameters | None
|
||||
) = None
|
||||
self.local_supported_commands = 0
|
||||
self.local_le_features = 0
|
||||
self.local_lmp_features = hci.LmpFeatureMask(0) # Classic LMP features
|
||||
@@ -312,7 +320,7 @@ class Host(utils.EventEmitter):
|
||||
self.emit('flush')
|
||||
self.command_semaphore.release()
|
||||
|
||||
async def reset(self, driver_factory=drivers.get_driver_for_host):
|
||||
async def reset(self, driver_factory=drivers.get_driver_for_host) -> None:
|
||||
if self.ready:
|
||||
self.ready = False
|
||||
await self.flush()
|
||||
@@ -330,57 +338,53 @@ class Host(utils.EventEmitter):
|
||||
|
||||
# Send a reset command unless a driver has already done so.
|
||||
if reset_needed:
|
||||
await self.send_command(hci.HCI_Reset_Command(), check_result=True)
|
||||
await self.send_sync_command(hci.HCI_Reset_Command())
|
||||
self.ready = True
|
||||
|
||||
response = await self.send_command(
|
||||
hci.HCI_Read_Local_Supported_Commands_Command(), check_result=True
|
||||
response1 = await self.send_sync_command(
|
||||
hci.HCI_Read_Local_Supported_Commands_Command()
|
||||
)
|
||||
self.local_supported_commands = int.from_bytes(
|
||||
response.return_parameters.supported_commands, 'little'
|
||||
response1.supported_commands, 'little'
|
||||
)
|
||||
|
||||
if self.supports_command(hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
|
||||
response = await self.send_command(
|
||||
hci.HCI_LE_Read_Local_Supported_Features_Command(), check_result=True
|
||||
response2 = await self.send_sync_command(
|
||||
hci.HCI_LE_Read_Local_Supported_Features_Command()
|
||||
)
|
||||
self.local_le_features = struct.unpack(
|
||||
'<Q', response.return_parameters.le_features
|
||||
)[0]
|
||||
self.local_le_features = struct.unpack('<Q', response2.le_features)[0]
|
||||
|
||||
if self.supports_command(hci.HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
|
||||
response = await self.send_command(
|
||||
hci.HCI_Read_Local_Version_Information_Command(), check_result=True
|
||||
self.local_version = await self.send_sync_command(
|
||||
hci.HCI_Read_Local_Version_Information_Command()
|
||||
)
|
||||
self.local_version = response.return_parameters
|
||||
|
||||
if self.supports_command(hci.HCI_READ_LOCAL_EXTENDED_FEATURES_COMMAND):
|
||||
max_page_number = 0
|
||||
page_number = 0
|
||||
lmp_features = 0
|
||||
while page_number <= max_page_number:
|
||||
response = await self.send_command(
|
||||
response4 = await self.send_sync_command(
|
||||
hci.HCI_Read_Local_Extended_Features_Command(
|
||||
page_number=page_number
|
||||
),
|
||||
check_result=True,
|
||||
)
|
||||
)
|
||||
lmp_features |= int.from_bytes(
|
||||
response.return_parameters.extended_lmp_features, 'little'
|
||||
response4.extended_lmp_features, 'little'
|
||||
) << (64 * page_number)
|
||||
max_page_number = response.return_parameters.maximum_page_number
|
||||
max_page_number = response4.maximum_page_number
|
||||
page_number += 1
|
||||
self.local_lmp_features = hci.LmpFeatureMask(lmp_features)
|
||||
|
||||
elif self.supports_command(hci.HCI_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
|
||||
response = await self.send_command(
|
||||
hci.HCI_Read_Local_Supported_Features_Command(), check_result=True
|
||||
response5 = await self.send_sync_command(
|
||||
hci.HCI_Read_Local_Supported_Features_Command()
|
||||
)
|
||||
self.local_lmp_features = hci.LmpFeatureMask(
|
||||
int.from_bytes(response.return_parameters.lmp_features, 'little')
|
||||
int.from_bytes(response5.lmp_features, 'little')
|
||||
)
|
||||
|
||||
await self.send_command(
|
||||
await self.send_sync_command(
|
||||
hci.HCI_Set_Event_Mask_Command(
|
||||
event_mask=hci.HCI_Set_Event_Mask_Command.mask(
|
||||
[
|
||||
@@ -437,7 +441,7 @@ class Host(utils.EventEmitter):
|
||||
)
|
||||
)
|
||||
if self.supports_command(hci.HCI_SET_EVENT_MASK_PAGE_2_COMMAND):
|
||||
await self.send_command(
|
||||
await self.send_sync_command(
|
||||
hci.HCI_Set_Event_Mask_Page_2_Command(
|
||||
event_mask_page_2=hci.HCI_Set_Event_Mask_Page_2_Command.mask(
|
||||
[hci.HCI_ENCRYPTION_CHANGE_V2_EVENT]
|
||||
@@ -499,20 +503,14 @@ class Host(utils.EventEmitter):
|
||||
]
|
||||
)
|
||||
|
||||
await self.send_command(
|
||||
await self.send_sync_command(
|
||||
hci.HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
|
||||
)
|
||||
|
||||
if self.supports_command(hci.HCI_READ_BUFFER_SIZE_COMMAND):
|
||||
response = await self.send_command(
|
||||
hci.HCI_Read_Buffer_Size_Command(), check_result=True
|
||||
)
|
||||
hc_acl_data_packet_length = (
|
||||
response.return_parameters.hc_acl_data_packet_length
|
||||
)
|
||||
hc_total_num_acl_data_packets = (
|
||||
response.return_parameters.hc_total_num_acl_data_packets
|
||||
)
|
||||
response6 = await self.send_sync_command(hci.HCI_Read_Buffer_Size_Command())
|
||||
hc_acl_data_packet_length = response6.hc_acl_data_packet_length
|
||||
hc_total_num_acl_data_packets = response6.hc_total_num_acl_data_packets
|
||||
|
||||
logger.debug(
|
||||
'HCI ACL flow control: '
|
||||
@@ -531,19 +529,13 @@ class Host(utils.EventEmitter):
|
||||
iso_data_packet_length = 0
|
||||
total_num_iso_data_packets = 0
|
||||
if self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_V2_COMMAND):
|
||||
response = await self.send_command(
|
||||
hci.HCI_LE_Read_Buffer_Size_V2_Command(), check_result=True
|
||||
)
|
||||
le_acl_data_packet_length = (
|
||||
response.return_parameters.le_acl_data_packet_length
|
||||
)
|
||||
total_num_le_acl_data_packets = (
|
||||
response.return_parameters.total_num_le_acl_data_packets
|
||||
)
|
||||
iso_data_packet_length = response.return_parameters.iso_data_packet_length
|
||||
total_num_iso_data_packets = (
|
||||
response.return_parameters.total_num_iso_data_packets
|
||||
response7 = await self.send_sync_command(
|
||||
hci.HCI_LE_Read_Buffer_Size_V2_Command()
|
||||
)
|
||||
le_acl_data_packet_length = response7.le_acl_data_packet_length
|
||||
total_num_le_acl_data_packets = response7.total_num_le_acl_data_packets
|
||||
iso_data_packet_length = response7.iso_data_packet_length
|
||||
total_num_iso_data_packets = response7.total_num_iso_data_packets
|
||||
|
||||
logger.debug(
|
||||
'HCI LE flow control: '
|
||||
@@ -553,15 +545,11 @@ class Host(utils.EventEmitter):
|
||||
f'total_num_iso_data_packets={total_num_iso_data_packets}'
|
||||
)
|
||||
elif self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_COMMAND):
|
||||
response = await self.send_command(
|
||||
hci.HCI_LE_Read_Buffer_Size_Command(), check_result=True
|
||||
)
|
||||
le_acl_data_packet_length = (
|
||||
response.return_parameters.le_acl_data_packet_length
|
||||
)
|
||||
total_num_le_acl_data_packets = (
|
||||
response.return_parameters.total_num_le_acl_data_packets
|
||||
response8 = await self.send_sync_command(
|
||||
hci.HCI_LE_Read_Buffer_Size_Command()
|
||||
)
|
||||
le_acl_data_packet_length = response8.le_acl_data_packet_length
|
||||
total_num_le_acl_data_packets = response8.total_num_le_acl_data_packets
|
||||
|
||||
logger.debug(
|
||||
'HCI LE ACL flow control: '
|
||||
@@ -592,16 +580,16 @@ class Host(utils.EventEmitter):
|
||||
) and self.supports_command(
|
||||
hci.HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND
|
||||
):
|
||||
response = await self.send_command(
|
||||
response9 = await self.send_sync_command(
|
||||
hci.HCI_LE_Read_Suggested_Default_Data_Length_Command()
|
||||
)
|
||||
suggested_max_tx_octets = response.return_parameters.suggested_max_tx_octets
|
||||
suggested_max_tx_time = response.return_parameters.suggested_max_tx_time
|
||||
suggested_max_tx_octets = response9.suggested_max_tx_octets
|
||||
suggested_max_tx_time = response9.suggested_max_tx_time
|
||||
if (
|
||||
suggested_max_tx_octets != self.suggested_max_tx_octets
|
||||
or suggested_max_tx_time != self.suggested_max_tx_time
|
||||
):
|
||||
await self.send_command(
|
||||
await self.send_sync_command(
|
||||
hci.HCI_LE_Write_Suggested_Default_Data_Length_Command(
|
||||
suggested_max_tx_octets=self.suggested_max_tx_octets,
|
||||
suggested_max_tx_time=self.suggested_max_tx_time,
|
||||
@@ -611,23 +599,21 @@ class Host(utils.EventEmitter):
|
||||
if self.supports_command(
|
||||
hci.HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND
|
||||
):
|
||||
response = await self.send_command(
|
||||
hci.HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command(),
|
||||
check_result=True,
|
||||
response10 = await self.send_sync_command(
|
||||
hci.HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command()
|
||||
)
|
||||
self.number_of_supported_advertising_sets = (
|
||||
response.return_parameters.num_supported_advertising_sets
|
||||
response10.num_supported_advertising_sets
|
||||
)
|
||||
|
||||
if self.supports_command(
|
||||
hci.HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND
|
||||
):
|
||||
response = await self.send_command(
|
||||
hci.HCI_LE_Read_Maximum_Advertising_Data_Length_Command(),
|
||||
check_result=True,
|
||||
response11 = await self.send_sync_command(
|
||||
hci.HCI_LE_Read_Maximum_Advertising_Data_Length_Command()
|
||||
)
|
||||
self.maximum_advertising_data_length = (
|
||||
response.return_parameters.max_advertising_data_length
|
||||
response11.max_advertising_data_length
|
||||
)
|
||||
|
||||
@property
|
||||
@@ -654,9 +640,11 @@ class Host(utils.EventEmitter):
|
||||
if self.hci_sink:
|
||||
self.hci_sink.on_packet(bytes(packet))
|
||||
|
||||
async def send_command(
|
||||
self, command, check_result=False, response_timeout: int | None = None
|
||||
):
|
||||
async def _send_command(
|
||||
self,
|
||||
command: hci.HCI_SyncCommand | hci.HCI_AsyncCommand,
|
||||
response_timeout: float | None = None,
|
||||
) -> hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event:
|
||||
# Wait until we can send (only one pending command at a time)
|
||||
async with self.command_semaphore:
|
||||
assert self.pending_command is None
|
||||
@@ -668,29 +656,9 @@ class Host(utils.EventEmitter):
|
||||
|
||||
try:
|
||||
self.send_hci_packet(command)
|
||||
await asyncio.wait_for(self.pending_response, timeout=response_timeout)
|
||||
response = self.pending_response.result()
|
||||
|
||||
# Check the return parameters if required
|
||||
if check_result:
|
||||
if isinstance(response, hci.HCI_Command_Status_Event):
|
||||
status = response.status # type: ignore[attr-defined]
|
||||
elif isinstance(response.return_parameters, int):
|
||||
status = response.return_parameters
|
||||
elif isinstance(response.return_parameters, bytes):
|
||||
# return parameters first field is a one byte status code
|
||||
status = response.return_parameters[0]
|
||||
else:
|
||||
status = response.return_parameters.status
|
||||
|
||||
if status != hci.HCI_SUCCESS:
|
||||
logger.warning(
|
||||
f'{command.name} failed '
|
||||
f'({hci.HCI_Constant.error_name(status)})'
|
||||
)
|
||||
raise hci.HCI_Error(status)
|
||||
|
||||
return response
|
||||
return await asyncio.wait_for(
|
||||
self.pending_response, timeout=response_timeout
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(color("!!! Exception while sending command:", "red"))
|
||||
raise
|
||||
@@ -698,12 +666,107 @@ class Host(utils.EventEmitter):
|
||||
self.pending_command = None
|
||||
self.pending_response = None
|
||||
|
||||
# Use this method to send a command from a task
|
||||
def send_command_sync(self, command: hci.HCI_Command) -> None:
|
||||
async def send_command(command: hci.HCI_Command) -> None:
|
||||
await self.send_command(command)
|
||||
@overload
|
||||
async def send_command(
|
||||
self,
|
||||
command: hci.HCI_SyncCommand[_RP],
|
||||
check_result: bool = False,
|
||||
response_timeout: float | None = None,
|
||||
) -> hci.HCI_Command_Complete_Event[_RP]: ...
|
||||
|
||||
asyncio.create_task(send_command(command))
|
||||
@overload
|
||||
async def send_command(
|
||||
self,
|
||||
command: hci.HCI_AsyncCommand,
|
||||
check_result: bool = False,
|
||||
response_timeout: float | None = None,
|
||||
) -> hci.HCI_Command_Status_Event: ...
|
||||
|
||||
async def send_command(
|
||||
self,
|
||||
command: hci.HCI_SyncCommand[_RP] | hci.HCI_AsyncCommand,
|
||||
check_result: bool = False,
|
||||
response_timeout: float | None = None,
|
||||
) -> hci.HCI_Command_Complete_Event[_RP] | hci.HCI_Command_Status_Event:
|
||||
response = await self._send_command(command, response_timeout)
|
||||
|
||||
# Check the return parameters if required
|
||||
if check_result:
|
||||
if isinstance(response, hci.HCI_Command_Status_Event):
|
||||
status = response.status # type: ignore[attr-defined]
|
||||
elif isinstance(response.return_parameters, int):
|
||||
status = response.return_parameters
|
||||
elif isinstance(response.return_parameters, bytes):
|
||||
# return parameters first field is a one byte status code
|
||||
status = response.return_parameters[0]
|
||||
elif isinstance(
|
||||
response.return_parameters, hci.HCI_GenericReturnParameters
|
||||
):
|
||||
# FIXME: temporary workaround
|
||||
# NO STATUS
|
||||
status = hci.HCI_SUCCESS
|
||||
else:
|
||||
status = response.return_parameters.status
|
||||
|
||||
if status != hci.HCI_SUCCESS:
|
||||
logger.warning(
|
||||
f'{command.name} failed ' f'({hci.HCI_Constant.error_name(status)})'
|
||||
)
|
||||
raise hci.HCI_Error(status)
|
||||
|
||||
return response
|
||||
|
||||
async def send_sync_command(
|
||||
self,
|
||||
command: hci.HCI_SyncCommand[_RP],
|
||||
check_status: bool = True,
|
||||
response_timeout: float | None = None,
|
||||
) -> _RP:
|
||||
response = await self._send_command(command, response_timeout)
|
||||
|
||||
# Check that the response is of the expected type
|
||||
assert isinstance(response, hci.HCI_Command_Complete_Event)
|
||||
return_parameters: _RP = response.return_parameters
|
||||
assert isinstance(return_parameters, command.return_parameters_class)
|
||||
|
||||
# Check the return parameters if required
|
||||
if check_status:
|
||||
if isinstance(return_parameters, hci.HCI_StatusReturnParameters):
|
||||
status = return_parameters.status
|
||||
if status != hci.HCI_SUCCESS:
|
||||
logger.warning(
|
||||
f'{command.name} failed '
|
||||
f'({hci.HCI_Constant.error_name(status)})'
|
||||
)
|
||||
raise hci.HCI_Error(status)
|
||||
|
||||
return return_parameters
|
||||
|
||||
async def send_async_command(
|
||||
self,
|
||||
command: hci.HCI_AsyncCommand,
|
||||
check_status: bool = True,
|
||||
response_timeout: float | None = None,
|
||||
) -> hci.HCI_ErrorCode:
|
||||
response = await self._send_command(command, response_timeout)
|
||||
|
||||
# Check that the response is of the expected type
|
||||
assert isinstance(response, hci.HCI_Command_Status_Event)
|
||||
|
||||
# Check the return parameters if required
|
||||
status = response.status
|
||||
if check_status:
|
||||
if status != hci.HCI_CommandStatus.PENDING:
|
||||
logger.warning(
|
||||
f'{command.name} failed ' f'({hci.HCI_Constant.error_name(status)})'
|
||||
)
|
||||
raise hci.HCI_Error(status)
|
||||
|
||||
return hci.HCI_ErrorCode(status)
|
||||
|
||||
@utils.deprecated("Use utils.AsyncRunner.spawn() instead.")
|
||||
def send_command_sync(self, command: hci.HCI_AsyncCommand) -> None:
|
||||
utils.AsyncRunner.spawn(self.send_async_command(command))
|
||||
|
||||
def send_acl_sdu(self, connection_handle: int, sdu: bytes) -> None:
|
||||
if not (connection := self.connections.get(connection_handle)):
|
||||
@@ -732,16 +795,6 @@ class Host(utils.EventEmitter):
|
||||
)
|
||||
packet_queue.enqueue(acl_packet, connection_handle)
|
||||
|
||||
def send_sco_sdu(self, connection_handle: int, sdu: bytes) -> None:
|
||||
self.send_hci_packet(
|
||||
hci.HCI_SynchronousDataPacket(
|
||||
connection_handle=connection_handle,
|
||||
packet_status=0,
|
||||
data_total_length=len(sdu),
|
||||
data=sdu,
|
||||
)
|
||||
)
|
||||
|
||||
def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None:
|
||||
self.send_acl_sdu(connection_handle, bytes(L2CAP_PDU(cid, pdu)))
|
||||
|
||||
@@ -1348,15 +1401,17 @@ class Host(utils.EventEmitter):
|
||||
|
||||
# For now, just accept everything
|
||||
# TODO: delegate the decision
|
||||
self.send_command_sync(
|
||||
hci.HCI_LE_Remote_Connection_Parameter_Request_Reply_Command(
|
||||
connection_handle=event.connection_handle,
|
||||
interval_min=event.interval_min,
|
||||
interval_max=event.interval_max,
|
||||
max_latency=event.max_latency,
|
||||
timeout=event.timeout,
|
||||
min_ce_length=0,
|
||||
max_ce_length=0,
|
||||
utils.AsyncRunner.spawn(
|
||||
self.send_sync_command(
|
||||
hci.HCI_LE_Remote_Connection_Parameter_Request_Reply_Command(
|
||||
connection_handle=event.connection_handle,
|
||||
interval_min=event.interval_min,
|
||||
interval_max=event.interval_max,
|
||||
max_latency=event.max_latency,
|
||||
timeout=event.timeout,
|
||||
min_ce_length=0,
|
||||
max_ce_length=0,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
@@ -1392,9 +1447,9 @@ class Host(utils.EventEmitter):
|
||||
connection_handle=event.connection_handle
|
||||
)
|
||||
|
||||
await self.send_command(response)
|
||||
await self.send_sync_command(response)
|
||||
|
||||
asyncio.create_task(send_long_term_key())
|
||||
utils.AsyncRunner.spawn(send_long_term_key())
|
||||
|
||||
def on_hci_synchronous_connection_complete_event(
|
||||
self, event: hci.HCI_Synchronous_Connection_Complete_Event
|
||||
@@ -1593,9 +1648,9 @@ class Host(utils.EventEmitter):
|
||||
bd_addr=event.bd_addr
|
||||
)
|
||||
|
||||
await self.send_command(response)
|
||||
await self.send_sync_command(response)
|
||||
|
||||
asyncio.create_task(send_link_key())
|
||||
utils.AsyncRunner.spawn(send_link_key())
|
||||
|
||||
def on_hci_io_capability_request_event(
|
||||
self, event: hci.HCI_IO_Capability_Request_Event
|
||||
|
||||
+1
-3
@@ -1647,9 +1647,7 @@ class LeCreditBasedChannel(utils.EventEmitter):
|
||||
self.connection_result = None
|
||||
self.disconnection_result = None
|
||||
self.drained = asyncio.Event()
|
||||
# Core Specification Vol 3, Part G, 5.3.1 ATT_MTU
|
||||
# ATT_MTU shall be set to the minimum of the MTU field values of the two devices.
|
||||
self.att_mtu = min(mtu, peer_mtu)
|
||||
self.att_mtu = 0 # Filled by GATT client or server later.
|
||||
|
||||
self.drained.set()
|
||||
|
||||
|
||||
@@ -278,7 +278,7 @@ class L2CAPService(L2CAPServicer):
|
||||
if not l2cap_channel:
|
||||
return SendResponse(error=COMMAND_NOT_UNDERSTOOD)
|
||||
if isinstance(l2cap_channel, ClassicChannel):
|
||||
l2cap_channel.write(request.data)
|
||||
l2cap_channel.send_pdu(request.data)
|
||||
else:
|
||||
l2cap_channel.write(request.data)
|
||||
return SendResponse(success=empty_pb2.Empty())
|
||||
|
||||
+1
-1
@@ -800,7 +800,7 @@ class Multiplexer(utils.EventEmitter):
|
||||
|
||||
def send_frame(self, frame: RFCOMM_Frame) -> None:
|
||||
logger.debug(f'>>> Multiplexer sending {frame}')
|
||||
self.l2cap_channel.write(bytes(frame))
|
||||
self.l2cap_channel.send_pdu(frame)
|
||||
|
||||
def on_pdu(self, pdu: bytes) -> None:
|
||||
frame = RFCOMM_Frame.from_bytes(pdu)
|
||||
|
||||
+2
-2
@@ -847,7 +847,7 @@ class Client:
|
||||
self.pending_request = request
|
||||
|
||||
try:
|
||||
self.channel.write(bytes(request))
|
||||
self.channel.send_pdu(bytes(request))
|
||||
return await self.pending_response
|
||||
finally:
|
||||
self.pending_request = None
|
||||
@@ -1061,7 +1061,7 @@ class Server:
|
||||
|
||||
def send_response(self, response):
|
||||
logger.debug(f'{color(">>> Sending SDP Response", "blue")}: {response}')
|
||||
self.channel.write(response)
|
||||
self.channel.send_pdu(response)
|
||||
|
||||
def match_services(self, search_pattern: DataElement) -> dict[int, Service]:
|
||||
# Find the services for which the attributes in the pattern is a subset of the
|
||||
|
||||
Vendored
+108
-86
@@ -43,44 +43,53 @@ hci.HCI_Command.register_commands(globals())
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@hci.HCI_Command.command
|
||||
@dataclasses.dataclass
|
||||
class HCI_LE_Get_Vendor_Capabilities_Command(hci.HCI_Command):
|
||||
class HCI_LE_Get_Vendor_Capabilities_ReturnParameters(hci.HCI_StatusReturnParameters):
|
||||
max_advt_instances: int = field(metadata=hci.metadata(1), default=0)
|
||||
offloaded_resolution_of_private_address: int = field(
|
||||
metadata=hci.metadata(1), default=0
|
||||
)
|
||||
total_scan_results_storage: int = field(metadata=hci.metadata(2), default=0)
|
||||
max_irk_list_sz: int = field(metadata=hci.metadata(1), default=0)
|
||||
filtering_support: int = field(metadata=hci.metadata(1), default=0)
|
||||
max_filter: int = field(metadata=hci.metadata(1), default=0)
|
||||
activity_energy_info_support: int = field(metadata=hci.metadata(1), default=0)
|
||||
version_supported: int = field(metadata=hci.metadata(2), default=0)
|
||||
total_num_of_advt_tracked: int = field(metadata=hci.metadata(2), default=0)
|
||||
extended_scan_support: int = field(metadata=hci.metadata(1), default=0)
|
||||
debug_logging_supported: int = field(metadata=hci.metadata(1), default=0)
|
||||
le_address_generation_offloading_support: int = field(
|
||||
metadata=hci.metadata(1), default=0
|
||||
)
|
||||
a2dp_source_offload_capability_mask: int = field(
|
||||
metadata=hci.metadata(4), default=0
|
||||
)
|
||||
bluetooth_quality_report_support: int = field(metadata=hci.metadata(1), default=0)
|
||||
dynamic_audio_buffer_support: int = field(metadata=hci.metadata(4), default=0)
|
||||
|
||||
|
||||
@hci.HCI_SyncCommand.sync_command(HCI_LE_Get_Vendor_Capabilities_ReturnParameters)
|
||||
@dataclasses.dataclass
|
||||
class HCI_LE_Get_Vendor_Capabilities_Command(
|
||||
hci.HCI_SyncCommand[HCI_LE_Get_Vendor_Capabilities_ReturnParameters]
|
||||
):
|
||||
# pylint: disable=line-too-long
|
||||
'''
|
||||
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#vendor-specific-capabilities
|
||||
'''
|
||||
|
||||
return_parameters_fields = [
|
||||
('status', hci.STATUS_SPEC),
|
||||
('max_advt_instances', 1),
|
||||
('offloaded_resolution_of_private_address', 1),
|
||||
('total_scan_results_storage', 2),
|
||||
('max_irk_list_sz', 1),
|
||||
('filtering_support', 1),
|
||||
('max_filter', 1),
|
||||
('activity_energy_info_support', 1),
|
||||
('version_supported', 2),
|
||||
('total_num_of_advt_tracked', 2),
|
||||
('extended_scan_support', 1),
|
||||
('debug_logging_supported', 1),
|
||||
('le_address_generation_offloading_support', 1),
|
||||
('a2dp_source_offload_capability_mask', 4),
|
||||
('bluetooth_quality_report_support', 1),
|
||||
('dynamic_audio_buffer_support', 4),
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def parse_return_parameters(cls, parameters):
|
||||
# There are many versions of this data structure, so we need to parse until
|
||||
# there are no more bytes to parse, and leave un-signal parameters set to
|
||||
# None (older versions)
|
||||
nones = {field: None for field, _ in cls.return_parameters_fields}
|
||||
return_parameters = hci.HCI_Object(cls.return_parameters_fields, **nones)
|
||||
# there are no more bytes to parse, and leave un-signaled parameters set to
|
||||
# 0
|
||||
return_parameters = HCI_LE_Get_Vendor_Capabilities_ReturnParameters(
|
||||
hci.HCI_ErrorCode.SUCCESS
|
||||
)
|
||||
|
||||
try:
|
||||
offset = 0
|
||||
for field in cls.return_parameters_fields:
|
||||
for field in cls.return_parameters_class.fields:
|
||||
field_name, field_type = field
|
||||
field_value, field_size = hci.HCI_Object.parse_field(
|
||||
parameters, offset, field_type
|
||||
@@ -94,9 +103,30 @@ class HCI_LE_Get_Vendor_Capabilities_Command(hci.HCI_Command):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@hci.HCI_Command.command
|
||||
# APCF Subcommands
|
||||
class LeApcfOpcode(hci.SpecableEnum):
|
||||
ENABLE = 0x00
|
||||
SET_FILTERING_PARAMETERS = 0x01
|
||||
BROADCASTER_ADDRESS = 0x02
|
||||
SERVICE_UUID = 0x03
|
||||
SERVICE_SOLICITATION_UUID = 0x04
|
||||
LOCAL_NAME = 0x05
|
||||
MANUFACTURER_DATA = 0x06
|
||||
SERVICE_DATA = 0x07
|
||||
TRANSPORT_DISCOVERY_SERVICE = 0x08
|
||||
AD_TYPE_FILTER = 0x09
|
||||
READ_EXTENDED_FEATURES = 0xFF
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class HCI_LE_APCF_Command(hci.HCI_Command):
|
||||
class HCI_LE_APCF_ReturnParameters(hci.HCI_StatusReturnParameters):
|
||||
opcode: int = field(metadata=LeApcfOpcode.type_metadata(1))
|
||||
payload: bytes = field(metadata=hci.metadata('*'))
|
||||
|
||||
|
||||
@hci.HCI_SyncCommand.sync_command(HCI_LE_APCF_ReturnParameters)
|
||||
@dataclasses.dataclass
|
||||
class HCI_LE_APCF_Command(hci.HCI_SyncCommand[HCI_LE_APCF_ReturnParameters]):
|
||||
# pylint: disable=line-too-long
|
||||
'''
|
||||
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#le_apcf_command
|
||||
@@ -105,52 +135,52 @@ class HCI_LE_APCF_Command(hci.HCI_Command):
|
||||
implementation. A future enhancement may define subcommand-specific data structures.
|
||||
'''
|
||||
|
||||
# APCF Subcommands
|
||||
class Opcode(hci.SpecableEnum):
|
||||
ENABLE = 0x00
|
||||
SET_FILTERING_PARAMETERS = 0x01
|
||||
BROADCASTER_ADDRESS = 0x02
|
||||
SERVICE_UUID = 0x03
|
||||
SERVICE_SOLICITATION_UUID = 0x04
|
||||
LOCAL_NAME = 0x05
|
||||
MANUFACTURER_DATA = 0x06
|
||||
SERVICE_DATA = 0x07
|
||||
TRANSPORT_DISCOVERY_SERVICE = 0x08
|
||||
AD_TYPE_FILTER = 0x09
|
||||
READ_EXTENDED_FEATURES = 0xFF
|
||||
|
||||
opcode: int = dataclasses.field(metadata=Opcode.type_metadata(1))
|
||||
opcode: int = dataclasses.field(metadata=LeApcfOpcode.type_metadata(1))
|
||||
payload: bytes = dataclasses.field(metadata=hci.metadata("*"))
|
||||
|
||||
return_parameters_fields = [
|
||||
('status', hci.STATUS_SPEC),
|
||||
('opcode', Opcode.type_spec(1)),
|
||||
('payload', '*'),
|
||||
]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@hci.HCI_Command.command
|
||||
@dataclasses.dataclass
|
||||
class HCI_Get_Controller_Activity_Energy_Info_Command(hci.HCI_Command):
|
||||
class HCI_Get_Controller_Activity_Energy_Info_ReturnParameters(
|
||||
hci.HCI_StatusReturnParameters
|
||||
):
|
||||
total_tx_time_ms: int = field(metadata=hci.metadata(4))
|
||||
total_rx_time_ms: int = field(metadata=hci.metadata(4))
|
||||
total_idle_time_ms: int = field(metadata=hci.metadata(4))
|
||||
total_energy_used: int = field(metadata=hci.metadata(4))
|
||||
|
||||
|
||||
@hci.HCI_SyncCommand.sync_command(
|
||||
HCI_Get_Controller_Activity_Energy_Info_ReturnParameters
|
||||
)
|
||||
@dataclasses.dataclass
|
||||
class HCI_Get_Controller_Activity_Energy_Info_Command(
|
||||
hci.HCI_SyncCommand[HCI_Get_Controller_Activity_Energy_Info_ReturnParameters]
|
||||
):
|
||||
# pylint: disable=line-too-long
|
||||
'''
|
||||
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#le_get_controller_activity_energy_info
|
||||
'''
|
||||
|
||||
return_parameters_fields = [
|
||||
('status', hci.STATUS_SPEC),
|
||||
('total_tx_time_ms', 4),
|
||||
('total_rx_time_ms', 4),
|
||||
('total_idle_time_ms', 4),
|
||||
('total_energy_used', 4),
|
||||
]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@hci.HCI_Command.command
|
||||
# A2DP Hardware Offload Subcommands
|
||||
class A2dpHardwareOffloadOpcode(hci.SpecableEnum):
|
||||
START_A2DP_OFFLOAD = 0x01
|
||||
STOP_A2DP_OFFLOAD = 0x02
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class HCI_A2DP_Hardware_Offload_Command(hci.HCI_Command):
|
||||
class HCI_A2DP_Hardware_Offload_ReturnParameters(hci.HCI_StatusReturnParameters):
|
||||
opcode: int = dataclasses.field(metadata=A2dpHardwareOffloadOpcode.type_metadata(1))
|
||||
payload: bytes = dataclasses.field(metadata=hci.metadata("*"))
|
||||
|
||||
|
||||
@hci.HCI_SyncCommand.sync_command(HCI_A2DP_Hardware_Offload_ReturnParameters)
|
||||
@dataclasses.dataclass
|
||||
class HCI_A2DP_Hardware_Offload_Command(
|
||||
hci.HCI_SyncCommand[HCI_A2DP_Hardware_Offload_ReturnParameters]
|
||||
):
|
||||
# pylint: disable=line-too-long
|
||||
'''
|
||||
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#a2dp-hardware-offload-support
|
||||
@@ -159,25 +189,27 @@ class HCI_A2DP_Hardware_Offload_Command(hci.HCI_Command):
|
||||
implementation. A future enhancement may define subcommand-specific data structures.
|
||||
'''
|
||||
|
||||
# A2DP Hardware Offload Subcommands
|
||||
class Opcode(hci.SpecableEnum):
|
||||
START_A2DP_OFFLOAD = 0x01
|
||||
STOP_A2DP_OFFLOAD = 0x02
|
||||
|
||||
opcode: int = dataclasses.field(metadata=Opcode.type_metadata(1))
|
||||
opcode: int = dataclasses.field(metadata=A2dpHardwareOffloadOpcode.type_metadata(1))
|
||||
payload: bytes = dataclasses.field(metadata=hci.metadata("*"))
|
||||
|
||||
return_parameters_fields = [
|
||||
('status', hci.STATUS_SPEC),
|
||||
('opcode', Opcode.type_spec(1)),
|
||||
('payload', '*'),
|
||||
]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@hci.HCI_Command.command
|
||||
# Dynamic Audio Buffer Subcommands
|
||||
class DynamicAudioBufferOpcode(hci.SpecableEnum):
|
||||
GET_AUDIO_BUFFER_TIME_CAPABILITY = 0x01
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class HCI_Dynamic_Audio_Buffer_Command(hci.HCI_Command):
|
||||
class HCI_Dynamic_Audio_Buffer_ReturnParameters(hci.HCI_StatusReturnParameters):
|
||||
opcode: int = dataclasses.field(metadata=DynamicAudioBufferOpcode.type_metadata(1))
|
||||
payload: bytes = dataclasses.field(metadata=hci.metadata("*"))
|
||||
|
||||
|
||||
@hci.HCI_SyncCommand.sync_command(HCI_Dynamic_Audio_Buffer_ReturnParameters)
|
||||
@dataclasses.dataclass
|
||||
class HCI_Dynamic_Audio_Buffer_Command(
|
||||
hci.HCI_SyncCommand[HCI_Dynamic_Audio_Buffer_ReturnParameters]
|
||||
):
|
||||
# pylint: disable=line-too-long
|
||||
'''
|
||||
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#dynamic-audio-buffer-command
|
||||
@@ -186,19 +218,9 @@ class HCI_Dynamic_Audio_Buffer_Command(hci.HCI_Command):
|
||||
implementation. A future enhancement may define subcommand-specific data structures.
|
||||
'''
|
||||
|
||||
# Dynamic Audio Buffer Subcommands
|
||||
class Opcode(hci.SpecableEnum):
|
||||
GET_AUDIO_BUFFER_TIME_CAPABILITY = 0x01
|
||||
|
||||
opcode: int = dataclasses.field(metadata=Opcode.type_metadata(1))
|
||||
opcode: int = dataclasses.field(metadata=DynamicAudioBufferOpcode.type_metadata(1))
|
||||
payload: bytes = dataclasses.field(metadata=hci.metadata("*"))
|
||||
|
||||
return_parameters_fields = [
|
||||
('status', hci.STATUS_SPEC),
|
||||
('opcode', Opcode.type_spec(1)),
|
||||
('payload', '*'),
|
||||
]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class HCI_Android_Vendor_Event(hci.HCI_Extended_Event):
|
||||
|
||||
Vendored
+24
-18
@@ -46,9 +46,19 @@ class TX_Power_Level_Command:
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@hci.HCI_Command.command
|
||||
@dataclasses.dataclass
|
||||
class HCI_Write_Tx_Power_Level_Command(hci.HCI_Command, TX_Power_Level_Command):
|
||||
class HCI_Write_Tx_Power_Level_ReturnParameters(hci.HCI_StatusReturnParameters):
|
||||
handle_type: int = hci.field(metadata=hci.metadata(1))
|
||||
connection_handle: int = hci.field(metadata=hci.metadata(2))
|
||||
selected_tx_power_level: int = hci.field(metadata=hci.metadata(-1))
|
||||
|
||||
|
||||
@hci.HCI_SyncCommand.sync_command(HCI_Write_Tx_Power_Level_ReturnParameters)
|
||||
@dataclasses.dataclass
|
||||
class HCI_Write_Tx_Power_Level_Command(
|
||||
hci.HCI_SyncCommand[HCI_Write_Tx_Power_Level_ReturnParameters],
|
||||
TX_Power_Level_Command,
|
||||
):
|
||||
'''
|
||||
Write TX power level. See BT_HCI_OP_VS_WRITE_TX_POWER_LEVEL in
|
||||
https://github.com/zephyrproject-rtos/zephyr/blob/main/include/zephyr/bluetooth/hci_vs.h
|
||||
@@ -61,18 +71,21 @@ class HCI_Write_Tx_Power_Level_Command(hci.HCI_Command, TX_Power_Level_Command):
|
||||
connection_handle: int = dataclasses.field(metadata=hci.metadata(2))
|
||||
tx_power_level: int = dataclasses.field(metadata=hci.metadata(-1))
|
||||
|
||||
return_parameters_fields = [
|
||||
('status', hci.STATUS_SPEC),
|
||||
('handle_type', 1),
|
||||
('connection_handle', 2),
|
||||
('selected_tx_power_level', -1),
|
||||
]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@hci.HCI_Command.command
|
||||
@dataclasses.dataclass
|
||||
class HCI_Read_Tx_Power_Level_Command(hci.HCI_Command, TX_Power_Level_Command):
|
||||
class HCI_Read_Tx_Power_Level_ReturnParameters(hci.HCI_StatusReturnParameters):
|
||||
handle_type: int = hci.field(metadata=hci.metadata(1))
|
||||
connection_handle: int = hci.field(metadata=hci.metadata(2))
|
||||
tx_power_level: int = hci.field(metadata=hci.metadata(-1))
|
||||
|
||||
|
||||
@hci.HCI_SyncCommand.sync_command(HCI_Read_Tx_Power_Level_ReturnParameters)
|
||||
@dataclasses.dataclass
|
||||
class HCI_Read_Tx_Power_Level_Command(
|
||||
hci.HCI_SyncCommand[HCI_Read_Tx_Power_Level_ReturnParameters],
|
||||
TX_Power_Level_Command,
|
||||
):
|
||||
'''
|
||||
Read TX power level. See BT_HCI_OP_VS_READ_TX_POWER_LEVEL in
|
||||
https://github.com/zephyrproject-rtos/zephyr/blob/main/include/zephyr/bluetooth/hci_vs.h
|
||||
@@ -83,10 +96,3 @@ class HCI_Read_Tx_Power_Level_Command(hci.HCI_Command, TX_Power_Level_Command):
|
||||
|
||||
handle_type: int = dataclasses.field(metadata=hci.metadata(1))
|
||||
connection_handle: int = dataclasses.field(metadata=hci.metadata(2))
|
||||
|
||||
return_parameters_fields = [
|
||||
('status', hci.STATUS_SPEC),
|
||||
('handle_type', 1),
|
||||
('connection_handle', 2),
|
||||
('tx_power_level', -1),
|
||||
]
|
||||
|
||||
@@ -37,15 +37,16 @@ The vendor specific HCI commands to read and write TX power are defined in
|
||||
from bumble.vendor.zephyr.hci import HCI_Write_Tx_Power_Level_Command
|
||||
|
||||
# set advertising power to -4 dB
|
||||
response = await host.send_command(
|
||||
response = await host.send_sync_command(
|
||||
HCI_Write_Tx_Power_Level_Command(
|
||||
handle_type=HCI_Write_Tx_Power_Level_Command.TX_POWER_HANDLE_TYPE_ADV,
|
||||
connection_handle=0,
|
||||
tx_power_level=-4,
|
||||
)
|
||||
),
|
||||
check_status=False
|
||||
)
|
||||
|
||||
if response.return_parameters.status == HCI_SUCCESS:
|
||||
print(f"TX power set to {response.return_parameters.selected_tx_power_level}")
|
||||
if response.status == HCI_SUCCESS:
|
||||
print(f"TX power set to {response.selected_tx_power_level}")
|
||||
|
||||
```
|
||||
|
||||
@@ -100,9 +100,13 @@ def on_sco_packet(packet: hci.HCI_SynchronousDataPacket):
|
||||
if source_file and (pcm_data := source_file.read(packet.data_total_length)):
|
||||
assert ag_protocol
|
||||
host = ag_protocol.dlc.multiplexer.l2cap_channel.connection.device.host
|
||||
host.send_sco_sdu(
|
||||
connection_handle=packet.connection_handle,
|
||||
sdu=pcm_data,
|
||||
host.send_hci_packet(
|
||||
hci.HCI_SynchronousDataPacket(
|
||||
connection_handle=packet.connection_handle,
|
||||
packet_status=0,
|
||||
data_total_length=len(pcm_data),
|
||||
data=pcm_data,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -42,7 +42,6 @@ from bumble.hci import (
|
||||
HCI_CREATE_CONNECTION_COMMAND,
|
||||
HCI_SUCCESS,
|
||||
Address,
|
||||
HCI_Command_Complete_Event,
|
||||
HCI_Command_Status_Event,
|
||||
HCI_Connection_Complete_Event,
|
||||
HCI_Connection_Request_Event,
|
||||
@@ -154,10 +153,10 @@ async def test_device_connect_parallel():
|
||||
assert packet.name == 'HCI_ACCEPT_CONNECTION_REQUEST_COMMAND'
|
||||
|
||||
d1.host.on_hci_packet(
|
||||
HCI_Command_Complete_Event(
|
||||
HCI_Command_Status_Event(
|
||||
status=HCI_COMMAND_STATUS_PENDING,
|
||||
num_hci_command_packets=1,
|
||||
command_opcode=HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
|
||||
return_parameters=b"\x00",
|
||||
)
|
||||
)
|
||||
|
||||
@@ -188,10 +187,10 @@ async def test_device_connect_parallel():
|
||||
assert packet.name == 'HCI_ACCEPT_CONNECTION_REQUEST_COMMAND'
|
||||
|
||||
d2.host.on_hci_packet(
|
||||
HCI_Command_Complete_Event(
|
||||
HCI_Command_Status_Event(
|
||||
status=HCI_COMMAND_STATUS_PENDING,
|
||||
num_hci_command_packets=1,
|
||||
command_opcode=HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
|
||||
return_parameters=b"\x00",
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
+1
-99
@@ -28,7 +28,7 @@ from unittest.mock import ANY, AsyncMock, Mock
|
||||
import pytest
|
||||
from typing_extensions import Self
|
||||
|
||||
from bumble import att, gatt_client, l2cap
|
||||
from bumble import gatt_client, l2cap
|
||||
from bumble.att import (
|
||||
ATT_ATTRIBUTE_NOT_FOUND_ERROR,
|
||||
ATT_PDU,
|
||||
@@ -1638,104 +1638,6 @@ async def test_eatt_connection_failure():
|
||||
await gatt_client.Client.connect_eatt(devices.connections[0])
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_read_multiple() -> None:
|
||||
devices = await TwoDevices.create_with_connection()
|
||||
|
||||
characteristic1 = Characteristic(
|
||||
'0001', Characteristic.Properties.READ, Characteristic.READABLE, b'1234'
|
||||
)
|
||||
|
||||
characteristic2 = Characteristic(
|
||||
'0002',
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
b'5678',
|
||||
)
|
||||
|
||||
service = Service('0000', [characteristic1, characteristic2])
|
||||
devices[1].add_service(service)
|
||||
|
||||
client = devices.connections[0].gatt_client
|
||||
server = devices[1].gatt_server
|
||||
|
||||
await client.discover_services()
|
||||
characteristics = await client.discover_characteristics(
|
||||
[characteristic1.uuid, characteristic2.uuid], None
|
||||
)
|
||||
response = await client.send_request(
|
||||
att.ATT_Read_Multiple_Request(
|
||||
set_of_handles=[c.handle for c in characteristics]
|
||||
)
|
||||
)
|
||||
assert isinstance(response, att.ATT_Read_Multiple_Response)
|
||||
assert response.set_of_values == b'12345678'
|
||||
|
||||
response = await client.send_request(
|
||||
att.ATT_Read_Multiple_Request(
|
||||
set_of_handles=[
|
||||
next(
|
||||
handle
|
||||
for handle in range(0x0001, 0xFFFF)
|
||||
if not server.get_attribute(handle)
|
||||
)
|
||||
]
|
||||
)
|
||||
)
|
||||
assert isinstance(response, att.ATT_Error_Response)
|
||||
assert response.error_code == att.ATT_ATTRIBUTE_NOT_FOUND_ERROR
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_read_multiple_variable() -> None:
|
||||
devices = await TwoDevices.create_with_connection()
|
||||
|
||||
characteristic1 = Characteristic(
|
||||
'0001', Characteristic.Properties.READ, Characteristic.READABLE, b'1234'
|
||||
)
|
||||
|
||||
characteristic2 = Characteristic(
|
||||
'0002',
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
b'99',
|
||||
)
|
||||
|
||||
service = Service('0000', [characteristic1, characteristic2])
|
||||
devices[1].add_service(service)
|
||||
|
||||
client = devices.connections[0].gatt_client
|
||||
server = devices[1].gatt_server
|
||||
|
||||
await client.discover_services()
|
||||
characteristics = await client.discover_characteristics(
|
||||
[characteristic1.uuid, characteristic2.uuid], None
|
||||
)
|
||||
response = await client.send_request(
|
||||
att.ATT_Read_Multiple_Variable_Request(
|
||||
set_of_handles=[c.handle for c in characteristics]
|
||||
)
|
||||
)
|
||||
assert isinstance(response, att.ATT_Read_Multiple_Variable_Response)
|
||||
assert response.length_value_tuple_list == [(4, b'1234'), (2, b'99')]
|
||||
|
||||
response = await client.send_request(
|
||||
att.ATT_Read_Multiple_Variable_Request(
|
||||
set_of_handles=[
|
||||
next(
|
||||
handle
|
||||
for handle in range(0x0001, 0xFFFF)
|
||||
if not server.get_attribute(handle)
|
||||
)
|
||||
]
|
||||
)
|
||||
)
|
||||
assert isinstance(response, att.ATT_Error_Response)
|
||||
assert response.error_code == att.ATT_ATTRIBUTE_NOT_FOUND_ERROR
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
|
||||
|
||||
+48
-45
@@ -20,7 +20,7 @@ import struct
|
||||
|
||||
import pytest
|
||||
|
||||
from bumble import hci
|
||||
from bumble import hci, utils
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# pylint: disable=invalid-name
|
||||
@@ -136,43 +136,25 @@ def test_HCI_LE_Channel_Selection_Algorithm_Event():
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_HCI_Command_Complete_Event():
|
||||
# With a serializable object
|
||||
event = hci.HCI_Command_Complete_Event(
|
||||
event1 = hci.HCI_Command_Complete_Event(
|
||||
num_hci_command_packets=34,
|
||||
command_opcode=hci.HCI_LE_READ_BUFFER_SIZE_COMMAND,
|
||||
return_parameters=hci.HCI_LE_Read_Buffer_Size_Command.create_return_parameters(
|
||||
return_parameters=hci.HCI_LE_Read_Buffer_Size_Command.return_parameters_class(
|
||||
status=0,
|
||||
le_acl_data_packet_length=1234,
|
||||
total_num_le_acl_data_packets=56,
|
||||
),
|
||||
)
|
||||
basic_check(event)
|
||||
|
||||
# With an arbitrary byte array
|
||||
event = hci.HCI_Command_Complete_Event(
|
||||
num_hci_command_packets=1,
|
||||
command_opcode=hci.HCI_RESET_COMMAND,
|
||||
return_parameters=bytes([1, 2, 3, 4]),
|
||||
)
|
||||
basic_check(event)
|
||||
|
||||
# With a simple status as a 1-byte array
|
||||
event = hci.HCI_Command_Complete_Event(
|
||||
num_hci_command_packets=1,
|
||||
command_opcode=hci.HCI_RESET_COMMAND,
|
||||
return_parameters=bytes([7]),
|
||||
)
|
||||
basic_check(event)
|
||||
event = hci.HCI_Packet.from_bytes(bytes(event))
|
||||
assert event.return_parameters == 7
|
||||
basic_check(event1)
|
||||
|
||||
# With a simple status as an integer status
|
||||
event = hci.HCI_Command_Complete_Event(
|
||||
event3 = hci.HCI_Command_Complete_Event(
|
||||
num_hci_command_packets=1,
|
||||
command_opcode=hci.HCI_RESET_COMMAND,
|
||||
return_parameters=9,
|
||||
return_parameters=hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode(9)),
|
||||
)
|
||||
basic_check(event)
|
||||
assert event.return_parameters == 9
|
||||
basic_check(event3)
|
||||
assert event3.return_parameters.status == 9
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -229,6 +211,28 @@ def test_HCI_Vendor_Event():
|
||||
assert isinstance(parsed, hci.HCI_Vendor_Event)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_return_parameters() -> None:
|
||||
params = hci.HCI_Reset_Command.parse_return_parameters(bytes.fromhex('3C'))
|
||||
assert params.status == hci.HCI_ErrorCode.ADVERTISING_TIMEOUT_ERROR
|
||||
assert isinstance(params.status, utils.OpenIntEnum)
|
||||
|
||||
params = hci.HCI_Read_BD_ADDR_Command.parse_return_parameters(
|
||||
bytes.fromhex('3C001122334455')
|
||||
)
|
||||
assert params.status == hci.HCI_ErrorCode.ADVERTISING_TIMEOUT_ERROR
|
||||
assert isinstance(params.status, utils.OpenIntEnum)
|
||||
assert isinstance(params.bd_addr, hci.Address)
|
||||
|
||||
params = hci.HCI_Read_Local_Name_Command.parse_return_parameters(
|
||||
bytes.fromhex('0068656c6c6f') + bytes(248 - 5)
|
||||
)
|
||||
assert params.status == hci.HCI_ErrorCode.SUCCESS
|
||||
assert isinstance(params.local_name, bytes)
|
||||
assert len(params.local_name) == 248
|
||||
assert hci.map_null_terminated_utf8_string(params.local_name) == 'hello'
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_HCI_Command():
|
||||
command = hci.HCI_Command(op_code=0x5566)
|
||||
@@ -291,7 +295,7 @@ def test_custom_le_meta_event():
|
||||
for clazz in inspect.getmembers(hci)
|
||||
if isinstance(clazz[1], type)
|
||||
and issubclass(clazz[1], hci.HCI_Command)
|
||||
and clazz[1] is not hci.HCI_Command
|
||||
and clazz[1] not in (hci.HCI_Command, hci.HCI_SyncCommand, hci.HCI_AsyncCommand)
|
||||
],
|
||||
)
|
||||
def test_hci_command_subclasses_op_code(clazz: type[hci.HCI_Command]):
|
||||
@@ -620,21 +624,19 @@ def test_HCI_Read_Local_Supported_Codecs_Command_Complete():
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_HCI_Read_Local_Supported_Codecs_V2_Command_Complete():
|
||||
returned_parameters = (
|
||||
hci.HCI_Read_Local_Supported_Codecs_V2_Command.parse_return_parameters(
|
||||
bytes(
|
||||
[
|
||||
hci.HCI_SUCCESS,
|
||||
3,
|
||||
hci.CodecID.A_LOG,
|
||||
hci.HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_ACL,
|
||||
hci.CodecID.CVSD,
|
||||
hci.HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_SCO,
|
||||
hci.CodecID.LINEAR_PCM,
|
||||
hci.HCI_Read_Local_Supported_Codecs_V2_Command.Transport.LE_CIS,
|
||||
0,
|
||||
]
|
||||
)
|
||||
returned_parameters = hci.HCI_Read_Local_Supported_Codecs_V2_Command.parse_return_parameters(
|
||||
bytes(
|
||||
[
|
||||
hci.HCI_SUCCESS,
|
||||
3,
|
||||
hci.CodecID.A_LOG,
|
||||
hci.HCI_Read_Local_Supported_Codecs_V2_ReturnParameters.Transport.BR_EDR_ACL,
|
||||
hci.CodecID.CVSD,
|
||||
hci.HCI_Read_Local_Supported_Codecs_V2_ReturnParameters.Transport.BR_EDR_SCO,
|
||||
hci.CodecID.LINEAR_PCM,
|
||||
hci.HCI_Read_Local_Supported_Codecs_V2_ReturnParameters.Transport.LE_CIS,
|
||||
0,
|
||||
]
|
||||
)
|
||||
)
|
||||
assert returned_parameters.standard_codec_ids == [
|
||||
@@ -643,9 +645,9 @@ def test_HCI_Read_Local_Supported_Codecs_V2_Command_Complete():
|
||||
hci.CodecID.LINEAR_PCM,
|
||||
]
|
||||
assert returned_parameters.standard_codec_transports == [
|
||||
hci.HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_ACL,
|
||||
hci.HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_SCO,
|
||||
hci.HCI_Read_Local_Supported_Codecs_V2_Command.Transport.LE_CIS,
|
||||
hci.HCI_Read_Local_Supported_Codecs_V2_ReturnParameters.Transport.BR_EDR_ACL,
|
||||
hci.HCI_Read_Local_Supported_Codecs_V2_ReturnParameters.Transport.BR_EDR_SCO,
|
||||
hci.HCI_Read_Local_Supported_Codecs_V2_ReturnParameters.Transport.LE_CIS,
|
||||
]
|
||||
|
||||
|
||||
@@ -737,6 +739,7 @@ def run_test_commands():
|
||||
if __name__ == '__main__':
|
||||
run_test_events()
|
||||
run_test_commands()
|
||||
test_return_parameters()
|
||||
test_address()
|
||||
test_custom()
|
||||
test_iso_data_packet()
|
||||
|
||||
+66
-2
@@ -15,6 +15,7 @@
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import asyncio
|
||||
import logging
|
||||
import unittest
|
||||
import unittest.mock
|
||||
@@ -22,9 +23,17 @@ import unittest.mock
|
||||
import pytest
|
||||
|
||||
from bumble.controller import Controller
|
||||
from bumble.hci import HCI_AclDataPacket
|
||||
from bumble.hci import (
|
||||
HCI_AclDataPacket,
|
||||
HCI_Command_Complete_Event,
|
||||
HCI_Error,
|
||||
HCI_ErrorCode,
|
||||
HCI_Event,
|
||||
HCI_Reset_Command,
|
||||
HCI_StatusReturnParameters,
|
||||
)
|
||||
from bumble.host import DataPacketQueue, Host
|
||||
from bumble.transport.common import AsyncPipeSink
|
||||
from bumble.transport.common import AsyncPipeSink, TransportSink
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Logging
|
||||
@@ -151,3 +160,58 @@ def test_data_packet_queue():
|
||||
assert drain_listener.on_flow.call_count == 1
|
||||
assert queue.queued == 15
|
||||
assert queue.completed == 15
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class Source:
|
||||
terminated: asyncio.Future[None]
|
||||
sink: TransportSink
|
||||
|
||||
def set_packet_sink(self, sink: TransportSink) -> None:
|
||||
self.sink = sink
|
||||
|
||||
|
||||
class Sink:
|
||||
response: HCI_Event
|
||||
|
||||
def __init__(self, source: Source, response: HCI_Event) -> None:
|
||||
self.source = source
|
||||
self.response = response
|
||||
|
||||
def on_packet(self, packet: bytes) -> None:
|
||||
self.source.sink.on_packet(bytes(self.response))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_sync_command() -> None:
|
||||
source = Source()
|
||||
sink = Sink(
|
||||
source,
|
||||
HCI_Command_Complete_Event(
|
||||
1,
|
||||
HCI_Reset_Command.op_code,
|
||||
HCI_StatusReturnParameters(status=HCI_ErrorCode.SUCCESS),
|
||||
),
|
||||
)
|
||||
|
||||
host = Host(source, sink)
|
||||
|
||||
# Sync command with success
|
||||
response1 = await host.send_sync_command(HCI_Reset_Command())
|
||||
assert response1.status == HCI_ErrorCode.SUCCESS
|
||||
|
||||
# Sync command with error status should raise
|
||||
error_response = HCI_Command_Complete_Event(
|
||||
1,
|
||||
HCI_Reset_Command.op_code,
|
||||
HCI_StatusReturnParameters(status=HCI_ErrorCode.COMMAND_DISALLOWED_ERROR),
|
||||
)
|
||||
sink.response = error_response
|
||||
with pytest.raises(HCI_Error) as excinfo:
|
||||
await host.send_sync_command(HCI_Reset_Command())
|
||||
|
||||
assert excinfo.value.error_code == error_response.return_parameters.status
|
||||
|
||||
# Sync command with error status should not raise when `check_status` is False
|
||||
response2 = await host.send_sync_command(HCI_Reset_Command(), check_status=False)
|
||||
assert response2.status == HCI_ErrorCode.COMMAND_DISALLOWED_ERROR
|
||||
|
||||
@@ -89,7 +89,7 @@ class HeartRateMonitor:
|
||||
|
||||
async def stop(self):
|
||||
# TODO: replace this once a proper reset is implemented in the lib.
|
||||
await self.device.host.send_command(HCI_Reset_Command())
|
||||
await self.device.host.send_sync_command(HCI_Reset_Command())
|
||||
await self.device.power_off()
|
||||
print('### Monitor stopped')
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ class Scanner(utils.EventEmitter):
|
||||
|
||||
async def stop(self):
|
||||
# TODO: replace this once a proper reset is implemented in the lib.
|
||||
await self.device.host.send_command(HCI_Reset_Command())
|
||||
await self.device.host.send_sync_command(HCI_Reset_Command())
|
||||
await self.device.power_off()
|
||||
print('### Scanner stopped')
|
||||
|
||||
|
||||
@@ -311,7 +311,7 @@ class Speaker:
|
||||
|
||||
async def stop(self):
|
||||
# TODO: replace this once a proper reset is implemented in the lib.
|
||||
await self.device.host.send_command(HCI_Reset_Command())
|
||||
await self.device.host.send_sync_command(HCI_Reset_Command())
|
||||
await self.device.power_off()
|
||||
print('Speaker stopped')
|
||||
|
||||
|
||||
Reference in New Issue
Block a user