Compare commits

..

46 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 81985c47a9 remove superfluous statement 2026-02-02 11:12:28 -08:00
Gilles Boccon-Gibod 2a59e19283 fix comment 2026-01-29 19:09:46 -08:00
Gilles Boccon-Gibod f44d013690 make bridge more robust 2026-01-27 09:47:52 -08:00
Gilles Boccon-Gibod e63dc15ede fix handling of return parameters 2026-01-27 09:39:22 -08:00
Gilles Boccon-Gibod c901e15666 fix a few HCI types and make the bridge more robust 2026-01-25 13:47:14 -08:00
Gilles Boccon-Gibod 022323b19c Merge pull request #871 from google/gbg/sci
add basic support for SCI
2026-01-24 10:39:11 -08:00
Gilles Boccon-Gibod a0d24e95e7 fix spacing_type 2026-01-24 10:15:32 -08:00
Josh Wu 7efbd303e0 Merge pull request #876 from ttdennis/await_termination_fix
Update apps and examples to await .terminated instead of wait_for_termination()
2026-01-24 11:44:19 +08:00
Dennis Heinze 49530d8d6d Update apps and examples to await .terminated 2026-01-24 00:20:55 +01:00
Josh Wu 3f9ef5aac2 Merge pull request #873 from zxzxwu/l2cap
L2CAP: Fix wrong CID on reject
2026-01-23 12:44:59 +08:00
Josh Wu e488ea9783 Merge pull request #872 from zxzxwu/avrcp
AVRCP: Fix wrong field specs
2026-01-23 12:36:14 +08:00
Josh Wu 21d937c2f1 Merge pull request #865 from willnix/pcapsnoop
Added a PcapSnooper class
2026-01-23 12:33:15 +08:00
Frieder Steinmetz a8396e6cce Formatted with black again. 2026-01-22 17:49:58 +01:00
Josh Wu 7e1b1c8f78 L2CAP: Fix wrong CID on reject 2026-01-22 23:16:25 +08:00
Josh Wu 55719bf6de AVRCP: Fix wrong field specs 2026-01-22 22:18:58 +08:00
Frieder Steinmetz 5059920696 Please mypy.\n\nTwo calls to open(), some more annotations and a rescoped global were needed. 2026-01-22 10:40:08 +01:00
Gilles Boccon-Gibod c577f17c99 add basic support for SCI 2026-01-20 15:32:55 -08:00
Gilles Boccon-Gibod 252f3e49b6 Merge pull request #870 from antipatico/feat_AV53C1 2026-01-20 10:46:52 -08:00
Jacopo Scannella f3ecf04479 Added support for STA-AV53C1-USB-BLUETOOTH StarTech(dot)com dongle - RTL8761BUE 2026-01-20 09:32:51 +01:00
Gilles Boccon-Gibod 4986f55043 Merge pull request #869 from timrid/android-fix
Make bumble work on Android using briefcase/chaquopy
2026-01-19 09:50:08 -08:00
Gilles Boccon-Gibod 7e89c8a7f8 Merge pull request #868 from google/gbg/return-parameters
typing support for HCI commands return parameters
2026-01-19 09:49:15 -08:00
timrid 085905a7bf Make bumble work on Android using briefcase that is using chaquopy under the hood. 2026-01-18 23:32:37 +01:00
zxzxwu c619f1f21b Merge pull request #867 from zxzxwu/fix-import-error
Fix missing ClassVar import
2026-01-16 15:33:07 +08:00
Josh Wu d4b0da9265 Fix missing ClassVar import 2026-01-16 15:21:26 +08:00
zxzxwu f1058e4d4e Merge pull request #859 from istemon/att-read-by-type-request-fix
Return 'invalid handle' for malformed read by type request
2026-01-16 15:09:20 +08:00
zxzxwu 454d477d7e Merge pull request #864 from zxzxwu/hci-packets-typing
Add HCI Packets annotations and send_sco_sdu
2026-01-16 15:08:42 +08:00
zxzxwu 6966228d74 Merge pull request #863 from zxzxwu/eatt-mtu
Correct ATT_MTU in enhanced bearers
2026-01-16 15:08:12 +08:00
zxzxwu f4271a5646 Merge pull request #862 from zxzxwu/gatt-multiple
GATT: Support Multiple Requests
2026-01-16 15:08:02 +08:00
zxzxwu 534209f0af Merge pull request #861 from zxzxwu/l2cap
Replace send_pdu() with write()
2026-01-16 15:07:54 +08:00
zxzxwu 549b82999a Merge pull request #860 from zxzxwu/address
Improve Address type annotations
2026-01-16 14:04:56 +08:00
zxzxwu 551f577b2a Merge pull request #866 from zxzxwu/template-service
Fix GATT TemplateSerivce annotations
2026-01-16 09:41:48 +08:00
Frieder Steinmetz c69c1532cc Fix comments that were messed up by black 2026-01-15 19:06:03 +01:00
Frieder Steinmetz f95b2054c8 Formatted with 2026-01-15 10:50:33 +01:00
Josh Wu 84a6453dda Fix GATT TemplateSerivce annotations 2026-01-15 12:06:05 +08:00
Frieder Steinmetz 3fdd7ee45e Added the PcapSnooper class.
The class implements a bumble snooper that writes PCAP records.
It can write to either a file or a named pipe.
The latter is useful to bridge with wireshark extcap for live logging.
2026-01-14 23:40:59 +01:00
Gilles Boccon-Gibod 591ed61686 Merge pull request #858 from klow68/feat/add-usb-probe-filtering 2026-01-13 08:54:55 -08:00
Josh Wu 3d3acbb374 Add HCI Packets annotations and send_sco_sdu 2026-01-13 17:58:37 +08:00
Stryxion 671f306a27 fix: black 2026-01-13 09:42:40 +01:00
Josh Wu f7364db992 Correct ATT_MTU in enhanced bearers 2026-01-12 21:03:14 +08:00
Josh Wu 0fb2b3bd66 GATT: Support Multiple Requests 2026-01-12 20:51:38 +08:00
Stryxion 9e270d4d62 fix: mypy 2026-01-12 09:36:35 +01:00
Josh Wu cf60b5ffbb Replace send_pdu() with write() 2026-01-12 13:16:49 +08:00
Josh Wu aa4c57d105 Improve Address type annotations
* Add missing annotations
* Declare address constants as ClassVar
2026-01-12 13:07:04 +08:00
Istemon 61a601e6e2 Return 'invalid handle' for malformed read by type request 2026-01-10 01:43:30 +00:00
Stryxion 05fd4fbfc6 fix: review 2026-01-09 08:46:31 +01:00
Stryxion 6aa9e0bdf7 feat: Add filtering options for usb probe 2026-01-08 14:54:58 +01:00
56 changed files with 1701 additions and 378 deletions
+40 -33
View File
@@ -27,9 +27,8 @@ from bumble.core import name_or_number
from bumble.hci import (
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_LE_READ_BUFFER_SIZE_V2_COMMAND,
HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND,
HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND,
HCI_LE_READ_MINIMUM_SUPPORTED_CONNECTION_INTERVAL_COMMAND,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_READ_BD_ADDR_COMMAND,
HCI_READ_BUFFER_SIZE_COMMAND,
@@ -37,9 +36,8 @@ from bumble.hci import (
HCI_Command,
HCI_LE_Read_Buffer_Size_Command,
HCI_LE_Read_Buffer_Size_V2_Command,
HCI_LE_Read_Maximum_Advertising_Data_Length_Command,
HCI_LE_Read_Maximum_Data_Length_Command,
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command,
HCI_LE_Read_Minimum_Supported_Connection_Interval_Command,
HCI_LE_Read_Suggested_Default_Data_Length_Command,
HCI_Read_BD_ADDR_Command,
HCI_Read_Buffer_Size_Command,
@@ -78,52 +76,61 @@ async def get_classic_info(host: Host) -> None:
async def get_le_info(host: Host) -> None:
print()
if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
response1 = await host.send_sync_command(
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command()
)
print(
color('LE Number Of Supported Advertising Sets:', 'yellow'),
response1.num_supported_advertising_sets,
'\n',
)
print(
color('LE Number Of Supported Advertising Sets:', 'yellow'),
host.number_of_supported_advertising_sets,
'\n',
)
if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
response2 = await host.send_sync_command(
HCI_LE_Read_Maximum_Advertising_Data_Length_Command()
)
print(
color('LE Maximum Advertising Data Length:', 'yellow'),
response2.max_advertising_data_length,
'\n',
)
print(
color('LE Maximum Advertising Data Length:', 'yellow'),
host.maximum_advertising_data_length,
'\n',
)
if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
response3 = await host.send_sync_command(
response1 = await host.send_sync_command(
HCI_LE_Read_Maximum_Data_Length_Command()
)
print(
color('Maximum Data Length:', 'yellow'),
color('LE Maximum Data Length:', 'yellow'),
(
f'tx:{response3.supported_max_tx_octets}/'
f'{response3.supported_max_tx_time}, '
f'rx:{response3.supported_max_rx_octets}/'
f'{response3.supported_max_rx_time}'
f'tx:{response1.supported_max_tx_octets}/'
f'{response1.supported_max_tx_time}, '
f'rx:{response1.supported_max_rx_octets}/'
f'{response1.supported_max_rx_time}'
),
'\n',
)
if host.supports_command(HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND):
response4 = await host.send_sync_command(
response2 = await host.send_sync_command(
HCI_LE_Read_Suggested_Default_Data_Length_Command()
)
print(
color('Suggested Default Data Length:', 'yellow'),
f'{response4.suggested_max_tx_octets}/'
f'{response4.suggested_max_tx_time}',
color('LE Suggested Default Data Length:', 'yellow'),
f'{response2.suggested_max_tx_octets}/'
f'{response2.suggested_max_tx_time}',
'\n',
)
if host.supports_command(HCI_LE_READ_MINIMUM_SUPPORTED_CONNECTION_INTERVAL_COMMAND):
response3 = await host.send_sync_command(
HCI_LE_Read_Minimum_Supported_Connection_Interval_Command()
)
print(
color('LE Minimum Supported Connection Interval:', 'yellow'),
f'{response3.minimum_supported_connection_interval * 125} µs',
)
for group in range(len(response3.group_min)):
print(
f' Group {group}: '
f'{response3.group_min[group] * 125} µs to '
f'{response3.group_max[group] * 125} µs '
'by increments of '
f'{response3.group_stride[group] * 125} µs',
'\n',
)
print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features:
print(f' {LeFeature(feature).name}')
+1 -1
View File
@@ -352,7 +352,7 @@ async def run(
await bridge.start()
# Wait until the source terminates
await hci_source.wait_for_termination()
await hci_source.terminated
@click.command()
+3 -1
View File
@@ -81,7 +81,9 @@ async def async_main():
response = hci.HCI_Command_Complete_Event(
num_hci_command_packets=1,
command_opcode=hci_packet.op_code,
return_parameters=bytes([hci.HCI_SUCCESS]),
return_parameters=hci.HCI_StatusReturnParameters(
status=hci.HCI_ErrorCode.SUCCESS
),
)
# Return a packet with 'respond to sender' set to True
return (bytes(response), True)
+1 -1
View File
@@ -268,7 +268,7 @@ async def run(device_config, hci_transport, bridge):
await bridge.start(device)
# Wait until the transport terminates
await hci_source.wait_for_termination()
await hci_source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -421,7 +421,7 @@ async def run(device_config, hci_transport, bridge):
await bridge.start(device)
# Wait until the transport terminates
await hci_source.wait_for_termination()
await hci_source.terminated
except core.ConnectionError as error:
print(color(f"!!! Bluetooth connection failed: {error}", "red"))
except Exception as error:
+1 -1
View File
@@ -190,7 +190,7 @@ async def scan(
scanning_phys=scanning_phys,
)
await hci_source.wait_for_termination()
await hci_source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -726,7 +726,7 @@ class Speaker:
print("Waiting for connection...")
await self.advertise()
await hci_source.wait_for_termination()
await hci_source.terminated
for output in self.outputs:
await output.stop()
+15 -2
View File
@@ -26,6 +26,8 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from typing import Any
import click
import usb1
@@ -166,13 +168,16 @@ def is_bluetooth_hci(device):
# -----------------------------------------------------------------------------
@click.command()
@click.option('--verbose', is_flag=True, default=False, help='Print more details')
def main(verbose):
@click.option('--hci-only', is_flag=True, default=False, help='only show HCI device')
@click.option('--manufacturer', help='filter by manufacturer')
@click.option('--product', help='filter by product')
def main(verbose: bool, manufacturer: str, product: str, hci_only: bool):
bumble.logging.setup_basic_logging('WARNING')
load_libusb()
with usb1.USBContext() as context:
bluetooth_device_count = 0
devices = {}
devices: dict[tuple[Any, Any], list[str | None]] = {}
for device in context.getDeviceIterator(skip_on_error=True):
device_class = device.getDeviceClass()
@@ -234,6 +239,14 @@ def main(verbose):
f'{basic_transport_name}/{device_serial_number}'
)
# Filter
if product and device_product != product:
continue
if manufacturer and device_manufacturer != manufacturer:
continue
if not is_bluetooth_hci(device) and hci_only:
continue
# Print the results
print(
color(
+97 -32
View File
@@ -29,7 +29,7 @@ import enum
import functools
import inspect
import struct
from collections.abc import Awaitable, Callable
from collections.abc import Awaitable, Callable, Sequence
from typing import (
TYPE_CHECKING,
ClassVar,
@@ -72,34 +72,36 @@ ATT_PSM = 0x001F
EATT_PSM = 0x0027
class Opcode(hci.SpecableEnum):
ATT_ERROR_RESPONSE = 0x01
ATT_EXCHANGE_MTU_REQUEST = 0x02
ATT_EXCHANGE_MTU_RESPONSE = 0x03
ATT_FIND_INFORMATION_REQUEST = 0x04
ATT_FIND_INFORMATION_RESPONSE = 0x05
ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06
ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
ATT_READ_BY_TYPE_REQUEST = 0x08
ATT_READ_BY_TYPE_RESPONSE = 0x09
ATT_READ_REQUEST = 0x0A
ATT_READ_RESPONSE = 0x0B
ATT_READ_BLOB_REQUEST = 0x0C
ATT_READ_BLOB_RESPONSE = 0x0D
ATT_READ_MULTIPLE_REQUEST = 0x0E
ATT_READ_MULTIPLE_RESPONSE = 0x0F
ATT_READ_BY_GROUP_TYPE_REQUEST = 0x10
ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
ATT_WRITE_REQUEST = 0x12
ATT_WRITE_RESPONSE = 0x13
ATT_WRITE_COMMAND = 0x52
ATT_SIGNED_WRITE_COMMAND = 0xD2
ATT_PREPARE_WRITE_REQUEST = 0x16
ATT_PREPARE_WRITE_RESPONSE = 0x17
ATT_EXECUTE_WRITE_REQUEST = 0x18
ATT_EXECUTE_WRITE_RESPONSE = 0x19
ATT_HANDLE_VALUE_NOTIFICATION = 0x1B
ATT_HANDLE_VALUE_INDICATION = 0x1D
ATT_HANDLE_VALUE_CONFIRMATION = 0x1E
ATT_ERROR_RESPONSE = 0x01
ATT_EXCHANGE_MTU_REQUEST = 0x02
ATT_EXCHANGE_MTU_RESPONSE = 0x03
ATT_FIND_INFORMATION_REQUEST = 0x04
ATT_FIND_INFORMATION_RESPONSE = 0x05
ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06
ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
ATT_READ_BY_TYPE_REQUEST = 0x08
ATT_READ_BY_TYPE_RESPONSE = 0x09
ATT_READ_REQUEST = 0x0A
ATT_READ_RESPONSE = 0x0B
ATT_READ_BLOB_REQUEST = 0x0C
ATT_READ_BLOB_RESPONSE = 0x0D
ATT_READ_MULTIPLE_REQUEST = 0x0E
ATT_READ_MULTIPLE_RESPONSE = 0x0F
ATT_READ_BY_GROUP_TYPE_REQUEST = 0x10
ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
ATT_READ_MULTIPLE_VARIABLE_REQUEST = 0x20
ATT_READ_MULTIPLE_VARIABLE_RESPONSE = 0x21
ATT_WRITE_REQUEST = 0x12
ATT_WRITE_RESPONSE = 0x13
ATT_WRITE_COMMAND = 0x52
ATT_SIGNED_WRITE_COMMAND = 0xD2
ATT_PREPARE_WRITE_REQUEST = 0x16
ATT_PREPARE_WRITE_RESPONSE = 0x17
ATT_EXECUTE_WRITE_REQUEST = 0x18
ATT_EXECUTE_WRITE_RESPONSE = 0x19
ATT_HANDLE_VALUE_NOTIFICATION = 0x1B
ATT_HANDLE_VALUE_INDICATION = 0x1D
ATT_HANDLE_VALUE_CONFIRMATION = 0x1E
ATT_REQUESTS = [
Opcode.ATT_EXCHANGE_MTU_REQUEST,
@@ -110,9 +112,10 @@ ATT_REQUESTS = [
Opcode.ATT_READ_BLOB_REQUEST,
Opcode.ATT_READ_MULTIPLE_REQUEST,
Opcode.ATT_READ_BY_GROUP_TYPE_REQUEST,
Opcode.ATT_READ_MULTIPLE_VARIABLE_REQUEST,
Opcode.ATT_WRITE_REQUEST,
Opcode.ATT_PREPARE_WRITE_REQUEST,
Opcode.ATT_EXECUTE_WRITE_REQUEST
Opcode.ATT_EXECUTE_WRITE_REQUEST,
]
ATT_RESPONSES = [
@@ -125,9 +128,10 @@ ATT_RESPONSES = [
Opcode.ATT_READ_BLOB_RESPONSE,
Opcode.ATT_READ_MULTIPLE_RESPONSE,
Opcode.ATT_READ_BY_GROUP_TYPE_RESPONSE,
Opcode.ATT_READ_MULTIPLE_VARIABLE_RESPONSE,
Opcode.ATT_WRITE_RESPONSE,
Opcode.ATT_PREPARE_WRITE_RESPONSE,
Opcode.ATT_EXECUTE_WRITE_RESPONSE
Opcode.ATT_EXECUTE_WRITE_RESPONSE,
]
class ErrorCode(hci.SpecableEnum):
@@ -185,6 +189,18 @@ ATT_INSUFFICIENT_RESOURCES_ERROR = ErrorCode.INSUFFICIENT_RESOURCES
ATT_DEFAULT_MTU = 23
HANDLE_FIELD_SPEC = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'}
_SET_OF_HANDLES_METADATA = hci.metadata({
'parser': lambda data, offset: (
len(data),
[
struct.unpack_from('<H', data, i)[0]
for i in range(offset, len(data), 2)
],
),
'serializer': lambda handles: b''.join(
[struct.pack('<H', handle) for handle in handles]
),
})
# fmt: on
# pylint: enable=line-too-long
@@ -554,7 +570,7 @@ class ATT_Read_Multiple_Request(ATT_PDU):
See Bluetooth spec @ Vol 3, Part F - 3.4.4.7 Read Multiple Request
'''
set_of_handles: bytes = dataclasses.field(metadata=hci.metadata("*"))
set_of_handles: Sequence[int] = dataclasses.field(metadata=_SET_OF_HANDLES_METADATA)
# -----------------------------------------------------------------------------
@@ -635,6 +651,55 @@ class ATT_Read_By_Group_Type_Response(ATT_PDU):
return result
# -----------------------------------------------------------------------------
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_Multiple_Variable_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.11 Read Multiple Variable Request
'''
set_of_handles: Sequence[int] = dataclasses.field(metadata=_SET_OF_HANDLES_METADATA)
# -----------------------------------------------------------------------------
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_Multiple_Variable_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.12 Read Multiple Variable Response
'''
@classmethod
def _parse_length_value_tuples(
cls, data: bytes, offset: int
) -> tuple[int, list[tuple[int, bytes]]]:
length_value_tuple_list: list[tuple[int, bytes]] = []
while offset < len(data):
length = struct.unpack_from('<H', data, offset)[0]
length_value_tuple_list.append(
(length, data[offset + 2 : offset + 2 + length])
)
offset += 2 + length
return (len(data), length_value_tuple_list)
length_value_tuple_list: Sequence[tuple[int, bytes]] = dataclasses.field(
metadata=hci.metadata(
{
'parser': lambda data, offset: ATT_Read_Multiple_Variable_Response._parse_length_value_tuples(
data, offset
),
'serializer': lambda length_value_tuple_list: b''.join(
[
struct.pack('<H', length) + value
for length, value in length_value_tuple_list
]
),
}
)
)
# -----------------------------------------------------------------------------
@ATT_PDU.subclass
@dataclasses.dataclass
+1 -1
View File
@@ -235,7 +235,7 @@ class Protocol:
)
+ payload
)
self.l2cap_channel.send_pdu(pdu)
self.l2cap_channel.write(pdu)
def send_command(self, transaction_label: int, pid: int, payload: bytes) -> None:
logger.debug(
+3 -3
View File
@@ -268,7 +268,7 @@ class MediaPacketPump:
await self.clock.sleep(delay)
# Emit
rtp_channel.send_pdu(bytes(packet))
rtp_channel.write(bytes(packet))
logger.debug(
f'{color(">>> sending RTP packet:", "green")} {packet}'
)
@@ -1519,7 +1519,7 @@ class Protocol(utils.EventEmitter):
header = bytes([first_header_byte])
# Send one packet
self.l2cap_channel.send_pdu(header + payload[:max_fragment_size])
self.l2cap_channel.write(header + payload[:max_fragment_size])
# Prepare for the next packet
payload = payload[max_fragment_size:]
@@ -1829,7 +1829,7 @@ class Stream:
def send_media_packet(self, packet: MediaPacket) -> None:
assert self.rtp_channel
self.rtp_channel.send_pdu(bytes(packet))
self.rtp_channel.write(bytes(packet))
async def configure(self) -> None:
if self.state != State.IDLE:
+20 -20
View File
@@ -55,13 +55,15 @@ AVRCP_PID = 0x110E
AVRCP_BLUETOOTH_SIG_COMPANY_ID = 0x001958
_UINT64_BE_METADATA = {
'parser': lambda data, offset: (
offset + 8,
int.from_bytes(data[offset : offset + 8], byteorder='big'),
),
'serializer': lambda x: x.to_bytes(8, byteorder='big'),
}
_UINT64_BE_METADATA = hci.metadata(
{
'parser': lambda data, offset: (
offset + 8,
int.from_bytes(data[offset : offset + 8], byteorder='big'),
),
'serializer': lambda x: x.to_bytes(8, byteorder='big'),
}
)
class PduId(utils.OpenIntEnum):
@@ -92,7 +94,7 @@ class PduId(utils.OpenIntEnum):
class CharacterSetId(hci.SpecableEnum):
UTF_8 = 0x06
UTF_8 = 0x6A
class MediaAttributeId(hci.SpecableEnum):
@@ -491,14 +493,12 @@ class BrowseableItem:
**hci.HCI_Object.dict_from_bytes(data, offset + 3, subclass.fields)
)
instance._payload = data[3:]
return offset + length, instance
return offset + length + 3, instance
def __bytes__(self) -> bytes:
if self._payload is None:
self._payload = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields)
return (
struct.pack('>BH', self.item_type, len(self._payload) + 3) + self._payload
)
return struct.pack('>BH', self.item_type, len(self._payload)) + self._payload
_Item = TypeVar('_Item', bound='BrowseableItem')
@@ -601,11 +601,11 @@ class MediaPlayerItem(BrowseableItem):
metadata=MajorPlayerType.type_metadata(1)
)
player_sub_type: PlayerSubType = field(
metadata=PlayerSubType.type_metadata(4, byteorder='big')
metadata=PlayerSubType.type_metadata(4, byteorder='little')
)
play_status: PlayStatus = field(metadata=PlayStatus.type_metadata(1))
feature_bitmask: Features = field(
metadata=Features.type_metadata(16, byteorder='big')
metadata=Features.type_metadata(16, byteorder='little')
)
character_set_id: CharacterSetId = field(
metadata=CharacterSetId.type_metadata(2, byteorder='big')
@@ -634,7 +634,7 @@ class FolderItem(BrowseableItem):
folder_uid: int = field(metadata=_UINT64_BE_METADATA)
folder_type: FolderType = field(metadata=FolderType.type_metadata(1))
is_playable: FolderType = field(metadata=Playable.type_metadata(1))
is_playable: Playable = field(metadata=Playable.type_metadata(1))
character_set_id: CharacterSetId = field(
metadata=CharacterSetId.type_metadata(2, byteorder='big')
)
@@ -876,7 +876,7 @@ class GetPlayStatusCommand(Command):
class GetElementAttributesCommand(Command):
pdu_id = PduId.GET_ELEMENT_ATTRIBUTES
identifier: int = field(metadata=hci.metadata(_UINT64_BE_METADATA))
identifier: int = field(metadata=_UINT64_BE_METADATA)
attribute_ids: Sequence[MediaAttributeId] = field(
metadata=MediaAttributeId.type_metadata(
4, list_begin=True, list_end=True, byteorder='big'
@@ -951,7 +951,7 @@ class ChangePathCommand(Command):
uid_counter: int = field(metadata=hci.metadata('>2'))
direction: Direction = field(metadata=Direction.type_metadata(1))
folder_uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA))
folder_uid: int = field(metadata=_UINT64_BE_METADATA)
# -----------------------------------------------------------------------------
@@ -961,7 +961,7 @@ class GetItemAttributesCommand(Command):
pdu_id = PduId.GET_ITEM_ATTRIBUTES
scope: Scope = field(metadata=Scope.type_metadata(1))
uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA))
uid: int = field(metadata=_UINT64_BE_METADATA)
uid_counter: int = field(metadata=hci.metadata('>2'))
start_item: int = field(metadata=hci.metadata('>4'))
end_item: int = field(metadata=hci.metadata('>4'))
@@ -999,7 +999,7 @@ class PlayItemCommand(Command):
pdu_id = PduId.PLAY_ITEM
scope: Scope = field(metadata=Scope.type_metadata(1))
uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA))
uid: int = field(metadata=_UINT64_BE_METADATA)
uid_counter: int = field(metadata=hci.metadata('>2'))
@@ -1010,7 +1010,7 @@ class AddToNowPlayingCommand(Command):
pdu_id = PduId.ADD_TO_NOW_PLAYING
scope: Scope = field(metadata=Scope.type_metadata(1))
uid: int = field(metadata=hci.metadata(_UINT64_BE_METADATA))
uid: int = field(metadata=_UINT64_BE_METADATA)
uid_counter: int = field(metadata=hci.metadata('>2'))
+10 -2
View File
@@ -37,7 +37,12 @@ class HCI_Bridge:
def on_packet(self, packet):
# Convert the packet bytes to an object
hci_packet = HCI_Packet.from_bytes(packet)
try:
hci_packet = HCI_Packet.from_bytes(packet)
except Exception:
logger.warning('forwarding unparsed packet as-is')
self.hci_sink.on_packet(packet)
return
# Filter the packet
if self.packet_filter is not None:
@@ -50,7 +55,10 @@ class HCI_Bridge:
return
# Analyze the packet
self.trace(hci_packet)
try:
self.trace(hci_packet)
except Exception:
logger.exception('Exception while tracing packet')
# Bridge the packet
self.hci_sink.on_packet(packet)
+13
View File
@@ -1898,6 +1898,19 @@ class Controller:
'''
return bytes([hci.HCI_SUCCESS]) + self.le_features.value.to_bytes(8, 'little')
def on_hci_le_read_all_local_supported_features_command(
self, _command: hci.HCI_LE_Read_All_Local_Supported_Features_Command
) -> bytes | None:
'''
See Bluetooth spec Vol 4, Part E - 7.8.128 LE Read All Local Supported Features
Command
'''
return (
bytes([hci.HCI_SUCCESS])
+ bytes([0])
+ self.le_features.value.to_bytes(248, 'little')
)
def on_hci_le_set_random_address_command(
self, command: hci.HCI_LE_Set_Random_Address_Command
) -> bytes | None:
+445 -64
View File
@@ -1177,7 +1177,7 @@ class ChannelSoundingCapabilities:
rtt_capability: int
rtt_aa_only_n: int
rtt_sounding_n: int
rtt_random_payload_n: int
rtt_random_sequence_n: int
nadm_sounding_capability: int
nadm_random_capability: int
cs_sync_phys_supported: int
@@ -1367,10 +1367,7 @@ class Peer:
def create_service_proxy(
self, proxy_class: type[_PROXY_CLASS]
) -> _PROXY_CLASS | None:
if proxy := proxy_class.from_client(self.gatt_client):
return cast(_PROXY_CLASS, proxy)
return None
return proxy_class.from_client(self.gatt_client)
async def discover_service_and_create_proxy(
self, proxy_class: type[_PROXY_CLASS]
@@ -1412,8 +1409,8 @@ class ConnectionParametersPreferences:
connection_interval_max: float = DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX
max_latency: int = DEVICE_DEFAULT_CONNECTION_MAX_LATENCY
supervision_timeout: int = DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT
min_ce_length: int = DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH
max_ce_length: int = DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH
min_ce_length: float = DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH
max_ce_length: float = DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH
ConnectionParametersPreferences.default = ConnectionParametersPreferences()
@@ -1523,7 +1520,7 @@ class _IsoLink:
self.device.host.send_iso_sdu(connection_handle=self.handle, sdu=sdu)
async def get_tx_time_stamp(self) -> tuple[int, int, int]:
response = await self.device.host.send_sync_command(
response = await self.device.send_sync_command(
hci.HCI_LE_Read_ISO_TX_Sync_Command(connection_handle=self.handle)
)
return (
@@ -1700,7 +1697,7 @@ class Connection(utils.CompositeEventEmitter):
peer_address: hci.Address
peer_name: str | None
peer_resolvable_address: hci.Address | None
peer_le_features: hci.LeFeatureMask | None
peer_le_features: hci.LeFeatureMask
role: hci.Role
parameters: Parameters
encryption: int
@@ -1753,8 +1750,8 @@ class Connection(utils.CompositeEventEmitter):
EVENT_CIS_REQUEST = "cis_request"
EVENT_CIS_ESTABLISHMENT = "cis_establishment"
EVENT_CIS_ESTABLISHMENT_FAILURE = "cis_establishment_failure"
EVENT_LE_SUBRATE_CHANGE = "le_subrate_change"
EVENT_LE_SUBRATE_CHANGE_FAILURE = "le_subrate_change_failure"
EVENT_LE_REMOTE_FEATURES_CHANGE = "le_remote_features_change"
EVENT_LE_REMOTE_FEATURES_CHANGE_FAILURE = "le_remote_features_change_failure"
@utils.composite_listener
class Listener:
@@ -1832,14 +1829,14 @@ class Connection(utils.CompositeEventEmitter):
self.authenticated = False
self.sc = False
self.att_mtu = att.ATT_DEFAULT_MTU
self.data_length = DEVICE_DEFAULT_DATA_LENGTH
self.data_length: tuple[int, int, int, int] = DEVICE_DEFAULT_DATA_LENGTH
self.gatt_client = gatt_client.Client(self) # Per-connection client
self.gatt_server = (
device.gatt_server
) # By default, use the device's shared server
self.pairing_peer_io_capability = None
self.pairing_peer_authentication_requirements = None
self.peer_le_features = None
self.peer_le_features = hci.LeFeatureMask(0)
self.cs_configs = {}
self.cs_procedures = {}
@@ -1921,16 +1918,21 @@ class Connection(utils.CompositeEventEmitter):
connection_interval_max: float,
max_latency: int,
supervision_timeout: float,
min_ce_length: float = 0.0,
max_ce_length: float = 0.0,
use_l2cap=False,
) -> None:
"""
Request an update of the connection parameters.
Request a change of the connection parameters.
For short connection intervals (below 7.5ms, introduced in Bluetooth 6.2),
use the `update_parameters_with_subrate` method instead.
Args:
connection_interval_min: Minimum interval, in milliseconds.
connection_interval_max: Maximum interval, in milliseconds.
max_latency: Latency, in number of intervals.
supervision_timeout: Timeout, in milliseconds.
max_latency: Max latency, in number of intervals.
supervision_timeout: Supervision Timeout, in milliseconds.
use_l2cap: Request the update via L2CAP.
"""
return await self.device.update_connection_parameters(
@@ -1940,6 +1942,77 @@ class Connection(utils.CompositeEventEmitter):
max_latency,
supervision_timeout,
use_l2cap=use_l2cap,
min_ce_length=min_ce_length,
max_ce_length=max_ce_length,
)
async def update_parameters_with_subrate(
self,
connection_interval_min: float,
connection_interval_max: float,
subrate_min: int,
subrate_max: int,
max_latency: int,
continuation_number: int,
supervision_timeout: float,
min_ce_length: float,
max_ce_length: float,
) -> None:
"""
Request a change of the connection parameters.
This is similar to `update_parameters` but also allows specifying
the subrate parameters and supports shorter connection intervals (below
7.5ms, as introduced in Bluetooth 6.2).
Args:
connection_interval_min: Minimum interval, in milliseconds.
connection_interval_max: Maximum interval, in milliseconds.
subrate_min: Minimum subrate factor.
subrate_max: Maximum subrate factor.
max_latency: Max latency, in number of intervals.
continuation_number: Continuation number.
supervision_timeout: Supervision Timeout, in milliseconds.
min_ce_length: Minimum connection event length, in milliseconds.
max_ce_length: Maximumsub connection event length, in milliseconds.
"""
return await self.device.update_connection_parameters_with_subrate(
self,
connection_interval_min,
connection_interval_max,
subrate_min,
subrate_max,
max_latency,
continuation_number,
supervision_timeout,
min_ce_length,
max_ce_length,
)
async def update_subrate(
self,
subrate_min: int,
subrate_max: int,
max_latency: int,
continuation_number: int,
supervision_timeout: float,
) -> None:
"""
Request request a change to the subrating factor and/or other parameters.
Args:
subrate_min: Minimum subrate factor.
subrate_max: Maximum subrate factor.
max_latency: Max latency, in number of intervals.
continuation_number: Continuation number.
supervision_timeout: Supervision Timeout, in milliseconds.
"""
return await self.device.update_connection_subrate(
self,
subrate_min,
subrate_max,
max_latency,
continuation_number,
supervision_timeout,
)
async def set_phy(
@@ -2046,6 +2119,7 @@ class DeviceConfiguration:
le_privacy_enabled: bool = False
le_rpa_timeout: int = DEVICE_DEFAULT_LE_RPA_TIMEOUT
le_subrate_enabled: bool = False
le_shorter_connection_intervals_enabled: bool = False
classic_enabled: bool = False
classic_sc_enabled: bool = True
classic_ssp_enabled: bool = True
@@ -2400,6 +2474,9 @@ class Device(utils.CompositeEventEmitter):
self.le_rpa_timeout = config.le_rpa_timeout
self.le_rpa_periodic_update_task: asyncio.Task | None = None
self.le_subrate_enabled = config.le_subrate_enabled
self.le_shorter_connection_intervals_enabled = (
config.le_shorter_connection_intervals_enabled
)
self.classic_enabled = config.classic_enabled
self.cis_enabled = config.cis_enabled
self.classic_sc_enabled = config.classic_sc_enabled
@@ -2686,24 +2763,39 @@ class Device(utils.CompositeEventEmitter):
logger.warning(f'!!! Command {command.name} timed out')
raise CommandTimeoutError() from error
async def send_sync_command(
self, command: hci.HCI_SyncCommand[_RP], check_status: bool = True
) -> _RP:
async def send_sync_command(self, command: hci.HCI_SyncCommand[_RP]) -> _RP:
'''
Send a synchronous command via the host.
If the `status` field of the response's `return_parameters` is not equal to
`SUCCESS` an exception is raised.
Params:
command: the command to send.
check_status: If `True`, check the `status` field of the response's
`return_parameters` and raise and exception if not equal to `SUCCESS`.
Returns:
An instance of the return parameters class associated with the command class.
'''
try:
return await self.host.send_sync_command(
command, check_status, self.command_timeout
)
return await self.host.send_sync_command(command, self.command_timeout)
except asyncio.TimeoutError as error:
logger.warning(f'!!! Command {command.name} timed out')
raise CommandTimeoutError() from error
async def send_sync_command_raw(
self, command: hci.HCI_SyncCommand[_RP]
) -> hci.HCI_Command_Complete_Event[_RP]:
'''
Send a synchronous command via the host without checking the response.
Params:
command: the command to send.
Returns:
An HCI_Command_Complete_Event instance.
'''
try:
return await self.host.send_sync_command_raw(command, self.command_timeout)
except asyncio.TimeoutError as error:
logger.warning(f'!!! Command {command.name} timed out')
raise CommandTimeoutError() from error
@@ -2720,7 +2812,7 @@ class Device(utils.CompositeEventEmitter):
raise and exception if not equal to `PENDING`.
Returns:
An instance of the return parameters class associated with the command class.
A status code.
'''
try:
return await self.host.send_async_command(
@@ -2735,12 +2827,12 @@ class Device(utils.CompositeEventEmitter):
await self.host.reset()
# Try to get the public address from the controller
response = await self.host.send_sync_command(
hci.HCI_Read_BD_ADDR_Command(), check_status=False
)
if response.status == hci.HCI_SUCCESS:
try:
response = await self.host.send_sync_command(hci.HCI_Read_BD_ADDR_Command())
logger.debug(color(f'BD_ADDR: {response.bd_addr}', 'yellow'))
self.public_address = response.bd_addr
except hci.HCI_Error:
logger.debug('Controller has no public address')
# Instantiate the Key Store (we do this here rather than at __init__ time
# because some Key Store implementations use the public address as a namespace)
@@ -2803,7 +2895,9 @@ class Device(utils.CompositeEventEmitter):
)
)
if self.cis_enabled:
if self.cis_enabled and self.host.supports_command(
hci.HCI_LE_SET_HOST_FEATURE_COMMAND
):
await self.send_sync_command(
hci.HCI_LE_Set_Host_Feature_Command(
bit_number=hci.LeFeature.CONNECTED_ISOCHRONOUS_STREAM,
@@ -2811,7 +2905,13 @@ class Device(utils.CompositeEventEmitter):
)
)
if self.le_subrate_enabled:
if (
self.le_subrate_enabled
and self.host.supports_command(hci.HCI_LE_SET_HOST_FEATURE_COMMAND)
and self.host.supports_le_features(
hci.LeFeatureMask.CONNECTION_SUBRATING
)
):
await self.send_sync_command(
hci.HCI_LE_Set_Host_Feature_Command(
bit_number=hci.LeFeature.CONNECTION_SUBRATING_HOST_SUPPORT,
@@ -2819,7 +2919,9 @@ class Device(utils.CompositeEventEmitter):
)
)
if self.config.channel_sounding_enabled:
if self.config.channel_sounding_enabled and self.host.supports_command(
hci.HCI_LE_SET_HOST_FEATURE_COMMAND
):
await self.send_sync_command(
hci.HCI_LE_Set_Host_Feature_Command(
bit_number=hci.LeFeature.CHANNEL_SOUNDING_HOST_SUPPORT,
@@ -2839,7 +2941,7 @@ class Device(utils.CompositeEventEmitter):
rtt_capability=result.rtt_capability,
rtt_aa_only_n=result.rtt_aa_only_n,
rtt_sounding_n=result.rtt_sounding_n,
rtt_random_payload_n=result.rtt_random_payload_n,
rtt_random_sequence_n=result.rtt_random_sequence_n,
nadm_sounding_capability=result.nadm_sounding_capability,
nadm_random_capability=result.nadm_random_capability,
cs_sync_phys_supported=result.cs_sync_phys_supported,
@@ -2852,28 +2954,38 @@ class Device(utils.CompositeEventEmitter):
tx_snr_capability=result.tx_snr_capability,
)
if (
self.le_shorter_connection_intervals_enabled
and self.host.supports_command(hci.HCI_LE_SET_HOST_FEATURE_COMMAND)
and self.host.supports_le_features(
hci.LeFeatureMask.SHORTER_CONNECTION_INTERVALS
)
):
await self.send_sync_command(
hci.HCI_LE_Set_Host_Feature_Command(
bit_number=hci.LeFeature.SHORTER_CONNECTION_INTERVALS_HOST_SUPPORT,
bit_value=1,
)
)
if self.classic_enabled:
await self.send_sync_command(
hci.HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')),
check_status=False,
await self.send_sync_command_raw(
hci.HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8'))
)
await self.send_sync_command(
await self.send_sync_command_raw(
hci.HCI_Write_Class_Of_Device_Command(
class_of_device=self.class_of_device
),
check_status=False,
)
)
await self.send_sync_command(
await self.send_sync_command_raw(
hci.HCI_Write_Simple_Pairing_Mode_Command(
simple_pairing_mode=int(self.classic_ssp_enabled)
),
check_status=False,
)
)
await self.send_sync_command(
await self.send_sync_command_raw(
hci.HCI_Write_Secure_Connections_Host_Support_Command(
secure_connections_host_support=int(self.classic_sc_enabled)
),
check_status=False,
)
)
await self.set_connectable(self.connectable)
await self.set_discoverable(self.discoverable)
@@ -4183,6 +4295,9 @@ class Device(utils.CompositeEventEmitter):
'''
Request an update of the connection parameters.
For short connection intervals (below 7.5 ms, introduced in Bluetooth 6.2),
use `update_connection_parameters_with_subrate` instead.
Args:
connection: The connection to update
connection_interval_min: Minimum interval, in milliseconds.
@@ -4223,17 +4338,148 @@ class Device(utils.CompositeEventEmitter):
return
await self.send_async_command(
hci.HCI_LE_Connection_Update_Command(
connection_handle=connection.handle,
connection_interval_min=connection_interval_min,
connection_interval_max=connection_interval_max,
max_latency=max_latency,
supervision_timeout=supervision_timeout,
min_ce_length=min_ce_length,
max_ce_length=max_ce_length,
pending_result = asyncio.get_running_loop().create_future()
with closing(utils.EventWatcher()) as watcher:
@watcher.on(connection, connection.EVENT_CONNECTION_PARAMETERS_UPDATE)
def _():
pending_result.set_result(None)
@watcher.on(
connection, connection.EVENT_CONNECTION_PARAMETERS_UPDATE_FAILURE
)
)
def _(error_code: int):
pending_result.set_exception(hci.HCI_Error(error_code))
await self.send_async_command(
hci.HCI_LE_Connection_Update_Command(
connection_handle=connection.handle,
connection_interval_min=connection_interval_min,
connection_interval_max=connection_interval_max,
max_latency=max_latency,
supervision_timeout=supervision_timeout,
min_ce_length=min_ce_length,
max_ce_length=max_ce_length,
)
)
await connection.cancel_on_disconnection(pending_result)
async def update_connection_parameters_with_subrate(
self,
connection: Connection,
connection_interval_min: float,
connection_interval_max: float,
subrate_min: int,
subrate_max: int,
max_latency: int,
continuation_number: int,
supervision_timeout: float,
min_ce_length: float = 0.0,
max_ce_length: float = 0.0,
) -> None:
'''
Request a change of the connection parameters.
This is similar to `update_connection_parameters` but also allows specifying
the subrate parameters and supports shorter connection intervals (below
7.5ms, as introduced in Bluetooth 6.2).
Args:
connection: The connection to update
connection_interval_min: Minimum interval, in milliseconds.
connection_interval_max: Maximum interval, in milliseconds.
subrate_min: Minimum subrate factor.
subrate_max: Maximum subrate factor.
max_latency: Max latency, in number of intervals.
continuation_number: Continuation number.
supervision_timeout: Supervision Timeout, in milliseconds.
min_ce_length: Minimum connection event length, in milliseconds.
max_ce_length: Maximum connection event length, in milliseconds.
'''
# Convert the input parameters
connection_interval_min = int(connection_interval_min / 0.125)
connection_interval_max = int(connection_interval_max / 0.125)
supervision_timeout = int(supervision_timeout / 10)
min_ce_length = int(min_ce_length / 0.125)
max_ce_length = int(max_ce_length / 0.125)
pending_result = asyncio.get_running_loop().create_future()
with closing(utils.EventWatcher()) as watcher:
@watcher.on(connection, connection.EVENT_CONNECTION_PARAMETERS_UPDATE)
def _():
pending_result.set_result(None)
@watcher.on(
connection, connection.EVENT_CONNECTION_PARAMETERS_UPDATE_FAILURE
)
def _(error_code: int):
pending_result.set_exception(hci.HCI_Error(error_code))
await self.send_async_command(
hci.HCI_LE_Connection_Rate_Request_Command(
connection_handle=connection.handle,
connection_interval_min=connection_interval_min,
connection_interval_max=connection_interval_max,
subrate_min=subrate_min,
subrate_max=subrate_max,
max_latency=max_latency,
continuation_number=continuation_number,
supervision_timeout=supervision_timeout,
min_ce_length=min_ce_length,
max_ce_length=max_ce_length,
)
)
await connection.cancel_on_disconnection(pending_result)
async def update_connection_subrate(
self,
connection: Connection,
subrate_min: int,
subrate_max: int,
max_latency: int,
continuation_number: int,
supervision_timeout: float,
) -> None:
'''
Request a change to the subrating factor and/or other parameters.
Args:
connection: The connection to update
subrate_min: Minimum subrate factor.
subrate_max: Maximum subrate factor.
max_latency: Max latency, in number of intervals.
continuation_number: Continuation number.
supervision_timeout: Supervision Timeout, in milliseconds.
'''
pending_result = asyncio.get_running_loop().create_future()
with closing(utils.EventWatcher()) as watcher:
@watcher.on(connection, connection.EVENT_CONNECTION_PARAMETERS_UPDATE)
def _():
pending_result.set_result(None)
@watcher.on(
connection, connection.EVENT_CONNECTION_PARAMETERS_UPDATE_FAILURE
)
def _(error_code: int):
pending_result.set_exception(hci.HCI_Error(error_code))
await self.send_async_command(
hci.HCI_LE_Subrate_Request_Command(
connection_handle=connection.handle,
subrate_min=subrate_min,
subrate_max=subrate_max,
max_latency=max_latency,
continuation_number=continuation_number,
supervision_timeout=int(supervision_timeout / 10),
)
)
await connection.cancel_on_disconnection(pending_result)
async def get_connection_rssi(self, connection):
result = await self.send_sync_command(
@@ -4292,6 +4538,87 @@ class Device(utils.CompositeEventEmitter):
)
)
async def set_default_connection_subrate(
self,
subrate_min: int,
subrate_max: int,
max_latency: int,
continuation_number: int,
supervision_timeout: float,
) -> None:
'''
Set the default subrate parameters for new connections.
Args:
subrate_min: Minimum subrate factor.
subrate_max: Maximum subrate factor.
max_latency: Max latency, in number of intervals.
continuation_number: Continuation number.
supervision_timeout: Supervision Timeout, in milliseconds.
'''
# Convert the input parameters
supervision_timeout = int(supervision_timeout / 10)
await self.send_command(
hci.HCI_LE_Set_Default_Subrate_Command(
subrate_min=subrate_min,
subrate_max=subrate_max,
max_latency=max_latency,
continuation_number=continuation_number,
supervision_timeout=supervision_timeout,
),
check_result=True,
)
async def set_default_connection_parameters(
self,
connection_interval_min: float,
connection_interval_max: float,
subrate_min: int,
subrate_max: int,
max_latency: int,
continuation_number: int,
supervision_timeout: float,
min_ce_length: float = 0.0,
max_ce_length: float = 0.0,
) -> None:
'''
Set the default connection parameters for new connections.
Args:
connection_interval_min: Minimum interval, in milliseconds.
connection_interval_max: Maximum interval, in milliseconds.
subrate_min: Minimum subrate factor.
subrate_max: Maximum subrate factor.
max_latency: Max latency, in number of intervals.
continuation_number: Continuation number.
supervision_timeout: Supervision Timeout, in milliseconds.
min_ce_length: Minimum connection event length, in milliseconds.
max_ce_length: Maximum connection event length, in milliseconds.
'''
# Convert the input parameters
connection_interval_min = int(connection_interval_min / 0.125)
connection_interval_max = int(connection_interval_max / 0.125)
supervision_timeout = int(supervision_timeout / 10)
min_ce_length = int(min_ce_length / 0.125)
max_ce_length = int(max_ce_length / 0.125)
await self.send_sync_command(
hci.HCI_LE_Set_Default_Rate_Parameters_Command(
connection_interval_min=connection_interval_min,
connection_interval_max=connection_interval_max,
subrate_min=subrate_min,
subrate_max=subrate_max,
max_latency=max_latency,
continuation_number=continuation_number,
supervision_timeout=supervision_timeout,
min_ce_length=min_ce_length,
max_ce_length=max_ce_length,
)
)
async def transfer_periodic_sync(
self, connection: Connection, sync_handle: int, service_data: int = 0
) -> None:
@@ -4314,7 +4641,9 @@ class Device(utils.CompositeEventEmitter):
)
)
async def find_peer_by_name(self, name: str, transport=PhysicalTransport.LE):
async def find_peer_by_name(
self, name: str, transport=PhysicalTransport.LE
) -> hci.Address:
"""
Scan for a peer with a given name and return its address.
"""
@@ -4332,6 +4661,7 @@ class Device(utils.CompositeEventEmitter):
listener: Callable[..., None] | None = None
was_scanning = self.scanning
was_discovering = self.discovering
event_name: str | None = None
try:
if transport == PhysicalTransport.LE:
event_name = 'advertisement'
@@ -4357,11 +4687,11 @@ class Device(utils.CompositeEventEmitter):
if not self.discovering:
await self.start_discovery()
else:
return None
raise ValueError('invalid transport')
return await utils.cancel_on_event(self, Device.EVENT_FLUSH, peer_address)
finally:
if listener is not None:
if listener is not None and event_name is not None:
self.remove_listener(event_name, listener)
if transport == PhysicalTransport.LE and not was_scanning:
@@ -4387,7 +4717,7 @@ class Device(utils.CompositeEventEmitter):
peer_address.set_result(address)
return
if address.is_resolvable:
if address.is_resolvable and self.address_resolver is not None:
resolved_address = self.address_resolver.resolve(address)
if resolved_address == identity_address:
if not peer_address.done():
@@ -4602,7 +4932,6 @@ class Device(utils.CompositeEventEmitter):
# [Classic only]
async def request_remote_name(self, remote: hci.Address | Connection) -> str:
# Set up event handlers
pending_name: asyncio.Future[str] = asyncio.get_running_loop().create_future()
peer_address = (
@@ -6189,7 +6518,7 @@ class Device(utils.CompositeEventEmitter):
):
logger.debug(
f'*** Connection Parameters Update: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, '
f'{connection.peer_address} as {connection.role_name}'
)
if connection.parameters.connection_interval != connection_interval * 1.25:
connection.parameters = Connection.Parameters(
@@ -6213,7 +6542,41 @@ class Device(utils.CompositeEventEmitter):
self, connection: Connection, error: int
):
logger.debug(
f'*** Connection Parameters Update Failed: [0x{connection.handle:04X}] '
f'*** Connection Parameters Update failed: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, '
f'error={error}'
)
connection.emit(connection.EVENT_CONNECTION_PARAMETERS_UPDATE_FAILURE, error)
@host_event_handler
@with_connection_from_handle
def on_le_connection_rate_change(
self,
connection: Connection,
connection_interval: int,
subrate_factor: int,
peripheral_latency: int,
continuation_number: int,
supervision_timeout: int,
):
logger.debug(
f'*** Connection Rate Change: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}'
)
connection.parameters = Connection.Parameters(
connection_interval=connection_interval * 0.125,
subrate_factor=subrate_factor,
peripheral_latency=peripheral_latency,
continuation_number=continuation_number,
supervision_timeout=supervision_timeout * 10.0,
)
connection.emit(connection.EVENT_CONNECTION_PARAMETERS_UPDATE)
@host_event_handler
@with_connection_from_handle
def on_le_connection_rate_change_failure(self, connection: Connection, error: int):
logger.debug(
f'*** Connection Rate Change failed: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, '
f'error={error}'
)
@@ -6256,7 +6619,25 @@ class Device(utils.CompositeEventEmitter):
subrate_factor,
continuation_number,
)
connection.emit(connection.EVENT_LE_SUBRATE_CHANGE)
connection.emit(connection.EVENT_CONNECTION_PARAMETERS_UPDATE)
@host_event_handler
@with_connection_from_handle
def on_le_subrate_change_failure(self, connection: Connection, status: int):
connection.emit(connection.EVENT_CONNECTION_PARAMETERS_UPDATE_FAILURE, status)
@host_event_handler
@with_connection_from_handle
def on_le_remote_features(
self, connection: Connection, le_features: hci.LeFeatureMask
):
connection.peer_le_features = le_features
connection.emit(connection.EVENT_LE_REMOTE_FEATURES_CHANGE)
@host_event_handler
@with_connection_from_handle
def on_le_remote_features_failure(self, connection: Connection, status: int):
connection.emit(connection.EVENT_LE_REMOTE_FEATURES_CHANGE_FAILURE, status)
@host_event_handler
@with_connection_from_handle
@@ -6303,7 +6684,7 @@ class Device(utils.CompositeEventEmitter):
rtt_capability=event.rtt_capability,
rtt_aa_only_n=event.rtt_aa_only_n,
rtt_sounding_n=event.rtt_sounding_n,
rtt_random_payload_n=event.rtt_random_payload_n,
rtt_random_sequence_n=event.rtt_random_sequence_n,
nadm_sounding_capability=event.nadm_sounding_capability,
nadm_random_capability=event.nadm_random_capability,
cs_sync_phys_supported=event.cs_sync_phys_supported,
+16 -8
View File
@@ -663,10 +663,13 @@ class Driver(common.Driver):
async def read_device_info(self) -> dict[ValueType, Any]:
self.host.ready = True
response1 = await self.host.send_sync_command(
hci.HCI_Reset_Command(), check_status=False
)
if response1.status not in (hci.HCI_UNKNOWN_HCI_COMMAND_ERROR, hci.HCI_SUCCESS):
response1 = await self.host.send_sync_command_raw(hci.HCI_Reset_Command())
if not isinstance(
response1.return_parameters, hci.HCI_StatusReturnParameters
) or response1.return_parameters.status not in (
hci.HCI_UNKNOWN_HCI_COMMAND_ERROR,
hci.HCI_SUCCESS,
):
# When the controller is in operational mode, the response is a
# successful response.
# When the controller is in bootloader mode,
@@ -676,13 +679,18 @@ class Driver(common.Driver):
raise DriverError("unexpected HCI response")
# Read the firmware version.
response2 = await self.host.send_sync_command(
HCI_Intel_Read_Version_Command(param0=0xFF), check_status=False
response2 = await self.host.send_sync_command_raw(
HCI_Intel_Read_Version_Command(param0=0xFF)
)
if response2.status != 0: # type: ignore
if (
not isinstance(
response2.return_parameters, HCI_Intel_Read_Version_ReturnParameters
)
or response2.return_parameters.status != 0
):
raise DriverError("HCI_Intel_Read_Version_Command error")
tlvs = _parse_tlv(response2.tlv) # type: ignore
tlvs = _parse_tlv(response2.return_parameters.tlv) # type: ignore
# Convert the list to a dict. That's Ok here because we only expect each type
# to appear just once.
+39 -18
View File
@@ -129,6 +129,7 @@ RTK_USB_PRODUCTS = {
(0x2357, 0x0604),
(0x2550, 0x8761),
(0x2B89, 0x8761),
(0x2C0A, 0x8761),
(0x7392, 0xC611),
# Realtek 8761CUV
(0x0B05, 0x1BF6),
@@ -533,11 +534,13 @@ class Driver(common.Driver):
@staticmethod
async def get_loaded_firmware_version(host: Host) -> int | None:
response1 = await host.send_sync_command(
HCI_RTK_Read_ROM_Version_Command(), check_status=False
)
if response1.status != hci.HCI_SUCCESS:
response1 = await host.send_sync_command_raw(HCI_RTK_Read_ROM_Version_Command())
if (
not isinstance(
response1.return_parameters, HCI_RTK_Read_ROM_Version_ReturnParameters
)
or response1.return_parameters.status != hci.HCI_SUCCESS
):
return None
response2 = await host.send_sync_command(
@@ -558,13 +561,20 @@ class Driver(common.Driver):
await host.send_sync_command(hci.HCI_Reset_Command())
host.ready = True
command = hci.HCI_Read_Local_Version_Information_Command()
response = await host.send_sync_command(command, check_status=False)
if response.status != hci.HCI_SUCCESS:
response = await host.send_sync_command_raw(
hci.HCI_Read_Local_Version_Information_Command()
)
if (
not isinstance(
response.return_parameters,
hci.HCI_Read_Local_Version_Information_ReturnParameters,
)
or response.return_parameters.status != hci.HCI_SUCCESS
):
logger.error("failed to probe local version information")
return None
local_version = response
local_version = response.return_parameters
logger.debug(
f"looking for a driver: 0x{local_version.lmp_subversion:04X} "
@@ -640,15 +650,21 @@ class Driver(common.Driver):
# TODO: load the firmware
async def download_for_rtl8723b(self):
async def download_for_rtl8723b(self) -> int | None:
if self.driver_info.has_rom_version:
response1 = await self.host.send_sync_command(
HCI_RTK_Read_ROM_Version_Command(), check_status=False
response1 = await self.host.send_sync_command_raw(
HCI_RTK_Read_ROM_Version_Command()
)
if response1.status != hci.HCI_SUCCESS:
if (
not isinstance(
response1.return_parameters,
HCI_RTK_Read_ROM_Version_ReturnParameters,
)
or response1.return_parameters.status != hci.HCI_SUCCESS
):
logger.warning("can't get ROM version")
return None
rom_version = response1.version
rom_version = response1.return_parameters.version
logger.debug(f"ROM version before download: {rom_version:04X}")
else:
rom_version = 0
@@ -690,13 +706,18 @@ class Driver(common.Driver):
logger.debug("download complete!")
# Read the version again
response2 = await self.host.send_sync_command(
HCI_RTK_Read_ROM_Version_Command(), check_status=False
response2 = await self.host.send_sync_command_raw(
HCI_RTK_Read_ROM_Version_Command()
)
if response2.status != hci.HCI_SUCCESS:
if (
not isinstance(
response2.return_parameters, HCI_RTK_Read_ROM_Version_ReturnParameters
)
or response2.return_parameters.status != hci.HCI_SUCCESS
):
logger.warning("can't get ROM version")
else:
rom_version = response2.version
rom_version = response2.return_parameters.version
logger.debug(f"ROM version after download: {rom_version:02X}")
return firmware.version
+2 -2
View File
@@ -29,7 +29,7 @@ import functools
import logging
import struct
from collections.abc import Iterable, Sequence
from typing import TypeVar
from typing import ClassVar, TypeVar
from bumble.att import Attribute, AttributeValue, AttributeValueV2
from bumble.colors import color
@@ -403,7 +403,7 @@ class TemplateService(Service):
to expose their UUID as a class property
'''
UUID: UUID
UUID: ClassVar[UUID]
def __init__(
self,
+5 -4
View File
@@ -34,11 +34,14 @@ from datetime import datetime
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Generic,
TypeVar,
overload,
)
from typing_extensions import Self
from bumble import att, core, l2cap, utils
from bumble.colors import color
from bumble.core import UUID, InvalidStateError
@@ -249,10 +252,10 @@ class ProfileServiceProxy:
Base class for profile-specific service proxies
'''
SERVICE_CLASS: type[TemplateService]
SERVICE_CLASS: ClassVar[type[TemplateService]]
@classmethod
def from_client(cls, client: Client) -> ProfileServiceProxy | None:
def from_client(cls, client: Client) -> Self | None:
return ServiceProxy.from_client(cls, client, cls.SERVICE_CLASS.UUID)
@@ -285,8 +288,6 @@ class Client:
self._bearer_id = (
f'[0x{bearer.connection.handle:04X}|CID=0x{bearer.source_cid:04X}]'
)
# Fill the mtu.
bearer.on_att_mtu_update(att.ATT_DEFAULT_MTU)
self.connection = bearer.connection
else:
bearer.on(bearer.EVENT_DISCONNECTION, self.on_disconnection)
+100 -1
View File
@@ -115,7 +115,6 @@ class Server(utils.EventEmitter):
channel.connection.handle,
channel.source_cid,
)
channel.att_mtu = att.ATT_DEFAULT_MTU
channel.sink = lambda pdu: self.on_gatt_pdu(
channel, att.ATT_PDU.from_bytes(pdu)
)
@@ -777,6 +776,18 @@ class Server(utils.EventEmitter):
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
if (
request.starting_handle == 0x0000
or request.starting_handle > request.ending_handle
):
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.starting_handle,
error_code=att.ATT_INVALID_HANDLE_ERROR,
)
self.send_response(bearer, response)
return
attributes: list[tuple[int, bytes]] = []
for attribute in (
attribute
@@ -977,6 +988,94 @@ class Server(utils.EventEmitter):
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_read_multiple_request(
self, bearer: att.Bearer, request: att.ATT_Read_Multiple_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.7 Read Multiple Request.
'''
response: att.ATT_PDU
pdu_space_available = bearer.att_mtu - 1
values: list[bytes] = []
for handle in request.set_of_handles:
if not (attribute := self.get_attribute(handle)):
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=handle,
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
self.send_response(bearer, response)
return
# No need to catch permission errors here, since these attributes
# must all be world-readable
attribute_value = await attribute.read_value(bearer)
# Check the attribute value size
max_attribute_size = min(bearer.att_mtu - 1, 251)
if len(attribute_value) > max_attribute_size:
# We need to truncate
attribute_value = attribute_value[:max_attribute_size]
# Check if there is enough space
entry_size = len(attribute_value)
if pdu_space_available < entry_size:
break
# Add the attribute to the list
values.append(attribute_value)
pdu_space_available -= entry_size
response = att.ATT_Read_Multiple_Response(set_of_values=b''.join(values))
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_read_multiple_variable_request(
self, bearer: att.Bearer, request: att.ATT_Read_Multiple_Variable_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.11 Read Multiple Variable Request.
'''
response: att.ATT_PDU
pdu_space_available = bearer.att_mtu - 1
length_value_tuple_list: list[tuple[int, bytes]] = []
for handle in request.set_of_handles:
if not (attribute := self.get_attribute(handle)):
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=handle,
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
self.send_response(bearer, response)
return
# No need to catch permission errors here, since these attributes
# must all be world-readable
attribute_value = await attribute.read_value(bearer)
length = len(attribute_value)
# Check the attribute value size
max_attribute_size = min(bearer.att_mtu - 3, 251)
if len(attribute_value) > max_attribute_size:
# We need to truncate
attribute_value = attribute_value[:max_attribute_size]
# Check if there is enough space
entry_size = 2 + len(attribute_value)
# Add the attribute to the list
length_value_tuple_list.append((length, attribute_value))
pdu_space_available -= entry_size
if pdu_space_available <= 0:
break
response = att.ATT_Read_Multiple_Variable_Response(
length_value_tuple_list=length_value_tuple_list
)
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_write_request(
self, bearer: att.Bearer, request: att.ATT_Write_Request
+244 -74
View File
@@ -398,8 +398,9 @@ HCI_LE_CS_SUBEVENT_RESULT_EVENT = 0x31
HCI_LE_CS_SUBEVENT_RESULT_CONTINUE_EVENT = 0x32
HCI_LE_CS_TEST_END_COMPLETE_EVENT = 0x33
HCI_LE_MONITORED_ADVERTISERS_REPORT_EVENT = 0x34
HCI_LE_FRAME_SPACE_UPDATE_EVENT = 0x35
HCI_LE_FRAME_SPACE_UPDATE_COMPLETE_EVENT = 0x35
HCI_LE_UTP_RECEIVE_EVENT = 0x36
HCI_LE_CONNECTION_RATE_CHANGE_EVENT = 0x37
# HCI Command
@@ -736,6 +737,12 @@ HCI_LE_CLEAR_MONITORED_ADVERTISERS_LIST_COMMAND = hci_c
HCI_LE_READ_MONITORED_ADVERTISERS_LIST_SIZE_COMMAND = hci_command_op_code(0x08, 0x009B)
HCI_LE_ENABLE_MONITORING_ADVERTISERS_COMMAND = hci_command_op_code(0x08, 0x009C)
HCI_LE_FRAME_SPACE_UPDATE_COMMAND = hci_command_op_code(0x08, 0x009D)
HCI_LE_SET_RESOLVABLE_PRIVATE_ADDRESS_TIMEOUT_V2_COMMAND = hci_command_op_code(0x08, 0x009E)
HCI_LE_ENABLE_UTP_OTA_MODE_COMMAND = hci_command_op_code(0x08, 0x009F)
HCI_LE_UTP_SEND_COMMAND = hci_command_op_code(0x08, 0x00A0)
HCI_LE_CONNECTION_RATE_REQUEST_COMMAND = hci_command_op_code(0x08, 0x00A1)
HCI_LE_SET_DEFAULT_RATE_PARAMETERS_COMMAND = hci_command_op_code(0x08, 0x00A2)
HCI_LE_READ_MINIMUM_SUPPORTED_CONNECTION_INTERVAL_COMMAND = hci_command_op_code(0x08, 0x00A3)
# HCI Error Codes
@@ -1398,6 +1405,12 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_LE_CLEAR_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+7),
HCI_LE_READ_MONITORED_ADVERTISERS_LIST_SIZE_COMMAND : 1 << (48*8+0),
HCI_LE_FRAME_SPACE_UPDATE_COMMAND : 1 << (48*8+1),
HCI_LE_SET_RESOLVABLE_PRIVATE_ADDRESS_TIMEOUT_V2_COMMAND : 1 << (48*8+2),
HCI_LE_ENABLE_UTP_OTA_MODE_COMMAND : 1 << (48*8+3),
HCI_LE_UTP_SEND_COMMAND : 1 << (48*8+4),
HCI_LE_CONNECTION_RATE_REQUEST_COMMAND : 1 << (48*8+5),
HCI_LE_SET_DEFAULT_RATE_PARAMETERS_COMMAND : 1 << (48*8+6),
HCI_LE_READ_MINIMUM_SUPPORTED_CONNECTION_INTERVAL_COMMAND : 1 << (48*8+7)
}
# LE Supported Features
@@ -1455,6 +1468,14 @@ class LeFeature(SpecableEnum):
LL_EXTENDED_FEATURE_SET = 63
MONITORING_ADVERTISERS = 64
FRAME_SPACE_UPDATE = 65
UTP_OTA_MODE = 66
UTP_HCI_MODE = 67
LL_OTA_UTP_IND_MAXIMUM_LENGTH_0 = 68
LL_OTA_UTP_IND_MAXIMUM_LENGTH_1 = 69
SHORTER_CONNECTION_INTERVALS = 72
SHORTER_CONNECTION_INTERVALS_HOST_SUPPORT = 73
LE_FLUSHABLE_ACL_DATA = 74
class LeFeatureMask(utils.CompatibleIntFlag):
LE_ENCRYPTION = 1 << LeFeature.LE_ENCRYPTION
@@ -1509,6 +1530,13 @@ class LeFeatureMask(utils.CompatibleIntFlag):
LL_EXTENDED_FEATURE_SET = 1 << LeFeature.LL_EXTENDED_FEATURE_SET
MONITORING_ADVERTISERS = 1 << LeFeature.MONITORING_ADVERTISERS
FRAME_SPACE_UPDATE = 1 << LeFeature.FRAME_SPACE_UPDATE
UTP_OTA_MODE = 1 << LeFeature.UTP_OTA_MODE
UTP_HCI_MODE = 1 << LeFeature.UTP_HCI_MODE
LL_OTA_UTP_IND_MAXIMUM_LENGTH_0 = 1 << LeFeature.LL_OTA_UTP_IND_MAXIMUM_LENGTH_0
LL_OTA_UTP_IND_MAXIMUM_LENGTH_1 = 1 << LeFeature.LL_OTA_UTP_IND_MAXIMUM_LENGTH_1
SHORTER_CONNECTION_INTERVALS = 1 << LeFeature.SHORTER_CONNECTION_INTERVALS
SHORTER_CONNECTION_INTERVALS_HOST_SUPPORT = 1 << LeFeature.SHORTER_CONNECTION_INTERVALS_HOST_SUPPORT
LE_FLUSHABLE_ACL_DATA = 1 << LeFeature.LE_FLUSHABLE_ACL_DATA
class LmpFeature(SpecableEnum):
# Page 0 (Legacy LMP features)
@@ -2165,9 +2193,9 @@ class Address:
RANDOM_IDENTITY_ADDRESS = AddressType.RANDOM_IDENTITY
# Type declarations
NIL: Address
ANY: Address
ANY_RANDOM: Address
NIL: ClassVar[Address]
ANY: ClassVar[Address]
ANY_RANDOM: ClassVar[Address]
# pylint: disable-next=unnecessary-lambda
ADDRESS_TYPE_SPEC = {'size': 1, 'mapper': lambda x: Address.address_type_name(x)}
@@ -2279,38 +2307,38 @@ class Address:
self.address_type = address_type
def clone(self):
def clone(self) -> Address:
return Address(self.address_bytes, self.address_type)
@property
def is_public(self):
def is_public(self) -> bool:
return self.address_type in (
self.PUBLIC_DEVICE_ADDRESS,
self.PUBLIC_IDENTITY_ADDRESS,
)
@property
def is_random(self):
def is_random(self) -> bool:
return not self.is_public
@property
def is_resolved(self):
def is_resolved(self) -> bool:
return self.address_type in (
self.PUBLIC_IDENTITY_ADDRESS,
self.RANDOM_IDENTITY_ADDRESS,
)
@property
def is_resolvable(self):
def is_resolvable(self) -> bool:
return self.address_type == self.RANDOM_DEVICE_ADDRESS and (
self.address_bytes[5] >> 6 == 1
)
@property
def is_static(self):
def is_static(self) -> bool:
return self.is_random and (self.address_bytes[5] >> 6 == 3)
def to_string(self, with_type_qualifier=True):
def to_string(self, with_type_qualifier: bool = True) -> str:
'''
String representation of the address, MSB first, with an optional type
qualifier.
@@ -2320,23 +2348,23 @@ class Address:
return result
return result + '/P'
def __bytes__(self):
def __bytes__(self) -> bytes:
return self.address_bytes
def __hash__(self):
def __hash__(self) -> int:
return hash(self.address_bytes)
def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
return (
isinstance(other, Address)
and self.address_bytes == other.address_bytes
and self.is_public == other.is_public
)
def __str__(self):
def __str__(self) -> str:
return self.to_string()
def __repr__(self):
def __repr__(self) -> str:
return f'Address({self.to_string(False)}/{self.address_type_name(self.address_type)})'
@@ -2375,30 +2403,34 @@ class HCI_Packet:
Abstract Base class for HCI packets
'''
hci_packet_type: ClassVar[int]
hci_packet_type: int
@staticmethod
def from_bytes(packet: bytes) -> HCI_Packet:
packet_type = packet[0]
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_Packet:
try:
packet_type = packet[0]
if packet_type == HCI_COMMAND_PACKET:
return HCI_Command.from_bytes(packet)
if packet_type == HCI_COMMAND_PACKET:
return HCI_Command.from_bytes(packet)
if packet_type == HCI_ACL_DATA_PACKET:
return HCI_AclDataPacket.from_bytes(packet)
if packet_type == HCI_ACL_DATA_PACKET:
return HCI_AclDataPacket.from_bytes(packet)
if packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
return HCI_SynchronousDataPacket.from_bytes(packet)
if packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
return HCI_SynchronousDataPacket.from_bytes(packet)
if packet_type == HCI_EVENT_PACKET:
return HCI_Event.from_bytes(packet)
if packet_type == HCI_EVENT_PACKET:
return HCI_Event.from_bytes(packet)
if packet_type == HCI_ISO_DATA_PACKET:
return HCI_IsoDataPacket.from_bytes(packet)
if packet_type == HCI_ISO_DATA_PACKET:
return HCI_IsoDataPacket.from_bytes(packet)
return HCI_CustomPacket(packet)
return HCI_CustomPacket(packet)
except Exception as e:
logger.error(f'error parsing HCI packet [{packet.hex()}]: {e}')
raise
def __init__(self, name):
def __init__(self, name: str) -> None:
self.name = name
def __bytes__(self) -> bytes:
@@ -2410,7 +2442,7 @@ class HCI_Packet:
# -----------------------------------------------------------------------------
class HCI_CustomPacket(HCI_Packet):
def __init__(self, payload):
def __init__(self, payload: bytes) -> None:
super().__init__('HCI_CUSTOM_PACKET')
self.hci_packet_type = payload[0]
self.payload = payload
@@ -2569,6 +2601,21 @@ class HCI_GenericReturnParameters(HCI_ReturnParameters):
class HCI_StatusReturnParameters(HCI_ReturnParameters):
status: HCI_ErrorCode = field(metadata=HCI_ErrorCode.type_metadata(1))
@classmethod
def from_parameters(cls, parameters: bytes) -> Self | HCI_StatusReturnParameters:
status = HCI_ErrorCode(parameters[0])
if status != HCI_ErrorCode.SUCCESS:
# Don't parse further, just return the status.
return HCI_StatusReturnParameters(status=status)
return cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields))
@dataclasses.dataclass
class HCI_GenericStatusReturnParameters(HCI_StatusReturnParameters):
data: bytes = field(metadata=metadata('*'))
@dataclasses.dataclass
class HCI_StatusAndAddressReturnParameters(HCI_StatusReturnParameters):
@@ -3746,7 +3793,7 @@ class HCI_Write_Extended_Inquiry_Response_Command(
'''
fec_required: int = field(metadata=metadata(1))
extended_inquiry_response: int = field(
extended_inquiry_response: bytes = field(
metadata=metadata({'size': 240, 'serializer': lambda x: padded_bytes(x, 240)})
)
@@ -5796,7 +5843,25 @@ class HCI_LE_Subrate_Request_Command(HCI_AsyncCommand):
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HHCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters(
class HCI_LE_Read_All_Local_Supported_Features_ReturnParameters(
HCI_StatusReturnParameters
):
max_page: int = field(metadata=metadata(1))
le_features: bytes = field(metadata=metadata(248))
@HCI_SyncCommand.sync_command(HCI_LE_Read_All_Local_Supported_Features_ReturnParameters)
class HCI_LE_Read_All_Local_Supported_Features_Command(
HCI_SyncCommand[HCI_LE_Read_All_Local_Supported_Features_ReturnParameters]
):
'''
See Bluetooth spec @ 7.8.128 LE Read All Local Supported Features Command
'''
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters(
HCI_StatusReturnParameters
):
num_config_supported: int = field(metadata=metadata(1))
@@ -5808,7 +5873,7 @@ class HHCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters(
rtt_capability: int = field(metadata=metadata(1))
rtt_aa_only_n: int = field(metadata=metadata(1))
rtt_sounding_n: int = field(metadata=metadata(1))
rtt_random_payload_n: int = field(metadata=metadata(1))
rtt_random_sequence_n: int = field(metadata=metadata(1))
nadm_sounding_capability: int = field(metadata=metadata(2))
nadm_random_capability: int = field(metadata=metadata(2))
cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC))
@@ -5822,11 +5887,11 @@ class HHCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters(
@HCI_SyncCommand.sync_command(
HHCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters
HCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters
)
@dataclasses.dataclass
class HCI_LE_CS_Read_Local_Supported_Capabilities_Command(
HCI_SyncCommand[HHCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters]
HCI_SyncCommand[HCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters]
):
'''
See Bluetooth spec @ 7.8.130 LE CS Read Local Supported Capabilities command
@@ -5864,7 +5929,7 @@ class HCI_LE_CS_Write_Cached_Remote_Supported_Capabilities_Command(
rtt_capability: int = field(metadata=metadata(1))
rtt_aa_only_n: int = field(metadata=metadata(1))
rtt_sounding_n: int = field(metadata=metadata(1))
rtt_random_payload_n: int = field(metadata=metadata(1))
rtt_random_sequence_n: int = field(metadata=metadata(1))
nadm_sounding_capability: int = field(metadata=metadata(2))
nadm_random_capability: int = field(metadata=metadata(2))
cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC))
@@ -6073,6 +6138,92 @@ class HCI_LE_CS_Test_End_Command(HCI_AsyncCommand):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
class HCI_LE_Frame_Space_Update_Command(HCI_AsyncCommand):
'''
See Bluetooth spec @ 7.8.151 LE Frame Space Update command
'''
class SpacingType(SpecableFlag):
T_IFS_ACL_CP = 1 << 0
T_IFS_ACL_PC = 1 << 1
T_MCES = 1 << 2
T_IFS_CIS = 1 << 3
T_MSS_CIS = 1 << 4
connection_handle: int = field(metadata=metadata(2))
frame_space_min: int = field(metadata=metadata(2))
frame_space_max: int = field(metadata=metadata(2))
phys: int = field(metadata=PhyBit.type_metadata(1))
spacing_types: int = field(metadata=SpacingType.type_metadata(2))
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
class HCI_LE_Connection_Rate_Request_Command(HCI_AsyncCommand):
'''
See Bluetooth spec @ 7.8.154 LE Connection Rate Request command
'''
connection_handle: int = field(metadata=metadata(2))
connection_interval_min: int = field(metadata=metadata(2))
connection_interval_max: int = field(metadata=metadata(2))
subrate_min: int = field(metadata=metadata(2))
subrate_max: int = field(metadata=metadata(2))
max_latency: int = field(metadata=metadata(2))
continuation_number: int = field(metadata=metadata(2))
supervision_timeout: int = field(metadata=metadata(2))
min_ce_length: int = field(metadata=metadata(2))
max_ce_length: int = field(metadata=metadata(2))
# -----------------------------------------------------------------------------
@HCI_SyncCommand.sync_command(HCI_StatusReturnParameters)
@dataclasses.dataclass
class HCI_LE_Set_Default_Rate_Parameters_Command(
HCI_SyncCommand[HCI_StatusReturnParameters]
):
'''
See Bluetooth spec @ 7.8.155 LE Set Default Rate Parameters command
'''
connection_interval_min: int = field(metadata=metadata(2))
connection_interval_max: int = field(metadata=metadata(2))
subrate_min: int = field(metadata=metadata(2))
subrate_max: int = field(metadata=metadata(2))
max_latency: int = field(metadata=metadata(2))
continuation_number: int = field(metadata=metadata(2))
supervision_timeout: int = field(metadata=metadata(2))
min_ce_length: int = field(metadata=metadata(2))
max_ce_length: int = field(metadata=metadata(2))
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_LE_Read_Minimum_Supported_Connection_Interval_ReturnParameters(
HCI_StatusReturnParameters
):
minimum_supported_connection_interval: int = field(metadata=metadata(1))
group_min: Sequence[int] = field(metadata=metadata(2, list_begin=True))
group_max: Sequence[int] = field(metadata=metadata(2))
group_stride: Sequence[int] = field(metadata=metadata(2, list_end=True))
@HCI_SyncCommand.sync_command(
HCI_LE_Read_Minimum_Supported_Connection_Interval_ReturnParameters
)
@dataclasses.dataclass
class HCI_LE_Read_Minimum_Supported_Connection_Interval_Command(
HCI_SyncCommand[HCI_LE_Read_Minimum_Supported_Connection_Interval_ReturnParameters]
):
'''
See Bluetooth spec @ 7.8.156 LE Read Minimum Supported Connection Interval command
'''
# -----------------------------------------------------------------------------
# HCI Events
# -----------------------------------------------------------------------------
@@ -6986,7 +7137,7 @@ class HCI_LE_CS_Read_Remote_Supported_Capabilities_Complete_Event(HCI_LE_Meta_Ev
rtt_capability: int = field(metadata=metadata(1))
rtt_aa_only_n: int = field(metadata=metadata(1))
rtt_sounding_n: int = field(metadata=metadata(1))
rtt_random_payload_n: int = field(metadata=metadata(1))
rtt_random_sequence_n: int = field(metadata=metadata(1))
nadm_sounding_capability: int = field(metadata=metadata(2))
nadm_random_capability: int = field(metadata=metadata(2))
cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC))
@@ -7142,6 +7293,23 @@ class HCI_LE_CS_Test_End_Complete_Event(HCI_LE_Meta_Event):
status: int = field(metadata=metadata(STATUS_SPEC))
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event
@dataclasses.dataclass
class HCI_LE_Connection_Rate_Change_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.50 LE Connection Rate Change event
'''
status: int = field(metadata=metadata(STATUS_SPEC))
connection_handle: int = field(metadata=metadata(2))
connection_interval: int = field(metadata=metadata(2))
subrate_factor: int = field(metadata=metadata(2))
peripheral_latency: int = field(metadata=metadata(2))
continuation_number: int = field(metadata=metadata(2))
supervision_timeout: int = field(metadata=metadata(2))
# -----------------------------------------------------------------------------
@HCI_Event.event
@dataclasses.dataclass
@@ -7345,6 +7513,7 @@ class HCI_Command_Complete_Event(HCI_Event, Generic[_RP]):
def from_parameters(cls, parameters: bytes) -> Self:
event = cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields))
event.parameters = parameters
return_parameters_bytes = parameters[3:]
# Find the class for the matching command.
subclass = HCI_Command.command_classes.get(event.command_opcode)
@@ -7357,16 +7526,16 @@ class HCI_Command_Complete_Event(HCI_Event, Generic[_RP]):
'HCI Command Complete event with opcode for a class that is not'
' an HCI_SyncCommand subclass: '
f'opcode={event.command_opcode:#04x}, '
f'type={type(subclass).__name__}'
f'type={subclass.__name__}'
)
event.return_parameters = HCI_GenericReturnParameters(
data=event.return_parameters # type: ignore[arg-type]
data=return_parameters_bytes
) # type: ignore[assignment]
return event
# Parse the return parameters bytes into an object.
event.return_parameters = subclass.parse_return_parameters(
event.return_parameters # type: ignore[arg-type]
return_parameters_bytes
) # type: ignore[assignment]
return event
@@ -7805,6 +7974,7 @@ class HCI_Vendor_Event(HCI_Event):
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_AclDataPacket(HCI_Packet):
'''
See Bluetooth spec @ 5.4.2 HCI ACL Data Packets
@@ -7812,8 +7982,14 @@ class HCI_AclDataPacket(HCI_Packet):
hci_packet_type = HCI_ACL_DATA_PACKET
@staticmethod
def from_bytes(packet: bytes) -> HCI_AclDataPacket:
connection_handle: int
pb_flag: int
bc_flag: int
data_total_length: int
data: bytes
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_AclDataPacket:
# Read the header
h, data_total_length = struct.unpack_from('<HH', packet, 1)
connection_handle = h & 0xFFF
@@ -7822,25 +7998,22 @@ class HCI_AclDataPacket(HCI_Packet):
data = packet[5:]
if len(data) != data_total_length:
raise InvalidPacketError('invalid packet length')
return HCI_AclDataPacket(
connection_handle, pb_flag, bc_flag, data_total_length, data
return cls(
connection_handle=connection_handle,
pb_flag=pb_flag,
bc_flag=bc_flag,
data_total_length=data_total_length,
data=data,
)
def __bytes__(self):
def __bytes__(self) -> bytes:
h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle
return (
struct.pack('<BHH', HCI_ACL_DATA_PACKET, h, self.data_total_length)
+ self.data
)
def __init__(self, connection_handle, pb_flag, bc_flag, data_total_length, data):
self.connection_handle = connection_handle
self.pb_flag = pb_flag
self.bc_flag = bc_flag
self.data_total_length = data_total_length
self.data = data
def __str__(self):
def __str__(self) -> str:
return (
f'{color("ACL", "blue")}: '
f'handle=0x{self.connection_handle:04x}, '
@@ -7851,6 +8024,7 @@ class HCI_AclDataPacket(HCI_Packet):
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_SynchronousDataPacket(HCI_Packet):
'''
See Bluetooth spec @ 5.4.3 HCI SCO Data Packets
@@ -7858,8 +8032,13 @@ class HCI_SynchronousDataPacket(HCI_Packet):
hci_packet_type = HCI_SYNCHRONOUS_DATA_PACKET
@staticmethod
def from_bytes(packet: bytes) -> HCI_SynchronousDataPacket:
connection_handle: int
packet_status: int
data_total_length: int
data: bytes
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_SynchronousDataPacket:
# Read the header
h, data_total_length = struct.unpack_from('<HB', packet, 1)
connection_handle = h & 0xFFF
@@ -7869,8 +8048,11 @@ class HCI_SynchronousDataPacket(HCI_Packet):
raise InvalidPacketError(
f'invalid packet length {len(data)} != {data_total_length}'
)
return HCI_SynchronousDataPacket(
connection_handle, packet_status, data_total_length, data
return cls(
connection_handle=connection_handle,
packet_status=packet_status,
data_total_length=data_total_length,
data=data,
)
def __bytes__(self) -> bytes:
@@ -7880,18 +8062,6 @@ class HCI_SynchronousDataPacket(HCI_Packet):
+ self.data
)
def __init__(
self,
connection_handle: int,
packet_status: int,
data_total_length: int,
data: bytes,
) -> None:
self.connection_handle = connection_handle
self.packet_status = packet_status
self.data_total_length = data_total_length
self.data = data
def __str__(self) -> str:
return (
f'{color("SCO", "blue")}: '
@@ -7909,7 +8079,7 @@ class HCI_IsoDataPacket(HCI_Packet):
See Bluetooth spec @ 5.4.5 HCI ISO Data Packets
'''
hci_packet_type: ClassVar[int] = HCI_ISO_DATA_PACKET
hci_packet_type = HCI_ISO_DATA_PACKET
connection_handle: int
data_total_length: int
+2 -2
View File
@@ -312,11 +312,11 @@ class HID(ABC, utils.EventEmitter):
def send_pdu_on_ctrl(self, msg: bytes) -> None:
assert self.l2cap_ctrl_channel
self.l2cap_ctrl_channel.send_pdu(msg)
self.l2cap_ctrl_channel.write(msg)
def send_pdu_on_intr(self, msg: bytes) -> None:
assert self.l2cap_intr_channel
self.l2cap_intr_channel.send_pdu(msg)
self.l2cap_intr_channel.write(msg)
def send_data(self, data: bytes) -> None:
if self.role == HID.Role.HOST:
+146 -57
View File
@@ -21,7 +21,6 @@ import asyncio
import collections
import dataclasses
import logging
import struct
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Any, TypeVar, cast, overload
@@ -271,14 +270,19 @@ class Host(utils.EventEmitter):
self.sco_links = {} # SCO links, by connection handle
self.bigs = {} # BIG Handle to BIS Handles
self.pending_command: hci.HCI_SyncCommand | hci.HCI_AsyncCommand | None = None
self.pending_response: asyncio.Future[Any] | None = None
self.pending_response: (
asyncio.Future[
hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event
]
| None
) = None
self.number_of_supported_advertising_sets = 0
self.maximum_advertising_data_length = 31
self.local_version: (
hci.HCI_Read_Local_Version_Information_ReturnParameters | None
) = None
self.local_supported_commands = 0
self.local_le_features = 0
self.local_le_features = hci.LeFeatureMask(0) # LE features
self.local_lmp_features = hci.LmpFeatureMask(0) # Classic LMP features
self.suggested_max_tx_octets = 251 # Max allowed
self.suggested_max_tx_time = 2120 # Max allowed
@@ -348,17 +352,26 @@ class Host(utils.EventEmitter):
response1.supported_commands, 'little'
)
if self.supports_command(hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response2 = await self.send_sync_command(
hci.HCI_LE_Read_Local_Supported_Features_Command()
)
self.local_le_features = struct.unpack('<Q', response2.le_features)[0]
if self.supports_command(hci.HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
self.local_version = await self.send_sync_command(
hci.HCI_Read_Local_Version_Information_Command()
)
if self.supports_command(hci.HCI_LE_READ_ALL_LOCAL_SUPPORTED_FEATURES_COMMAND):
response2 = await self.send_sync_command(
hci.HCI_LE_Read_All_Local_Supported_Features_Command()
)
self.local_le_features = hci.LeFeatureMask(
int.from_bytes(response2.le_features, 'little')
)
elif self.supports_command(hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response3 = await self.send_sync_command(
hci.HCI_LE_Read_Local_Supported_Features_Command()
)
self.local_le_features = hci.LeFeatureMask(
int.from_bytes(response3.le_features, 'little')
)
if self.supports_command(hci.HCI_READ_LOCAL_EXTENDED_FEATURES_COMMAND):
max_page_number = 0
page_number = 0
@@ -375,7 +388,6 @@ class Host(utils.EventEmitter):
max_page_number = response4.maximum_page_number
page_number += 1
self.local_lmp_features = hci.LmpFeatureMask(lmp_features)
elif self.supports_command(hci.HCI_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response5 = await self.send_sync_command(
hci.HCI_Read_Local_Supported_Features_Command()
@@ -494,12 +506,17 @@ class Host(utils.EventEmitter):
hci.HCI_LE_TRANSMIT_POWER_REPORTING_EVENT,
hci.HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_SUBRATE_CHANGE_EVENT,
hci.HCI_LE_READ_ALL_REMOTE_FEATURES_COMPLETE_EVENT,
hci.HCI_LE_CS_READ_REMOTE_SUPPORTED_CAPABILITIES_COMPLETE_EVENT,
hci.HCI_LE_CS_PROCEDURE_ENABLE_COMPLETE_EVENT,
hci.HCI_LE_CS_SECURITY_ENABLE_COMPLETE_EVENT,
hci.HCI_LE_CS_CONFIG_COMPLETE_EVENT,
hci.HCI_LE_CS_SUBEVENT_RESULT_EVENT,
hci.HCI_LE_CS_SUBEVENT_RESULT_CONTINUE_EVENT,
hci.HCI_LE_MONITORED_ADVERTISERS_REPORT_EVENT,
hci.HCI_LE_FRAME_SPACE_UPDATE_COMPLETE_EVENT,
hci.HCI_LE_UTP_RECEIVE_EVENT,
hci.HCI_LE_CONNECTION_RATE_CHANGE_EVENT,
]
)
@@ -646,25 +663,35 @@ class Host(utils.EventEmitter):
response_timeout: float | None = None,
) -> hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event:
# Wait until we can send (only one pending command at a time)
async with self.command_semaphore:
assert self.pending_command is None
assert self.pending_response is None
await self.command_semaphore.acquire()
# Create a future value to hold the eventual response
self.pending_response = asyncio.get_running_loop().create_future()
self.pending_command = command
# Create a future value to hold the eventual response
assert self.pending_command is None
assert self.pending_response is None
self.pending_response = asyncio.get_running_loop().create_future()
self.pending_command = command
try:
self.send_hci_packet(command)
return await asyncio.wait_for(
self.pending_response, timeout=response_timeout
)
except Exception:
logger.exception(color("!!! Exception while sending command:", "red"))
raise
finally:
self.pending_command = None
self.pending_response = None
response: (
hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event | None
) = None
try:
self.send_hci_packet(command)
response = await asyncio.wait_for(
self.pending_response, timeout=response_timeout
)
return response
except Exception:
logger.exception(color("!!! Exception while sending command:", "red"))
raise
finally:
self.pending_command = None
self.pending_response = None
if (
response is not None
and response.num_hci_command_packets
and self.command_semaphore.locked()
):
self.command_semaphore.release()
@overload
async def send_command(
@@ -717,30 +744,42 @@ class Host(utils.EventEmitter):
return response
async def send_sync_command(
self, command: hci.HCI_SyncCommand[_RP], response_timeout: float | None = None
) -> _RP:
response = await self.send_sync_command_raw(command, response_timeout)
return_parameters = response.return_parameters
# Check the return parameters's status
if isinstance(return_parameters, hci.HCI_StatusReturnParameters):
status = return_parameters.status
elif isinstance(return_parameters, hci.HCI_GenericReturnParameters):
# if the payload has at least one byte, assume the first byte is the status
if not return_parameters.data:
raise RuntimeError('no status byte in return parameters')
status = hci.HCI_ErrorCode(return_parameters.data[0])
else:
raise RuntimeError(
f'unexpected return parameters type ({type(return_parameters)})'
)
if status != hci.HCI_ErrorCode.SUCCESS:
logger.warning(
f'{command.name} failed ' f'({hci.HCI_Constant.error_name(status)})'
)
raise hci.HCI_Error(status)
return return_parameters
async def send_sync_command_raw(
self,
command: hci.HCI_SyncCommand[_RP],
check_status: bool = True,
response_timeout: float | None = None,
) -> _RP:
) -> hci.HCI_Command_Complete_Event[_RP]:
response = await self._send_command(command, response_timeout)
# Check that the response is of the expected type
assert isinstance(response, hci.HCI_Command_Complete_Event)
return_parameters: _RP = response.return_parameters
assert isinstance(return_parameters, command.return_parameters_class)
# Check the return parameters if required
if check_status:
if isinstance(return_parameters, hci.HCI_StatusReturnParameters):
status = return_parameters.status
if status != hci.HCI_SUCCESS:
logger.warning(
f'{command.name} failed '
f'({hci.HCI_Constant.error_name(status)})'
)
raise hci.HCI_Error(status)
return return_parameters
return response
async def send_async_command(
self,
@@ -795,6 +834,16 @@ class Host(utils.EventEmitter):
)
packet_queue.enqueue(acl_packet, connection_handle)
def send_sco_sdu(self, connection_handle: int, sdu: bytes) -> None:
self.send_hci_packet(
hci.HCI_SynchronousDataPacket(
connection_handle=connection_handle,
packet_status=0,
data_total_length=len(sdu),
data=sdu,
)
)
def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None:
self.send_acl_sdu(connection_handle, bytes(L2CAP_PDU(cid, pdu)))
@@ -879,16 +928,18 @@ class Host(utils.EventEmitter):
if self.local_supported_commands & mask
)
def supports_le_features(self, feature: hci.LeFeatureMask) -> bool:
return (self.local_le_features & feature) == feature
def supports_le_features(self, features: hci.LeFeatureMask) -> bool:
return (self.local_le_features & features) == features
def supports_lmp_features(self, feature: hci.LmpFeatureMask) -> bool:
return self.local_lmp_features & (feature) == feature
def supports_lmp_features(self, features: hci.LmpFeatureMask) -> bool:
return self.local_lmp_features & (features) == features
@property
def supported_le_features(self):
def supported_le_features(self) -> list[hci.LeFeature]:
return [
feature for feature in range(64) if self.local_le_features & (1 << feature)
feature
for feature in hci.LeFeature
if self.local_le_features & (1 << feature)
]
# Packet Sink protocol (packets coming from the controller via HCI)
@@ -977,6 +1028,8 @@ class Host(utils.EventEmitter):
self.pending_response.set_result(event)
else:
logger.warning('!!! no pending response future to set')
if event.num_hci_command_packets and self.command_semaphore.locked():
self.command_semaphore.release()
############################################################
# HCI handlers
@@ -988,7 +1041,13 @@ class Host(utils.EventEmitter):
if event.command_opcode == 0:
# This is used just for the Num_HCI_Command_Packets field, not related to
# an actual command
logger.debug('no-command event')
logger.debug('no-command event for flow control')
# Release the command semaphore if needed
if event.num_hci_command_packets and self.command_semaphore.locked():
logger.debug('command complete event releasing semaphore')
self.command_semaphore.release()
return
return self.on_command_processed(event)
@@ -1169,7 +1228,7 @@ class Host(utils.EventEmitter):
self, event: hci.HCI_LE_Connection_Update_Complete_Event
):
if (connection := self.connections.get(event.connection_handle)) is None:
logger.warning('!!! CONNECTION PARAMETERS UPDATE COMPLETE: unknown handle')
logger.warning('!!! CONNECTION UPDATE COMPLETE: unknown handle')
return
# Notify the client
@@ -1186,6 +1245,29 @@ class Host(utils.EventEmitter):
'connection_parameters_update_failure', connection.handle, event.status
)
def on_hci_le_connection_rate_change_event(
self, event: hci.HCI_LE_Connection_Rate_Change_Event
):
if (connection := self.connections.get(event.connection_handle)) is None:
logger.warning('!!! CONNECTION RATE CHANGE: unknown handle')
return
# Notify the client
if event.status == hci.HCI_SUCCESS:
self.emit(
'le_connection_rate_change',
connection.handle,
event.connection_interval,
event.subrate_factor,
event.peripheral_latency,
event.continuation_number,
event.supervision_timeout,
)
else:
self.emit(
'le_connection_rate_change_failure', connection.handle, event.status
)
def on_hci_le_phy_update_complete_event(
self, event: hci.HCI_LE_PHY_Update_Complete_Event
):
@@ -1745,12 +1827,13 @@ class Host(utils.EventEmitter):
self.emit(
'le_remote_features_failure', event.connection_handle, event.status
)
else:
self.emit(
'le_remote_features',
event.connection_handle,
int.from_bytes(event.le_features, 'little'),
)
return
self.emit(
'le_remote_features',
event.connection_handle,
hci.LeFeatureMask(int.from_bytes(event.le_features, 'little')),
)
def on_hci_le_cs_read_remote_supported_capabilities_complete_event(
self, event: hci.HCI_LE_CS_Read_Remote_Supported_Capabilities_Complete_Event
@@ -1783,6 +1866,12 @@ class Host(utils.EventEmitter):
self.emit('cs_subevent_result_continue', event)
def on_hci_le_subrate_change_event(self, event: hci.HCI_LE_Subrate_Change_Event):
if event.status != hci.HCI_SUCCESS:
self.emit(
'le_subrate_change_failure', event.connection_handle, event.status
)
return
self.emit(
'le_subrate_change',
event.connection_handle,
+13 -6
View File
@@ -1647,7 +1647,9 @@ class LeCreditBasedChannel(utils.EventEmitter):
self.connection_result = None
self.disconnection_result = None
self.drained = asyncio.Event()
self.att_mtu = 0 # Filled by GATT client or server later.
# Core Specification Vol 3, Part G, 5.3.1 ATT_MTU
# ATT_MTU shall be set to the minimum of the MTU field values of the two devices.
self.att_mtu = min(mtu, peer_mtu)
self.drained.set()
@@ -2340,8 +2342,8 @@ class ChannelManager:
cid,
L2CAP_Connection_Response(
identifier=request.identifier,
destination_cid=request.source_cid,
source_cid=0,
destination_cid=0,
source_cid=request.source_cid,
result=L2CAP_Connection_Response.Result.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
status=0x0000,
),
@@ -2353,7 +2355,12 @@ class ChannelManager:
f'creating server channel with cid={source_cid} for psm {request.psm}'
)
channel = ClassicChannel(
self, connection, cid, request.psm, source_cid, server.spec
manager=self,
connection=connection,
signaling_cid=cid,
psm=request.psm,
source_cid=source_cid,
spec=server.spec,
)
connection_channels[source_cid] = channel
@@ -2370,8 +2377,8 @@ class ChannelManager:
cid,
L2CAP_Connection_Response(
identifier=request.identifier,
destination_cid=request.source_cid,
source_cid=0,
destination_cid=0,
source_cid=request.source_cid,
result=L2CAP_Connection_Response.Result.CONNECTION_REFUSED_PSM_NOT_SUPPORTED,
status=0x0000,
),
+1 -1
View File
@@ -278,7 +278,7 @@ class L2CAPService(L2CAPServicer):
if not l2cap_channel:
return SendResponse(error=COMMAND_NOT_UNDERSTOOD)
if isinstance(l2cap_channel, ClassicChannel):
l2cap_channel.send_pdu(request.data)
l2cap_channel.write(request.data)
else:
l2cap_channel.write(request.data)
return SendResponse(success=empty_pb2.Empty())
+1 -1
View File
@@ -800,7 +800,7 @@ class Multiplexer(utils.EventEmitter):
def send_frame(self, frame: RFCOMM_Frame) -> None:
logger.debug(f'>>> Multiplexer sending {frame}')
self.l2cap_channel.send_pdu(frame)
self.l2cap_channel.write(bytes(frame))
def on_pdu(self, pdu: bytes) -> None:
frame = RFCOMM_Frame.from_bytes(pdu)
+2 -2
View File
@@ -847,7 +847,7 @@ class Client:
self.pending_request = request
try:
self.channel.send_pdu(bytes(request))
self.channel.write(bytes(request))
return await self.pending_response
finally:
self.pending_request = None
@@ -1061,7 +1061,7 @@ class Server:
def send_response(self, response):
logger.debug(f'{color(">>> Sending SDP Response", "blue")}: {response}')
self.channel.send_pdu(response)
self.channel.write(response)
def match_services(self, search_pattern: DataElement) -> dict[int, Service]:
# Find the services for which the attributes in the pattern is a subset of the
+111 -1
View File
@@ -110,6 +110,53 @@ class BtSnooper(Snooper):
)
# -----------------------------------------------------------------------------
class PcapSnooper(Snooper):
"""
Snooper that saves or streames HCI packets using the PCAP format.
"""
PCAP_MAGIC = 0xA1B2C3D4
DLT_BLUETOOTH_HCI_H4_WITH_PHDR = 201
def __init__(self, output: BinaryIO):
self.output = output
# Write the header
self.output.write(
struct.pack(
"<IHHIIII",
self.PCAP_MAGIC,
2, # Major PCAP Version
4, # Minor PCAP Version
0, # Reserved 1
0, # Reserved 2
65535, # SnapLen
# FCS and f are set to 0 implicitly by the next line
self.DLT_BLUETOOTH_HCI_H4_WITH_PHDR, # The DLT in this PCAP
)
)
def snoop(self, hci_packet: bytes, direction: Snooper.Direction):
now = datetime.datetime.now(datetime.timezone.utc)
sec = int(now.timestamp())
usec = now.microsecond
# Emit the record
self.output.write(
struct.pack(
"<IIII",
sec, # Timestamp (Seconds)
usec, # Timestamp (Microseconds)
len(hci_packet) + 4,
len(hci_packet) + 4, # +4 because of the addtional direction info...
)
+ struct.pack(">I", int(direction)) # ...thats being added here
+ hci_packet
)
self.output.flush() # flush after every packet for live logging
# -----------------------------------------------------------------------------
_SNOOPER_INSTANCE_COUNT = 0
@@ -140,9 +187,38 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
pid: the current process ID.
instance: the instance ID in the current process.
pcapsnoop
The syntax for the type-specific arguments for this type is:
<io-type>:<io-type-specific-arguments>
Supported I/O types are:
file
The type-specific arguments for this I/O type is a string that is converted
to a file path using the python `str.format()` string formatting. The log
records will be written to that file if it can be opened/created.
The keyword args that may be referenced by the string pattern are:
now: the value of `datetime.now()`
utcnow: the value of `datetime.now(tz=datetime.timezone.utc)`
pid: the current process ID.
instance: the instance ID in the current process.
pipe
The type-specific arguments for this I/O type is a string that is converted
to a path using the python `str.format()` string formatting. The log
records will be written to the named pipe referenced by this path
if it can be opened. The keyword args that may be referenced by the
string pattern are:
now: the value of `datetime.now()`
utcnow: the value of `datetime.now(tz=datetime.timezone.utc)`
pid: the current process ID.
instance: the instance ID in the current process.
Examples:
btsnoop:file:my_btsnoop.log
btsnoop:file:/tmp/bumble_{now:%Y-%m-%d-%H:%M:%S}_{pid}.log
pcapsnoop:pipe:/tmp/bumble-extcap
"""
if ':' not in spec:
@@ -150,6 +226,8 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
snooper_type, snooper_args = spec.split(':', maxsplit=1)
global _SNOOPER_INSTANCE_COUNT
if snooper_type == 'btsnoop':
if ':' not in snooper_args:
raise core.InvalidArgumentError('I/O type for btsnoop snooper type missing')
@@ -157,7 +235,6 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
io_type, io_name = snooper_args.split(':', maxsplit=1)
if io_type == 'file':
# Process the file name string pattern.
global _SNOOPER_INSTANCE_COUNT
file_path = io_name.format(
now=datetime.datetime.now(),
utcnow=datetime.datetime.now(tz=datetime.timezone.utc),
@@ -173,6 +250,39 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
_SNOOPER_INSTANCE_COUNT -= 1
return
elif snooper_type == 'pcapsnoop':
if ':' not in snooper_args:
raise core.InvalidArgumentError(
'I/O type for pcapsnoop snooper type missing'
)
io_type, io_name = snooper_args.split(':', maxsplit=1)
if io_type in {'pipe', 'file'}:
# Process the file name string pattern.
file_path = io_name.format(
now=datetime.datetime.now(),
utcnow=datetime.datetime.now(tz=datetime.timezone.utc),
pid=os.getpid(),
instance=_SNOOPER_INSTANCE_COUNT,
)
# Open a file or pipe
logger.debug(f'PCAP file: {file_path}')
# Pipes we have to open with unbuffered binary I/O
# so we pass ``buffering`` for pipes but not for files
pcap_file: BinaryIO
if io_type == 'pipe':
pcap_file = open(file_path, 'wb', buffering=0)
else:
pcap_file = open(file_path, 'wb')
with pcap_file:
_SNOOPER_INSTANCE_COUNT += 1
yield PcapSnooper(pcap_file)
_SNOOPER_INSTANCE_COUNT -= 1
return
raise core.InvalidArgumentError(f'I/O type {io_type} not supported')
raise core.InvalidArgumentError(f'snooper type {snooper_type} not found')
+1 -1
View File
@@ -194,7 +194,7 @@ async def open_android_netsim_controller_transport(
# We only accept BLUETOOTH
if request.initial_info.chip.kind != ChipKind.BLUETOOTH:
logger.warning('Unsupported chip type')
logger.debug('Request for unsupported chip type')
error = PacketResponse(error='Unsupported chip type')
await self.context.write(error)
# return
+1 -2
View File
@@ -42,8 +42,7 @@ response = await host.send_sync_command(
handle_type=HCI_Write_Tx_Power_Level_Command.TX_POWER_HANDLE_TYPE_ADV,
connection_handle=0,
tx_power_level=-4,
),
check_status=False
)
)
if response.status == HCI_SUCCESS:
+1 -1
View File
@@ -65,7 +65,7 @@ async def main() -> None:
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -161,7 +161,7 @@ async def main() -> None:
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -181,7 +181,7 @@ async def main() -> None:
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -70,7 +70,7 @@ async def main() -> None:
await device.power_on()
await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -112,7 +112,7 @@ async def main() -> None:
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -73,7 +73,7 @@ async def main() -> None:
await device.power_on()
await device.start_discovery()
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -57,7 +57,7 @@ async def main() -> None:
print(f'!!! Encryption failed: {error}')
return
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+201
View File
@@ -0,0 +1,201 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
from collections.abc import Callable
import bumble.logging
from bumble.core import BaseError
from bumble.device import Connection, Device
from bumble.hci import Address, LeFeatureMask
from bumble.transport import open_transport
# -----------------------------------------------------------------------------
DEFAULT_CENTRAL_ADDRESS = Address("F0:F0:F0:F0:F0:F0")
DEFAULT_PERIPHERAL_ADDRESS = Address("F1:F1:F1:F1:F1:F1")
# -----------------------------------------------------------------------------
async def run_as_central(
device: Device,
scenario: Callable | None,
) -> None:
# Connect to the peripheral
print(f'=== Connecting to {DEFAULT_PERIPHERAL_ADDRESS}...')
connection = await device.connect(DEFAULT_PERIPHERAL_ADDRESS)
print("=== Connected")
if scenario is not None:
await asyncio.sleep(1)
await scenario(connection)
await asyncio.get_running_loop().create_future()
async def run_as_peripheral(device: Device, scenario: Callable | None) -> None:
# Wait for a connection from the central
print(f'=== Advertising as {DEFAULT_PERIPHERAL_ADDRESS}...')
await device.start_advertising(auto_restart=True)
async def on_connection(connection: Connection) -> None:
assert scenario is not None
await asyncio.sleep(1)
await scenario(connection)
if scenario is not None:
device.on(Device.EVENT_CONNECTION, on_connection)
await asyncio.get_running_loop().create_future()
async def change_parameters(
connection: Connection,
parameter_request_procedure_supported: bool,
subrating_supported: bool,
shorter_connection_intervals_supported: bool,
) -> None:
if parameter_request_procedure_supported:
try:
print(">>> update_parameters(7.5, 200, 0, 4000)")
await connection.update_parameters(7.5, 200, 0, 4000)
await asyncio.sleep(3)
except BaseError as error:
print(f"Error: {error}")
if subrating_supported:
try:
print(">>> update_subrate(1, 2, 2, 1, 4000)")
await connection.update_subrate(1, 2, 2, 1, 4000)
await asyncio.sleep(3)
except BaseError as error:
print(f"Error: {error}")
if shorter_connection_intervals_supported:
try:
print(
">>> update_parameters_with_subrate(7.5, 200, 1, 1, 0, 0, 4000, 5, 1000)"
)
await connection.update_parameters_with_subrate(
7.5, 200, 1, 1, 0, 0, 4000, 5, 1000
)
await asyncio.sleep(3)
except BaseError as error:
print(f"Error: {error}")
try:
print(
">>> update_parameters_with_subrate(0.750, 5, 1, 1, 0, 0, 4000, 0.125, 1000)"
)
await connection.update_parameters_with_subrate(
0.750, 5, 1, 1, 0, 0, 4000, 0.125, 1000
)
await asyncio.sleep(3)
except BaseError as error:
print(f"Error: {error}")
print(">>> done")
def on_connection(connection: Connection) -> None:
print(f"+++ Connection established: {connection}")
def on_le_remote_features_change() -> None:
print(f'... LE Remote Features change: {connection.peer_le_features.name}')
connection.on(
connection.EVENT_LE_REMOTE_FEATURES_CHANGE, on_le_remote_features_change
)
def on_connection_parameters_change() -> None:
print(f'... LE Connection Parameters change: {connection.parameters}')
connection.on(
connection.EVENT_CONNECTION_PARAMETERS_UPDATE, on_connection_parameters_change
)
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_connection_updates.py <transport-spec> '
'central|peripheral initiator|responder'
)
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')
role = sys.argv[2]
direction = sys.argv[3]
device = Device.with_hci(
role,
(
DEFAULT_CENTRAL_ADDRESS
if role == "central"
else DEFAULT_PERIPHERAL_ADDRESS
),
hci_transport.source,
hci_transport.sink,
)
device.le_subrate_enabled = True
device.le_shorter_connection_intervals_enabled = True
await device.power_on()
parameter_request_procedure_supported = device.supports_le_features(
LeFeatureMask.CONNECTION_PARAMETERS_REQUEST_PROCEDURE
)
print(
"Parameters Request Procedure supported: "
f"{parameter_request_procedure_supported}"
)
subrating_supported = device.supports_le_features(
LeFeatureMask.CONNECTION_SUBRATING
)
print(f"Subrating supported: {subrating_supported}")
shorter_connection_intervals_supported = device.supports_le_features(
LeFeatureMask.SHORTER_CONNECTION_INTERVALS
)
print(
"Shorter Connection Intervals supported: "
f"{shorter_connection_intervals_supported}"
)
device.on(Device.EVENT_CONNECTION, on_connection)
async def run(connection: Connection) -> None:
await change_parameters(
connection,
parameter_request_procedure_supported,
subrating_supported,
shorter_connection_intervals_supported,
)
scenario = run if direction == "initiator" else None
if role == "central":
await run_as_central(device, scenario)
else:
await run_as_peripheral(device, scenario)
# -----------------------------------------------------------------------------
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+1 -1
View File
@@ -101,7 +101,7 @@ async def main() -> None:
await device.start_advertising()
await device.start_scanning()
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -48,7 +48,7 @@ async def main() -> None:
await device.power_on()
await device.start_scanning()
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -147,7 +147,7 @@ async def main() -> None:
else:
await device.start_advertising(auto_restart=True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
@@ -99,7 +99,7 @@ async def main() -> None:
else:
await device.start_advertising(auto_restart=True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -422,7 +422,7 @@ async def main() -> None:
# Setup a server
await server(device)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+3 -7
View File
@@ -100,13 +100,9 @@ def on_sco_packet(packet: hci.HCI_SynchronousDataPacket):
if source_file and (pcm_data := source_file.read(packet.data_total_length)):
assert ag_protocol
host = ag_protocol.dlc.multiplexer.l2cap_channel.connection.device.host
host.send_hci_packet(
hci.HCI_SynchronousDataPacket(
connection_handle=packet.connection_handle,
packet_status=0,
data_total_length=len(pcm_data),
data=pcm_data,
)
host.send_sco_sdu(
connection_handle=packet.connection_handle,
sdu=pcm_data,
)
+1 -1
View File
@@ -167,7 +167,7 @@ async def main() -> None:
await websockets.asyncio.server.serve(serve, 'localhost', 8989)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -735,7 +735,7 @@ async def main() -> None:
print("Executing in Web mode")
await keyboard_device(hid_device)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -556,7 +556,7 @@ async def main() -> None:
# Interrupt Channel
await hid_host.connect_interrupt_channel()
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -227,7 +227,7 @@ async def main() -> None:
tcp_port = int(sys.argv[5])
asyncio.create_task(tcp_server(tcp_port, session))
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -153,7 +153,7 @@ async def main() -> None:
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+1 -1
View File
@@ -75,7 +75,7 @@ async def main() -> None:
await device.power_on()
await device.start_scanning(filter_duplicates=filter_duplicates)
await hci_transport.source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
+7 -2
View File
@@ -15,15 +15,20 @@ dependencies = [
"aiohttp ~= 3.8; platform_system!='Emscripten'",
"appdirs >= 1.4; platform_system!='Emscripten'",
"click >= 8.1.3; platform_system!='Emscripten'",
"cryptography >= 44.0.3; platform_system!='Emscripten'",
"cryptography >= 44.0.3; platform_system!='Emscripten' and platform_system!='Android'",
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the
# versions available on PyPI. Relax the version requirement since it's better than being
# completely unable to import the package in case of version mismatch.
"cryptography >= 44.0.3; platform_system=='Emscripten'",
# Android wheels for cryptography are not yet available on PyPI, so chaquopy uses
# the builds from https://chaquo.com/pypi-13.1/cryptography/. But these are not regually
# updated. Relax the version requirement since it's better than being completely unable
# to import the package in case of version mismatch.
"cryptography >= 42.0.8; platform_system=='Android'",
"grpcio >= 1.62.1; platform_system!='Emscripten'",
"humanize >= 4.6.0; platform_system!='Emscripten'",
"libusb1 >= 2.0.1; platform_system!='Emscripten'",
"libusb-package == 1.0.26.1; platform_system!='Emscripten'",
"libusb-package == 1.0.26.1; platform_system!='Emscripten' and platform_system!='Android'",
"platformdirs >= 3.10.0; platform_system!='Emscripten'",
"prompt_toolkit >= 3.0.16; platform_system!='Emscripten'",
"prettytable >= 3.6.0; platform_system!='Emscripten'",
+15 -1
View File
@@ -233,7 +233,21 @@ def test_event(event: avrcp.Event):
feature_bitmask=avrcp.MediaPlayerItem.Features.ADD_TO_NOW_PLAYING,
character_set_id=avrcp.CharacterSetId.UTF_8,
displayable_name="Woo",
)
),
avrcp.FolderItem(
folder_uid=1,
folder_type=avrcp.FolderItem.FolderType.ALBUMS,
is_playable=avrcp.FolderItem.Playable.PLAYABLE,
character_set_id=avrcp.CharacterSetId.UTF_8,
displayable_name="Album",
),
avrcp.MediaElementItem(
media_element_uid=1,
media_type=avrcp.MediaElementItem.MediaType.AUDIO,
character_set_id=avrcp.CharacterSetId.UTF_8,
displayable_name="Song",
attribute_value_entry_list=[],
),
],
),
avrcp.ChangePathResponse(
+3 -1
View File
@@ -619,7 +619,9 @@ async def test_le_request_subrate():
def on_le_subrate_change():
q.put_nowait(lambda: None)
devices.connections[0].on(Connection.EVENT_LE_SUBRATE_CHANGE, on_le_subrate_change)
devices.connections[0].on(
Connection.EVENT_CONNECTION_PARAMETERS_UPDATE, on_le_subrate_change
)
await devices[0].send_command(
hci.HCI_LE_Subrate_Request_Command(
+99 -1
View File
@@ -28,7 +28,7 @@ from unittest.mock import ANY, AsyncMock, Mock
import pytest
from typing_extensions import Self
from bumble import gatt_client, l2cap
from bumble import att, gatt_client, l2cap
from bumble.att import (
ATT_ATTRIBUTE_NOT_FOUND_ERROR,
ATT_PDU,
@@ -1638,6 +1638,104 @@ async def test_eatt_connection_failure():
await gatt_client.Client.connect_eatt(devices.connections[0])
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_multiple() -> None:
devices = await TwoDevices.create_with_connection()
characteristic1 = Characteristic(
'0001', Characteristic.Properties.READ, Characteristic.READABLE, b'1234'
)
characteristic2 = Characteristic(
'0002',
Characteristic.Properties.READ,
Characteristic.READABLE,
b'5678',
)
service = Service('0000', [characteristic1, characteristic2])
devices[1].add_service(service)
client = devices.connections[0].gatt_client
server = devices[1].gatt_server
await client.discover_services()
characteristics = await client.discover_characteristics(
[characteristic1.uuid, characteristic2.uuid], None
)
response = await client.send_request(
att.ATT_Read_Multiple_Request(
set_of_handles=[c.handle for c in characteristics]
)
)
assert isinstance(response, att.ATT_Read_Multiple_Response)
assert response.set_of_values == b'12345678'
response = await client.send_request(
att.ATT_Read_Multiple_Request(
set_of_handles=[
next(
handle
for handle in range(0x0001, 0xFFFF)
if not server.get_attribute(handle)
)
]
)
)
assert isinstance(response, att.ATT_Error_Response)
assert response.error_code == att.ATT_ATTRIBUTE_NOT_FOUND_ERROR
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_multiple_variable() -> None:
devices = await TwoDevices.create_with_connection()
characteristic1 = Characteristic(
'0001', Characteristic.Properties.READ, Characteristic.READABLE, b'1234'
)
characteristic2 = Characteristic(
'0002',
Characteristic.Properties.READ,
Characteristic.READABLE,
b'99',
)
service = Service('0000', [characteristic1, characteristic2])
devices[1].add_service(service)
client = devices.connections[0].gatt_client
server = devices[1].gatt_server
await client.discover_services()
characteristics = await client.discover_characteristics(
[characteristic1.uuid, characteristic2.uuid], None
)
response = await client.send_request(
att.ATT_Read_Multiple_Variable_Request(
set_of_handles=[c.handle for c in characteristics]
)
)
assert isinstance(response, att.ATT_Read_Multiple_Variable_Response)
assert response.length_value_tuple_list == [(4, b'1234'), (2, b'99')]
response = await client.send_request(
att.ATT_Read_Multiple_Variable_Request(
set_of_handles=[
next(
handle
for handle in range(0x0001, 0xFFFF)
if not server.get_attribute(handle)
)
]
)
)
assert isinstance(response, att.ATT_Error_Response)
assert response.error_code == att.ATT_ATTRIBUTE_NOT_FOUND_ERROR
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
+2 -2
View File
@@ -218,9 +218,9 @@ def test_return_parameters() -> None:
assert isinstance(params.status, utils.OpenIntEnum)
params = hci.HCI_Read_BD_ADDR_Command.parse_return_parameters(
bytes.fromhex('3C001122334455')
bytes.fromhex('00001122334455')
)
assert params.status == hci.HCI_ErrorCode.ADVERTISING_TIMEOUT_ERROR
assert params.status == hci.HCI_ErrorCode.SUCCESS
assert isinstance(params.status, utils.OpenIntEnum)
assert isinstance(params.bd_addr, hci.Address)
+17 -3
View File
@@ -26,9 +26,11 @@ from bumble.controller import Controller
from bumble.hci import (
HCI_AclDataPacket,
HCI_Command_Complete_Event,
HCI_Disconnect_Command,
HCI_Error,
HCI_ErrorCode,
HCI_Event,
HCI_GenericReturnParameters,
HCI_Reset_Command,
HCI_StatusReturnParameters,
)
@@ -195,6 +197,7 @@ async def test_send_sync_command() -> None:
)
host = Host(source, sink)
host.ready = True
# Sync command with success
response1 = await host.send_sync_command(HCI_Reset_Command())
@@ -212,6 +215,17 @@ async def test_send_sync_command() -> None:
assert excinfo.value.error_code == error_response.return_parameters.status
# Sync command with error status should not raise when `check_status` is False
response2 = await host.send_sync_command(HCI_Reset_Command(), check_status=False)
assert response2.status == HCI_ErrorCode.COMMAND_DISALLOWED_ERROR
# Sync command with raw result
response2 = await host.send_sync_command_raw(HCI_Reset_Command())
assert response2.return_parameters.status == HCI_ErrorCode.COMMAND_DISALLOWED_ERROR
# Sync command with a command that's not an HCI_SyncCommand
# (here, for convenience, we use an HCI_AsyncCommand instance)
command = HCI_Disconnect_Command(connection_handle=0x1234, reason=0x13)
sink.response = HCI_Command_Complete_Event(
1,
command.op_code,
HCI_GenericReturnParameters(data=bytes.fromhex("00112233")),
)
response3 = await host.send_sync_command_raw(command) # type: ignore
assert isinstance(response3.return_parameters, HCI_GenericReturnParameters)