Compare commits

...

26 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod a0d24e95e7 fix spacing_type 2026-01-24 10:15:32 -08:00
Gilles Boccon-Gibod c577f17c99 add basic support for SCI 2026-01-20 15:32:55 -08:00
Gilles Boccon-Gibod 4986f55043 Merge pull request #869 from timrid/android-fix
Make bumble work on Android using briefcase/chaquopy
2026-01-19 09:50:08 -08:00
Gilles Boccon-Gibod 7e89c8a7f8 Merge pull request #868 from google/gbg/return-parameters
typing support for HCI commands return parameters
2026-01-19 09:49:15 -08:00
timrid 085905a7bf Make bumble work on Android using briefcase that is using chaquopy under the hood. 2026-01-18 23:32:37 +01:00
zxzxwu c619f1f21b Merge pull request #867 from zxzxwu/fix-import-error
Fix missing ClassVar import
2026-01-16 15:33:07 +08:00
Josh Wu d4b0da9265 Fix missing ClassVar import 2026-01-16 15:21:26 +08:00
zxzxwu f1058e4d4e Merge pull request #859 from istemon/att-read-by-type-request-fix
Return 'invalid handle' for malformed read by type request
2026-01-16 15:09:20 +08:00
zxzxwu 454d477d7e Merge pull request #864 from zxzxwu/hci-packets-typing
Add HCI Packets annotations and send_sco_sdu
2026-01-16 15:08:42 +08:00
zxzxwu 6966228d74 Merge pull request #863 from zxzxwu/eatt-mtu
Correct ATT_MTU in enhanced bearers
2026-01-16 15:08:12 +08:00
zxzxwu f4271a5646 Merge pull request #862 from zxzxwu/gatt-multiple
GATT: Support Multiple Requests
2026-01-16 15:08:02 +08:00
zxzxwu 534209f0af Merge pull request #861 from zxzxwu/l2cap
Replace send_pdu() with write()
2026-01-16 15:07:54 +08:00
zxzxwu 549b82999a Merge pull request #860 from zxzxwu/address
Improve Address type annotations
2026-01-16 14:04:56 +08:00
zxzxwu 551f577b2a Merge pull request #866 from zxzxwu/template-service
Fix GATT TemplateSerivce annotations
2026-01-16 09:41:48 +08:00
Josh Wu 84a6453dda Fix GATT TemplateSerivce annotations 2026-01-15 12:06:05 +08:00
Gilles Boccon-Gibod 591ed61686 Merge pull request #858 from klow68/feat/add-usb-probe-filtering 2026-01-13 08:54:55 -08:00
Josh Wu 3d3acbb374 Add HCI Packets annotations and send_sco_sdu 2026-01-13 17:58:37 +08:00
Stryxion 671f306a27 fix: black 2026-01-13 09:42:40 +01:00
Josh Wu f7364db992 Correct ATT_MTU in enhanced bearers 2026-01-12 21:03:14 +08:00
Josh Wu 0fb2b3bd66 GATT: Support Multiple Requests 2026-01-12 20:51:38 +08:00
Stryxion 9e270d4d62 fix: mypy 2026-01-12 09:36:35 +01:00
Josh Wu cf60b5ffbb Replace send_pdu() with write() 2026-01-12 13:16:49 +08:00
Josh Wu aa4c57d105 Improve Address type annotations
* Add missing annotations
* Declare address constants as ClassVar
2026-01-12 13:07:04 +08:00
Istemon 61a601e6e2 Return 'invalid handle' for malformed read by type request 2026-01-10 01:43:30 +00:00
Stryxion 05fd4fbfc6 fix: review 2026-01-09 08:46:31 +01:00
Stryxion 6aa9e0bdf7 feat: Add filtering options for usb probe 2026-01-08 14:54:58 +01:00
22 changed files with 1286 additions and 210 deletions
+40 -33
View File
@@ -27,9 +27,8 @@ from bumble.core import name_or_number
from bumble.hci import (
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_LE_READ_BUFFER_SIZE_V2_COMMAND,
HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND,
HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND,
HCI_LE_READ_MINIMUM_SUPPORTED_CONNECTION_INTERVAL_COMMAND,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_READ_BD_ADDR_COMMAND,
HCI_READ_BUFFER_SIZE_COMMAND,
@@ -37,9 +36,8 @@ from bumble.hci import (
HCI_Command,
HCI_LE_Read_Buffer_Size_Command,
HCI_LE_Read_Buffer_Size_V2_Command,
HCI_LE_Read_Maximum_Advertising_Data_Length_Command,
HCI_LE_Read_Maximum_Data_Length_Command,
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command,
HCI_LE_Read_Minimum_Supported_Connection_Interval_Command,
HCI_LE_Read_Suggested_Default_Data_Length_Command,
HCI_Read_BD_ADDR_Command,
HCI_Read_Buffer_Size_Command,
@@ -78,52 +76,61 @@ async def get_classic_info(host: Host) -> None:
async def get_le_info(host: Host) -> None:
print()
if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
response1 = await host.send_sync_command(
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command()
)
print(
color('LE Number Of Supported Advertising Sets:', 'yellow'),
response1.num_supported_advertising_sets,
'\n',
)
print(
color('LE Number Of Supported Advertising Sets:', 'yellow'),
host.number_of_supported_advertising_sets,
'\n',
)
if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
response2 = await host.send_sync_command(
HCI_LE_Read_Maximum_Advertising_Data_Length_Command()
)
print(
color('LE Maximum Advertising Data Length:', 'yellow'),
response2.max_advertising_data_length,
'\n',
)
print(
color('LE Maximum Advertising Data Length:', 'yellow'),
host.maximum_advertising_data_length,
'\n',
)
if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
response3 = await host.send_sync_command(
response1 = await host.send_sync_command(
HCI_LE_Read_Maximum_Data_Length_Command()
)
print(
color('Maximum Data Length:', 'yellow'),
color('LE Maximum Data Length:', 'yellow'),
(
f'tx:{response3.supported_max_tx_octets}/'
f'{response3.supported_max_tx_time}, '
f'rx:{response3.supported_max_rx_octets}/'
f'{response3.supported_max_rx_time}'
f'tx:{response1.supported_max_tx_octets}/'
f'{response1.supported_max_tx_time}, '
f'rx:{response1.supported_max_rx_octets}/'
f'{response1.supported_max_rx_time}'
),
'\n',
)
if host.supports_command(HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND):
response4 = await host.send_sync_command(
response2 = await host.send_sync_command(
HCI_LE_Read_Suggested_Default_Data_Length_Command()
)
print(
color('Suggested Default Data Length:', 'yellow'),
f'{response4.suggested_max_tx_octets}/'
f'{response4.suggested_max_tx_time}',
color('LE Suggested Default Data Length:', 'yellow'),
f'{response2.suggested_max_tx_octets}/'
f'{response2.suggested_max_tx_time}',
'\n',
)
if host.supports_command(HCI_LE_READ_MINIMUM_SUPPORTED_CONNECTION_INTERVAL_COMMAND):
response3 = await host.send_sync_command(
HCI_LE_Read_Minimum_Supported_Connection_Interval_Command()
)
print(
color('LE Minimum Supported Connection Interval:', 'yellow'),
f'{response3.minimum_supported_connection_interval * 125} µs',
)
for group in range(len(response3.group_min)):
print(
f' Group {group}: '
f'{response3.group_min[group] * 125} µs to '
f'{response3.group_max[group] * 125} µs '
'by increments of '
f'{response3.group_stride[group] * 125} µs',
'\n',
)
print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features:
print(f' {LeFeature(feature).name}')
+15 -2
View File
@@ -26,6 +26,8 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from typing import Any
import click
import usb1
@@ -166,13 +168,16 @@ def is_bluetooth_hci(device):
# -----------------------------------------------------------------------------
@click.command()
@click.option('--verbose', is_flag=True, default=False, help='Print more details')
def main(verbose):
@click.option('--hci-only', is_flag=True, default=False, help='only show HCI device')
@click.option('--manufacturer', help='filter by manufacturer')
@click.option('--product', help='filter by product')
def main(verbose: bool, manufacturer: str, product: str, hci_only: bool):
bumble.logging.setup_basic_logging('WARNING')
load_libusb()
with usb1.USBContext() as context:
bluetooth_device_count = 0
devices = {}
devices: dict[tuple[Any, Any], list[str | None]] = {}
for device in context.getDeviceIterator(skip_on_error=True):
device_class = device.getDeviceClass()
@@ -234,6 +239,14 @@ def main(verbose):
f'{basic_transport_name}/{device_serial_number}'
)
# Filter
if product and device_product != product:
continue
if manufacturer and device_manufacturer != manufacturer:
continue
if not is_bluetooth_hci(device) and hci_only:
continue
# Print the results
print(
color(
+97 -32
View File
@@ -29,7 +29,7 @@ import enum
import functools
import inspect
import struct
from collections.abc import Awaitable, Callable
from collections.abc import Awaitable, Callable, Sequence
from typing import (
TYPE_CHECKING,
ClassVar,
@@ -72,34 +72,36 @@ ATT_PSM = 0x001F
EATT_PSM = 0x0027
class Opcode(hci.SpecableEnum):
ATT_ERROR_RESPONSE = 0x01
ATT_EXCHANGE_MTU_REQUEST = 0x02
ATT_EXCHANGE_MTU_RESPONSE = 0x03
ATT_FIND_INFORMATION_REQUEST = 0x04
ATT_FIND_INFORMATION_RESPONSE = 0x05
ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06
ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
ATT_READ_BY_TYPE_REQUEST = 0x08
ATT_READ_BY_TYPE_RESPONSE = 0x09
ATT_READ_REQUEST = 0x0A
ATT_READ_RESPONSE = 0x0B
ATT_READ_BLOB_REQUEST = 0x0C
ATT_READ_BLOB_RESPONSE = 0x0D
ATT_READ_MULTIPLE_REQUEST = 0x0E
ATT_READ_MULTIPLE_RESPONSE = 0x0F
ATT_READ_BY_GROUP_TYPE_REQUEST = 0x10
ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
ATT_WRITE_REQUEST = 0x12
ATT_WRITE_RESPONSE = 0x13
ATT_WRITE_COMMAND = 0x52
ATT_SIGNED_WRITE_COMMAND = 0xD2
ATT_PREPARE_WRITE_REQUEST = 0x16
ATT_PREPARE_WRITE_RESPONSE = 0x17
ATT_EXECUTE_WRITE_REQUEST = 0x18
ATT_EXECUTE_WRITE_RESPONSE = 0x19
ATT_HANDLE_VALUE_NOTIFICATION = 0x1B
ATT_HANDLE_VALUE_INDICATION = 0x1D
ATT_HANDLE_VALUE_CONFIRMATION = 0x1E
ATT_ERROR_RESPONSE = 0x01
ATT_EXCHANGE_MTU_REQUEST = 0x02
ATT_EXCHANGE_MTU_RESPONSE = 0x03
ATT_FIND_INFORMATION_REQUEST = 0x04
ATT_FIND_INFORMATION_RESPONSE = 0x05
ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06
ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
ATT_READ_BY_TYPE_REQUEST = 0x08
ATT_READ_BY_TYPE_RESPONSE = 0x09
ATT_READ_REQUEST = 0x0A
ATT_READ_RESPONSE = 0x0B
ATT_READ_BLOB_REQUEST = 0x0C
ATT_READ_BLOB_RESPONSE = 0x0D
ATT_READ_MULTIPLE_REQUEST = 0x0E
ATT_READ_MULTIPLE_RESPONSE = 0x0F
ATT_READ_BY_GROUP_TYPE_REQUEST = 0x10
ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
ATT_READ_MULTIPLE_VARIABLE_REQUEST = 0x20
ATT_READ_MULTIPLE_VARIABLE_RESPONSE = 0x21
ATT_WRITE_REQUEST = 0x12
ATT_WRITE_RESPONSE = 0x13
ATT_WRITE_COMMAND = 0x52
ATT_SIGNED_WRITE_COMMAND = 0xD2
ATT_PREPARE_WRITE_REQUEST = 0x16
ATT_PREPARE_WRITE_RESPONSE = 0x17
ATT_EXECUTE_WRITE_REQUEST = 0x18
ATT_EXECUTE_WRITE_RESPONSE = 0x19
ATT_HANDLE_VALUE_NOTIFICATION = 0x1B
ATT_HANDLE_VALUE_INDICATION = 0x1D
ATT_HANDLE_VALUE_CONFIRMATION = 0x1E
ATT_REQUESTS = [
Opcode.ATT_EXCHANGE_MTU_REQUEST,
@@ -110,9 +112,10 @@ ATT_REQUESTS = [
Opcode.ATT_READ_BLOB_REQUEST,
Opcode.ATT_READ_MULTIPLE_REQUEST,
Opcode.ATT_READ_BY_GROUP_TYPE_REQUEST,
Opcode.ATT_READ_MULTIPLE_VARIABLE_REQUEST,
Opcode.ATT_WRITE_REQUEST,
Opcode.ATT_PREPARE_WRITE_REQUEST,
Opcode.ATT_EXECUTE_WRITE_REQUEST
Opcode.ATT_EXECUTE_WRITE_REQUEST,
]
ATT_RESPONSES = [
@@ -125,9 +128,10 @@ ATT_RESPONSES = [
Opcode.ATT_READ_BLOB_RESPONSE,
Opcode.ATT_READ_MULTIPLE_RESPONSE,
Opcode.ATT_READ_BY_GROUP_TYPE_RESPONSE,
Opcode.ATT_READ_MULTIPLE_VARIABLE_RESPONSE,
Opcode.ATT_WRITE_RESPONSE,
Opcode.ATT_PREPARE_WRITE_RESPONSE,
Opcode.ATT_EXECUTE_WRITE_RESPONSE
Opcode.ATT_EXECUTE_WRITE_RESPONSE,
]
class ErrorCode(hci.SpecableEnum):
@@ -185,6 +189,18 @@ ATT_INSUFFICIENT_RESOURCES_ERROR = ErrorCode.INSUFFICIENT_RESOURCES
ATT_DEFAULT_MTU = 23
HANDLE_FIELD_SPEC = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'}
_SET_OF_HANDLES_METADATA = hci.metadata({
'parser': lambda data, offset: (
len(data),
[
struct.unpack_from('<H', data, i)[0]
for i in range(offset, len(data), 2)
],
),
'serializer': lambda handles: b''.join(
[struct.pack('<H', handle) for handle in handles]
),
})
# fmt: on
# pylint: enable=line-too-long
@@ -554,7 +570,7 @@ class ATT_Read_Multiple_Request(ATT_PDU):
See Bluetooth spec @ Vol 3, Part F - 3.4.4.7 Read Multiple Request
'''
set_of_handles: bytes = dataclasses.field(metadata=hci.metadata("*"))
set_of_handles: Sequence[int] = dataclasses.field(metadata=_SET_OF_HANDLES_METADATA)
# -----------------------------------------------------------------------------
@@ -635,6 +651,55 @@ class ATT_Read_By_Group_Type_Response(ATT_PDU):
return result
# -----------------------------------------------------------------------------
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_Multiple_Variable_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.11 Read Multiple Variable Request
'''
set_of_handles: Sequence[int] = dataclasses.field(metadata=_SET_OF_HANDLES_METADATA)
# -----------------------------------------------------------------------------
@ATT_PDU.subclass
@dataclasses.dataclass
class ATT_Read_Multiple_Variable_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.12 Read Multiple Variable Response
'''
@classmethod
def _parse_length_value_tuples(
cls, data: bytes, offset: int
) -> tuple[int, list[tuple[int, bytes]]]:
length_value_tuple_list: list[tuple[int, bytes]] = []
while offset < len(data):
length = struct.unpack_from('<H', data, offset)[0]
length_value_tuple_list.append(
(length, data[offset + 2 : offset + 2 + length])
)
offset += 2 + length
return (len(data), length_value_tuple_list)
length_value_tuple_list: Sequence[tuple[int, bytes]] = dataclasses.field(
metadata=hci.metadata(
{
'parser': lambda data, offset: ATT_Read_Multiple_Variable_Response._parse_length_value_tuples(
data, offset
),
'serializer': lambda length_value_tuple_list: b''.join(
[
struct.pack('<H', length) + value
for length, value in length_value_tuple_list
]
),
}
)
)
# -----------------------------------------------------------------------------
@ATT_PDU.subclass
@dataclasses.dataclass
+1 -1
View File
@@ -235,7 +235,7 @@ class Protocol:
)
+ payload
)
self.l2cap_channel.send_pdu(pdu)
self.l2cap_channel.write(pdu)
def send_command(self, transaction_label: int, pid: int, payload: bytes) -> None:
logger.debug(
+3 -3
View File
@@ -268,7 +268,7 @@ class MediaPacketPump:
await self.clock.sleep(delay)
# Emit
rtp_channel.send_pdu(bytes(packet))
rtp_channel.write(bytes(packet))
logger.debug(
f'{color(">>> sending RTP packet:", "green")} {packet}'
)
@@ -1519,7 +1519,7 @@ class Protocol(utils.EventEmitter):
header = bytes([first_header_byte])
# Send one packet
self.l2cap_channel.send_pdu(header + payload[:max_fragment_size])
self.l2cap_channel.write(header + payload[:max_fragment_size])
# Prepare for the next packet
payload = payload[max_fragment_size:]
@@ -1829,7 +1829,7 @@ class Stream:
def send_media_packet(self, packet: MediaPacket) -> None:
assert self.rtp_channel
self.rtp_channel.send_pdu(bytes(packet))
self.rtp_channel.write(bytes(packet))
async def configure(self) -> None:
if self.state != State.IDLE:
+13
View File
@@ -1898,6 +1898,19 @@ class Controller:
'''
return bytes([hci.HCI_SUCCESS]) + self.le_features.value.to_bytes(8, 'little')
def on_hci_le_read_all_local_supported_features_command(
self, _command: hci.HCI_LE_Read_All_Local_Supported_Features_Command
) -> bytes | None:
'''
See Bluetooth spec Vol 4, Part E - 7.8.128 LE Read All Local Supported Features
Command
'''
return (
bytes([hci.HCI_SUCCESS])
+ bytes([0])
+ self.le_features.value.to_bytes(248, 'little')
)
def on_hci_le_set_random_address_command(
self, command: hci.HCI_LE_Set_Random_Address_Command
) -> bytes | None:
+406 -36
View File
@@ -1367,10 +1367,7 @@ class Peer:
def create_service_proxy(
self, proxy_class: type[_PROXY_CLASS]
) -> _PROXY_CLASS | None:
if proxy := proxy_class.from_client(self.gatt_client):
return cast(_PROXY_CLASS, proxy)
return None
return proxy_class.from_client(self.gatt_client)
async def discover_service_and_create_proxy(
self, proxy_class: type[_PROXY_CLASS]
@@ -1412,8 +1409,8 @@ class ConnectionParametersPreferences:
connection_interval_max: float = DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX
max_latency: int = DEVICE_DEFAULT_CONNECTION_MAX_LATENCY
supervision_timeout: int = DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT
min_ce_length: int = DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH
max_ce_length: int = DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH
min_ce_length: float = DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH
max_ce_length: float = DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH
ConnectionParametersPreferences.default = ConnectionParametersPreferences()
@@ -1523,7 +1520,7 @@ class _IsoLink:
self.device.host.send_iso_sdu(connection_handle=self.handle, sdu=sdu)
async def get_tx_time_stamp(self) -> tuple[int, int, int]:
response = await self.device.host.send_sync_command(
response = await self.device.send_sync_command(
hci.HCI_LE_Read_ISO_TX_Sync_Command(connection_handle=self.handle)
)
return (
@@ -1700,7 +1697,7 @@ class Connection(utils.CompositeEventEmitter):
peer_address: hci.Address
peer_name: str | None
peer_resolvable_address: hci.Address | None
peer_le_features: hci.LeFeatureMask | None
peer_le_features: hci.LeFeatureMask
role: hci.Role
parameters: Parameters
encryption: int
@@ -1753,8 +1750,8 @@ class Connection(utils.CompositeEventEmitter):
EVENT_CIS_REQUEST = "cis_request"
EVENT_CIS_ESTABLISHMENT = "cis_establishment"
EVENT_CIS_ESTABLISHMENT_FAILURE = "cis_establishment_failure"
EVENT_LE_SUBRATE_CHANGE = "le_subrate_change"
EVENT_LE_SUBRATE_CHANGE_FAILURE = "le_subrate_change_failure"
EVENT_LE_REMOTE_FEATURES_CHANGE = "le_remote_features_change"
EVENT_LE_REMOTE_FEATURES_CHANGE_FAILURE = "le_remote_features_change_failure"
@utils.composite_listener
class Listener:
@@ -1832,14 +1829,14 @@ class Connection(utils.CompositeEventEmitter):
self.authenticated = False
self.sc = False
self.att_mtu = att.ATT_DEFAULT_MTU
self.data_length = DEVICE_DEFAULT_DATA_LENGTH
self.data_length: tuple[int, int, int, int] = DEVICE_DEFAULT_DATA_LENGTH
self.gatt_client = gatt_client.Client(self) # Per-connection client
self.gatt_server = (
device.gatt_server
) # By default, use the device's shared server
self.pairing_peer_io_capability = None
self.pairing_peer_authentication_requirements = None
self.peer_le_features = None
self.peer_le_features = hci.LeFeatureMask(0)
self.cs_configs = {}
self.cs_procedures = {}
@@ -1921,16 +1918,21 @@ class Connection(utils.CompositeEventEmitter):
connection_interval_max: float,
max_latency: int,
supervision_timeout: float,
min_ce_length: float = 0.0,
max_ce_length: float = 0.0,
use_l2cap=False,
) -> None:
"""
Request an update of the connection parameters.
Request a change of the connection parameters.
For short connection intervals (below 7.5ms, introduced in Bluetooth 6.2),
use the `update_parameters_with_subrate` method instead.
Args:
connection_interval_min: Minimum interval, in milliseconds.
connection_interval_max: Maximum interval, in milliseconds.
max_latency: Latency, in number of intervals.
supervision_timeout: Timeout, in milliseconds.
max_latency: Max latency, in number of intervals.
supervision_timeout: Supervision Timeout, in milliseconds.
use_l2cap: Request the update via L2CAP.
"""
return await self.device.update_connection_parameters(
@@ -1940,6 +1942,77 @@ class Connection(utils.CompositeEventEmitter):
max_latency,
supervision_timeout,
use_l2cap=use_l2cap,
min_ce_length=min_ce_length,
max_ce_length=max_ce_length,
)
async def update_parameters_with_subrate(
self,
connection_interval_min: float,
connection_interval_max: float,
subrate_min: int,
subrate_max: int,
max_latency: int,
continuation_number: int,
supervision_timeout: float,
min_ce_length: float,
max_ce_length: float,
) -> None:
"""
Request a change of the connection parameters.
This is similar to `update_parameters` but also allows specifying
the subrate parameters and supports shorter connection intervals (below
7.5ms, as introduced in Bluetooth 6.2).
Args:
connection_interval_min: Minimum interval, in milliseconds.
connection_interval_max: Maximum interval, in milliseconds.
subrate_min: Minimum subrate factor.
subrate_max: Maximum subrate factor.
max_latency: Max latency, in number of intervals.
continuation_number: Continuation number.
supervision_timeout: Supervision Timeout, in milliseconds.
min_ce_length: Minimum connection event length, in milliseconds.
max_ce_length: Maximumsub connection event length, in milliseconds.
"""
return await self.device.update_connection_parameters_with_subrate(
self,
connection_interval_min,
connection_interval_max,
subrate_min,
subrate_max,
max_latency,
continuation_number,
supervision_timeout,
min_ce_length,
max_ce_length,
)
async def update_subrate(
self,
subrate_min: int,
subrate_max: int,
max_latency: int,
continuation_number: int,
supervision_timeout: float,
) -> None:
"""
Request request a change to the subrating factor and/or other parameters.
Args:
subrate_min: Minimum subrate factor.
subrate_max: Maximum subrate factor.
max_latency: Max latency, in number of intervals.
continuation_number: Continuation number.
supervision_timeout: Supervision Timeout, in milliseconds.
"""
return await self.device.update_connection_subrate(
self,
subrate_min,
subrate_max,
max_latency,
continuation_number,
supervision_timeout,
)
async def set_phy(
@@ -2046,6 +2119,7 @@ class DeviceConfiguration:
le_privacy_enabled: bool = False
le_rpa_timeout: int = DEVICE_DEFAULT_LE_RPA_TIMEOUT
le_subrate_enabled: bool = False
le_shorter_connection_intervals_enabled: bool = False
classic_enabled: bool = False
classic_sc_enabled: bool = True
classic_ssp_enabled: bool = True
@@ -2400,6 +2474,9 @@ class Device(utils.CompositeEventEmitter):
self.le_rpa_timeout = config.le_rpa_timeout
self.le_rpa_periodic_update_task: asyncio.Task | None = None
self.le_subrate_enabled = config.le_subrate_enabled
self.le_shorter_connection_intervals_enabled = (
config.le_shorter_connection_intervals_enabled
)
self.classic_enabled = config.classic_enabled
self.cis_enabled = config.cis_enabled
self.classic_sc_enabled = config.classic_sc_enabled
@@ -2803,7 +2880,9 @@ class Device(utils.CompositeEventEmitter):
)
)
if self.cis_enabled:
if self.cis_enabled and self.host.supports_command(
hci.HCI_LE_SET_HOST_FEATURE_COMMAND
):
await self.send_sync_command(
hci.HCI_LE_Set_Host_Feature_Command(
bit_number=hci.LeFeature.CONNECTED_ISOCHRONOUS_STREAM,
@@ -2811,7 +2890,13 @@ class Device(utils.CompositeEventEmitter):
)
)
if self.le_subrate_enabled:
if (
self.le_subrate_enabled
and self.host.supports_command(hci.HCI_LE_SET_HOST_FEATURE_COMMAND)
and self.host.supports_le_features(
hci.LeFeatureMask.CONNECTION_SUBRATING
)
):
await self.send_sync_command(
hci.HCI_LE_Set_Host_Feature_Command(
bit_number=hci.LeFeature.CONNECTION_SUBRATING_HOST_SUPPORT,
@@ -2819,7 +2904,9 @@ class Device(utils.CompositeEventEmitter):
)
)
if self.config.channel_sounding_enabled:
if self.config.channel_sounding_enabled and self.host.supports_command(
hci.HCI_LE_SET_HOST_FEATURE_COMMAND
):
await self.send_sync_command(
hci.HCI_LE_Set_Host_Feature_Command(
bit_number=hci.LeFeature.CHANNEL_SOUNDING_HOST_SUPPORT,
@@ -2852,6 +2939,20 @@ class Device(utils.CompositeEventEmitter):
tx_snr_capability=result.tx_snr_capability,
)
if (
self.le_shorter_connection_intervals_enabled
and self.host.supports_command(hci.HCI_LE_SET_HOST_FEATURE_COMMAND)
and self.host.supports_le_features(
hci.LeFeatureMask.SHORTER_CONNECTION_INTERVALS
)
):
await self.send_sync_command(
hci.HCI_LE_Set_Host_Feature_Command(
bit_number=hci.LeFeature.SHORTER_CONNECTION_INTERVALS_HOST_SUPPORT,
bit_value=1,
)
)
if self.classic_enabled:
await self.send_sync_command(
hci.HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')),
@@ -4183,6 +4284,9 @@ class Device(utils.CompositeEventEmitter):
'''
Request an update of the connection parameters.
For short connection intervals (below 7.5 ms, introduced in Bluetooth 6.2),
use `update_connection_parameters_with_subrate` instead.
Args:
connection: The connection to update
connection_interval_min: Minimum interval, in milliseconds.
@@ -4223,17 +4327,148 @@ class Device(utils.CompositeEventEmitter):
return
await self.send_async_command(
hci.HCI_LE_Connection_Update_Command(
connection_handle=connection.handle,
connection_interval_min=connection_interval_min,
connection_interval_max=connection_interval_max,
max_latency=max_latency,
supervision_timeout=supervision_timeout,
min_ce_length=min_ce_length,
max_ce_length=max_ce_length,
pending_result = asyncio.get_running_loop().create_future()
with closing(utils.EventWatcher()) as watcher:
@watcher.on(connection, connection.EVENT_CONNECTION_PARAMETERS_UPDATE)
def _():
pending_result.set_result(None)
@watcher.on(
connection, connection.EVENT_CONNECTION_PARAMETERS_UPDATE_FAILURE
)
)
def _(error_code: int):
pending_result.set_exception(hci.HCI_Error(error_code))
await self.send_async_command(
hci.HCI_LE_Connection_Update_Command(
connection_handle=connection.handle,
connection_interval_min=connection_interval_min,
connection_interval_max=connection_interval_max,
max_latency=max_latency,
supervision_timeout=supervision_timeout,
min_ce_length=min_ce_length,
max_ce_length=max_ce_length,
)
)
await connection.cancel_on_disconnection(pending_result)
async def update_connection_parameters_with_subrate(
self,
connection: Connection,
connection_interval_min: float,
connection_interval_max: float,
subrate_min: int,
subrate_max: int,
max_latency: int,
continuation_number: int,
supervision_timeout: float,
min_ce_length: float = 0.0,
max_ce_length: float = 0.0,
) -> None:
'''
Request a change of the connection parameters.
This is similar to `update_connection_parameters` but also allows specifying
the subrate parameters and supports shorter connection intervals (below
7.5ms, as introduced in Bluetooth 6.2).
Args:
connection: The connection to update
connection_interval_min: Minimum interval, in milliseconds.
connection_interval_max: Maximum interval, in milliseconds.
subrate_min: Minimum subrate factor.
subrate_max: Maximum subrate factor.
max_latency: Max latency, in number of intervals.
continuation_number: Continuation number.
supervision_timeout: Supervision Timeout, in milliseconds.
min_ce_length: Minimum connection event length, in milliseconds.
max_ce_length: Maximum connection event length, in milliseconds.
'''
# Convert the input parameters
connection_interval_min = int(connection_interval_min / 0.125)
connection_interval_max = int(connection_interval_max / 0.125)
supervision_timeout = int(supervision_timeout / 10)
min_ce_length = int(min_ce_length / 0.125)
max_ce_length = int(max_ce_length / 0.125)
pending_result = asyncio.get_running_loop().create_future()
with closing(utils.EventWatcher()) as watcher:
@watcher.on(connection, connection.EVENT_CONNECTION_PARAMETERS_UPDATE)
def _():
pending_result.set_result(None)
@watcher.on(
connection, connection.EVENT_CONNECTION_PARAMETERS_UPDATE_FAILURE
)
def _(error_code: int):
pending_result.set_exception(hci.HCI_Error(error_code))
await self.send_async_command(
hci.HCI_LE_Connection_Rate_Request_Command(
connection_handle=connection.handle,
connection_interval_min=connection_interval_min,
connection_interval_max=connection_interval_max,
subrate_min=subrate_min,
subrate_max=subrate_max,
max_latency=max_latency,
continuation_number=continuation_number,
supervision_timeout=supervision_timeout,
min_ce_length=min_ce_length,
max_ce_length=max_ce_length,
)
)
await connection.cancel_on_disconnection(pending_result)
async def update_connection_subrate(
self,
connection: Connection,
subrate_min: int,
subrate_max: int,
max_latency: int,
continuation_number: int,
supervision_timeout: float,
) -> None:
'''
Request a change to the subrating factor and/or other parameters.
Args:
connection: The connection to update
subrate_min: Minimum subrate factor.
subrate_max: Maximum subrate factor.
max_latency: Max latency, in number of intervals.
continuation_number: Continuation number.
supervision_timeout: Supervision Timeout, in milliseconds.
'''
pending_result = asyncio.get_running_loop().create_future()
with closing(utils.EventWatcher()) as watcher:
@watcher.on(connection, connection.EVENT_CONNECTION_PARAMETERS_UPDATE)
def _():
pending_result.set_result(None)
@watcher.on(
connection, connection.EVENT_CONNECTION_PARAMETERS_UPDATE_FAILURE
)
def _(error_code: int):
pending_result.set_exception(hci.HCI_Error(error_code))
await self.send_async_command(
hci.HCI_LE_Subrate_Request_Command(
connection_handle=connection.handle,
subrate_min=subrate_min,
subrate_max=subrate_max,
max_latency=max_latency,
continuation_number=continuation_number,
supervision_timeout=int(supervision_timeout / 10),
)
)
await connection.cancel_on_disconnection(pending_result)
async def get_connection_rssi(self, connection):
result = await self.send_sync_command(
@@ -4292,6 +4527,87 @@ class Device(utils.CompositeEventEmitter):
)
)
async def set_default_connection_subrate(
self,
subrate_min: int,
subrate_max: int,
max_latency: int,
continuation_number: int,
supervision_timeout: float,
) -> None:
'''
Set the default subrate parameters for new connections.
Args:
subrate_min: Minimum subrate factor.
subrate_max: Maximum subrate factor.
max_latency: Max latency, in number of intervals.
continuation_number: Continuation number.
supervision_timeout: Supervision Timeout, in milliseconds.
'''
# Convert the input parameters
supervision_timeout = int(supervision_timeout / 10)
await self.send_command(
hci.HCI_LE_Set_Default_Subrate_Command(
subrate_min=subrate_min,
subrate_max=subrate_max,
max_latency=max_latency,
continuation_number=continuation_number,
supervision_timeout=supervision_timeout,
),
check_result=True,
)
async def set_default_connection_parameters(
self,
connection_interval_min: float,
connection_interval_max: float,
subrate_min: int,
subrate_max: int,
max_latency: int,
continuation_number: int,
supervision_timeout: float,
min_ce_length: float = 0.0,
max_ce_length: float = 0.0,
) -> None:
'''
Set the default connection parameters for new connections.
Args:
connection_interval_min: Minimum interval, in milliseconds.
connection_interval_max: Maximum interval, in milliseconds.
subrate_min: Minimum subrate factor.
subrate_max: Maximum subrate factor.
max_latency: Max latency, in number of intervals.
continuation_number: Continuation number.
supervision_timeout: Supervision Timeout, in milliseconds.
min_ce_length: Minimum connection event length, in milliseconds.
max_ce_length: Maximum connection event length, in milliseconds.
'''
# Convert the input parameters
connection_interval_min = int(connection_interval_min / 0.125)
connection_interval_max = int(connection_interval_max / 0.125)
supervision_timeout = int(supervision_timeout / 10)
min_ce_length = int(min_ce_length / 0.125)
max_ce_length = int(max_ce_length / 0.125)
await self.send_sync_command(
hci.HCI_LE_Set_Default_Rate_Parameters_Command(
connection_interval_min=connection_interval_min,
connection_interval_max=connection_interval_max,
subrate_min=subrate_min,
subrate_max=subrate_max,
max_latency=max_latency,
continuation_number=continuation_number,
supervision_timeout=supervision_timeout,
min_ce_length=min_ce_length,
max_ce_length=max_ce_length,
)
)
async def transfer_periodic_sync(
self, connection: Connection, sync_handle: int, service_data: int = 0
) -> None:
@@ -4314,7 +4630,9 @@ class Device(utils.CompositeEventEmitter):
)
)
async def find_peer_by_name(self, name: str, transport=PhysicalTransport.LE):
async def find_peer_by_name(
self, name: str, transport=PhysicalTransport.LE
) -> hci.Address:
"""
Scan for a peer with a given name and return its address.
"""
@@ -4332,6 +4650,7 @@ class Device(utils.CompositeEventEmitter):
listener: Callable[..., None] | None = None
was_scanning = self.scanning
was_discovering = self.discovering
event_name: str | None = None
try:
if transport == PhysicalTransport.LE:
event_name = 'advertisement'
@@ -4357,11 +4676,11 @@ class Device(utils.CompositeEventEmitter):
if not self.discovering:
await self.start_discovery()
else:
return None
raise ValueError('invalid transport')
return await utils.cancel_on_event(self, Device.EVENT_FLUSH, peer_address)
finally:
if listener is not None:
if listener is not None and event_name is not None:
self.remove_listener(event_name, listener)
if transport == PhysicalTransport.LE and not was_scanning:
@@ -4387,7 +4706,7 @@ class Device(utils.CompositeEventEmitter):
peer_address.set_result(address)
return
if address.is_resolvable:
if address.is_resolvable and self.address_resolver is not None:
resolved_address = self.address_resolver.resolve(address)
if resolved_address == identity_address:
if not peer_address.done():
@@ -4602,7 +4921,6 @@ class Device(utils.CompositeEventEmitter):
# [Classic only]
async def request_remote_name(self, remote: hci.Address | Connection) -> str:
# Set up event handlers
pending_name: asyncio.Future[str] = asyncio.get_running_loop().create_future()
peer_address = (
@@ -6189,7 +6507,7 @@ class Device(utils.CompositeEventEmitter):
):
logger.debug(
f'*** Connection Parameters Update: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, '
f'{connection.peer_address} as {connection.role_name}'
)
if connection.parameters.connection_interval != connection_interval * 1.25:
connection.parameters = Connection.Parameters(
@@ -6213,7 +6531,41 @@ class Device(utils.CompositeEventEmitter):
self, connection: Connection, error: int
):
logger.debug(
f'*** Connection Parameters Update Failed: [0x{connection.handle:04X}] '
f'*** Connection Parameters Update failed: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, '
f'error={error}'
)
connection.emit(connection.EVENT_CONNECTION_PARAMETERS_UPDATE_FAILURE, error)
@host_event_handler
@with_connection_from_handle
def on_le_connection_rate_change(
self,
connection: Connection,
connection_interval: int,
subrate_factor: int,
peripheral_latency: int,
continuation_number: int,
supervision_timeout: int,
):
logger.debug(
f'*** Connection Rate Change: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}'
)
connection.parameters = Connection.Parameters(
connection_interval=connection_interval * 0.125,
subrate_factor=subrate_factor,
peripheral_latency=peripheral_latency,
continuation_number=continuation_number,
supervision_timeout=supervision_timeout * 10.0,
)
connection.emit(connection.EVENT_CONNECTION_PARAMETERS_UPDATE)
@host_event_handler
@with_connection_from_handle
def on_le_connection_rate_change_failure(self, connection: Connection, error: int):
logger.debug(
f'*** Connection Rate Change failed: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, '
f'error={error}'
)
@@ -6256,7 +6608,25 @@ class Device(utils.CompositeEventEmitter):
subrate_factor,
continuation_number,
)
connection.emit(connection.EVENT_LE_SUBRATE_CHANGE)
connection.emit(connection.EVENT_CONNECTION_PARAMETERS_UPDATE)
@host_event_handler
@with_connection_from_handle
def on_le_subrate_change_failure(self, connection: Connection, status: int):
connection.emit(connection.EVENT_CONNECTION_PARAMETERS_UPDATE_FAILURE, status)
@host_event_handler
@with_connection_from_handle
def on_le_remote_features(
self, connection: Connection, le_features: hci.LeFeatureMask
):
connection.peer_le_features = le_features
connection.emit(connection.EVENT_LE_REMOTE_FEATURES_CHANGE)
@host_event_handler
@with_connection_from_handle
def on_le_remote_features_failure(self, connection: Connection, status: int):
connection.emit(connection.EVENT_LE_REMOTE_FEATURES_CHANGE_FAILURE, status)
@host_event_handler
@with_connection_from_handle
+2 -2
View File
@@ -29,7 +29,7 @@ import functools
import logging
import struct
from collections.abc import Iterable, Sequence
from typing import TypeVar
from typing import ClassVar, TypeVar
from bumble.att import Attribute, AttributeValue, AttributeValueV2
from bumble.colors import color
@@ -403,7 +403,7 @@ class TemplateService(Service):
to expose their UUID as a class property
'''
UUID: UUID
UUID: ClassVar[UUID]
def __init__(
self,
+5 -4
View File
@@ -34,11 +34,14 @@ from datetime import datetime
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Generic,
TypeVar,
overload,
)
from typing_extensions import Self
from bumble import att, core, l2cap, utils
from bumble.colors import color
from bumble.core import UUID, InvalidStateError
@@ -249,10 +252,10 @@ class ProfileServiceProxy:
Base class for profile-specific service proxies
'''
SERVICE_CLASS: type[TemplateService]
SERVICE_CLASS: ClassVar[type[TemplateService]]
@classmethod
def from_client(cls, client: Client) -> ProfileServiceProxy | None:
def from_client(cls, client: Client) -> Self | None:
return ServiceProxy.from_client(cls, client, cls.SERVICE_CLASS.UUID)
@@ -285,8 +288,6 @@ class Client:
self._bearer_id = (
f'[0x{bearer.connection.handle:04X}|CID=0x{bearer.source_cid:04X}]'
)
# Fill the mtu.
bearer.on_att_mtu_update(att.ATT_DEFAULT_MTU)
self.connection = bearer.connection
else:
bearer.on(bearer.EVENT_DISCONNECTION, self.on_disconnection)
+100 -1
View File
@@ -115,7 +115,6 @@ class Server(utils.EventEmitter):
channel.connection.handle,
channel.source_cid,
)
channel.att_mtu = att.ATT_DEFAULT_MTU
channel.sink = lambda pdu: self.on_gatt_pdu(
channel, att.ATT_PDU.from_bytes(pdu)
)
@@ -777,6 +776,18 @@ class Server(utils.EventEmitter):
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
if (
request.starting_handle == 0x0000
or request.starting_handle > request.ending_handle
):
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=request.starting_handle,
error_code=att.ATT_INVALID_HANDLE_ERROR,
)
self.send_response(bearer, response)
return
attributes: list[tuple[int, bytes]] = []
for attribute in (
attribute
@@ -977,6 +988,94 @@ class Server(utils.EventEmitter):
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_read_multiple_request(
self, bearer: att.Bearer, request: att.ATT_Read_Multiple_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.7 Read Multiple Request.
'''
response: att.ATT_PDU
pdu_space_available = bearer.att_mtu - 1
values: list[bytes] = []
for handle in request.set_of_handles:
if not (attribute := self.get_attribute(handle)):
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=handle,
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
self.send_response(bearer, response)
return
# No need to catch permission errors here, since these attributes
# must all be world-readable
attribute_value = await attribute.read_value(bearer)
# Check the attribute value size
max_attribute_size = min(bearer.att_mtu - 1, 251)
if len(attribute_value) > max_attribute_size:
# We need to truncate
attribute_value = attribute_value[:max_attribute_size]
# Check if there is enough space
entry_size = len(attribute_value)
if pdu_space_available < entry_size:
break
# Add the attribute to the list
values.append(attribute_value)
pdu_space_available -= entry_size
response = att.ATT_Read_Multiple_Response(set_of_values=b''.join(values))
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_read_multiple_variable_request(
self, bearer: att.Bearer, request: att.ATT_Read_Multiple_Variable_Request
):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.11 Read Multiple Variable Request.
'''
response: att.ATT_PDU
pdu_space_available = bearer.att_mtu - 1
length_value_tuple_list: list[tuple[int, bytes]] = []
for handle in request.set_of_handles:
if not (attribute := self.get_attribute(handle)):
response = att.ATT_Error_Response(
request_opcode_in_error=request.op_code,
attribute_handle_in_error=handle,
error_code=att.ATT_ATTRIBUTE_NOT_FOUND_ERROR,
)
self.send_response(bearer, response)
return
# No need to catch permission errors here, since these attributes
# must all be world-readable
attribute_value = await attribute.read_value(bearer)
length = len(attribute_value)
# Check the attribute value size
max_attribute_size = min(bearer.att_mtu - 3, 251)
if len(attribute_value) > max_attribute_size:
# We need to truncate
attribute_value = attribute_value[:max_attribute_size]
# Check if there is enough space
entry_size = 2 + len(attribute_value)
# Add the attribute to the list
length_value_tuple_list.append((length, attribute_value))
pdu_space_available -= entry_size
if pdu_space_available <= 0:
break
response = att.ATT_Read_Multiple_Variable_Response(
length_value_tuple_list=length_value_tuple_list
)
self.send_response(bearer, response)
@utils.AsyncRunner.run_in_task()
async def on_att_write_request(
self, bearer: att.Bearer, request: att.ATT_Write_Request
+206 -56
View File
@@ -398,8 +398,9 @@ HCI_LE_CS_SUBEVENT_RESULT_EVENT = 0x31
HCI_LE_CS_SUBEVENT_RESULT_CONTINUE_EVENT = 0x32
HCI_LE_CS_TEST_END_COMPLETE_EVENT = 0x33
HCI_LE_MONITORED_ADVERTISERS_REPORT_EVENT = 0x34
HCI_LE_FRAME_SPACE_UPDATE_EVENT = 0x35
HCI_LE_FRAME_SPACE_UPDATE_COMPLETE_EVENT = 0x35
HCI_LE_UTP_RECEIVE_EVENT = 0x36
HCI_LE_CONNECTION_RATE_CHANGE_EVENT = 0x37
# HCI Command
@@ -736,6 +737,12 @@ HCI_LE_CLEAR_MONITORED_ADVERTISERS_LIST_COMMAND = hci_c
HCI_LE_READ_MONITORED_ADVERTISERS_LIST_SIZE_COMMAND = hci_command_op_code(0x08, 0x009B)
HCI_LE_ENABLE_MONITORING_ADVERTISERS_COMMAND = hci_command_op_code(0x08, 0x009C)
HCI_LE_FRAME_SPACE_UPDATE_COMMAND = hci_command_op_code(0x08, 0x009D)
HCI_LE_SET_RESOLVABLE_PRIVATE_ADDRESS_TIMEOUT_V2_COMMAND = hci_command_op_code(0x08, 0x009E)
HCI_LE_ENABLE_UTP_OTA_MODE_COMMAND = hci_command_op_code(0x08, 0x009F)
HCI_LE_UTP_SEND_COMMAND = hci_command_op_code(0x08, 0x00A0)
HCI_LE_CONNECTION_RATE_REQUEST_COMMAND = hci_command_op_code(0x08, 0x00A1)
HCI_LE_SET_DEFAULT_RATE_PARAMETERS_COMMAND = hci_command_op_code(0x08, 0x00A2)
HCI_LE_READ_MINIMUM_SUPPORTED_CONNECTION_INTERVAL_COMMAND = hci_command_op_code(0x08, 0x00A3)
# HCI Error Codes
@@ -1398,6 +1405,12 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_LE_CLEAR_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+7),
HCI_LE_READ_MONITORED_ADVERTISERS_LIST_SIZE_COMMAND : 1 << (48*8+0),
HCI_LE_FRAME_SPACE_UPDATE_COMMAND : 1 << (48*8+1),
HCI_LE_SET_RESOLVABLE_PRIVATE_ADDRESS_TIMEOUT_V2_COMMAND : 1 << (48*8+2),
HCI_LE_ENABLE_UTP_OTA_MODE_COMMAND : 1 << (48*8+3),
HCI_LE_UTP_SEND_COMMAND : 1 << (48*8+4),
HCI_LE_CONNECTION_RATE_REQUEST_COMMAND : 1 << (48*8+5),
HCI_LE_SET_DEFAULT_RATE_PARAMETERS_COMMAND : 1 << (48*8+6),
HCI_LE_READ_MINIMUM_SUPPORTED_CONNECTION_INTERVAL_COMMAND : 1 << (48*8+7)
}
# LE Supported Features
@@ -1455,6 +1468,14 @@ class LeFeature(SpecableEnum):
LL_EXTENDED_FEATURE_SET = 63
MONITORING_ADVERTISERS = 64
FRAME_SPACE_UPDATE = 65
UTP_OTA_MODE = 66
UTP_HCI_MODE = 67
LL_OTA_UTP_IND_MAXIMUM_LENGTH_0 = 68
LL_OTA_UTP_IND_MAXIMUM_LENGTH_1 = 69
SHORTER_CONNECTION_INTERVALS = 72
SHORTER_CONNECTION_INTERVALS_HOST_SUPPORT = 73
LE_FLUSHABLE_ACL_DATA = 74
class LeFeatureMask(utils.CompatibleIntFlag):
LE_ENCRYPTION = 1 << LeFeature.LE_ENCRYPTION
@@ -1509,6 +1530,13 @@ class LeFeatureMask(utils.CompatibleIntFlag):
LL_EXTENDED_FEATURE_SET = 1 << LeFeature.LL_EXTENDED_FEATURE_SET
MONITORING_ADVERTISERS = 1 << LeFeature.MONITORING_ADVERTISERS
FRAME_SPACE_UPDATE = 1 << LeFeature.FRAME_SPACE_UPDATE
UTP_OTA_MODE = 1 << LeFeature.UTP_OTA_MODE
UTP_HCI_MODE = 1 << LeFeature.UTP_HCI_MODE
LL_OTA_UTP_IND_MAXIMUM_LENGTH_0 = 1 << LeFeature.LL_OTA_UTP_IND_MAXIMUM_LENGTH_0
LL_OTA_UTP_IND_MAXIMUM_LENGTH_1 = 1 << LeFeature.LL_OTA_UTP_IND_MAXIMUM_LENGTH_1
SHORTER_CONNECTION_INTERVALS = 1 << LeFeature.SHORTER_CONNECTION_INTERVALS
SHORTER_CONNECTION_INTERVALS_HOST_SUPPORT = 1 << LeFeature.SHORTER_CONNECTION_INTERVALS_HOST_SUPPORT
LE_FLUSHABLE_ACL_DATA = 1 << LeFeature.LE_FLUSHABLE_ACL_DATA
class LmpFeature(SpecableEnum):
# Page 0 (Legacy LMP features)
@@ -2165,9 +2193,9 @@ class Address:
RANDOM_IDENTITY_ADDRESS = AddressType.RANDOM_IDENTITY
# Type declarations
NIL: Address
ANY: Address
ANY_RANDOM: Address
NIL: ClassVar[Address]
ANY: ClassVar[Address]
ANY_RANDOM: ClassVar[Address]
# pylint: disable-next=unnecessary-lambda
ADDRESS_TYPE_SPEC = {'size': 1, 'mapper': lambda x: Address.address_type_name(x)}
@@ -2279,38 +2307,38 @@ class Address:
self.address_type = address_type
def clone(self):
def clone(self) -> Address:
return Address(self.address_bytes, self.address_type)
@property
def is_public(self):
def is_public(self) -> bool:
return self.address_type in (
self.PUBLIC_DEVICE_ADDRESS,
self.PUBLIC_IDENTITY_ADDRESS,
)
@property
def is_random(self):
def is_random(self) -> bool:
return not self.is_public
@property
def is_resolved(self):
def is_resolved(self) -> bool:
return self.address_type in (
self.PUBLIC_IDENTITY_ADDRESS,
self.RANDOM_IDENTITY_ADDRESS,
)
@property
def is_resolvable(self):
def is_resolvable(self) -> bool:
return self.address_type == self.RANDOM_DEVICE_ADDRESS and (
self.address_bytes[5] >> 6 == 1
)
@property
def is_static(self):
def is_static(self) -> bool:
return self.is_random and (self.address_bytes[5] >> 6 == 3)
def to_string(self, with_type_qualifier=True):
def to_string(self, with_type_qualifier: bool = True) -> str:
'''
String representation of the address, MSB first, with an optional type
qualifier.
@@ -2320,23 +2348,23 @@ class Address:
return result
return result + '/P'
def __bytes__(self):
def __bytes__(self) -> bytes:
return self.address_bytes
def __hash__(self):
def __hash__(self) -> int:
return hash(self.address_bytes)
def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
return (
isinstance(other, Address)
and self.address_bytes == other.address_bytes
and self.is_public == other.is_public
)
def __str__(self):
def __str__(self) -> str:
return self.to_string()
def __repr__(self):
def __repr__(self) -> str:
return f'Address({self.to_string(False)}/{self.address_type_name(self.address_type)})'
@@ -2375,10 +2403,10 @@ class HCI_Packet:
Abstract Base class for HCI packets
'''
hci_packet_type: ClassVar[int]
hci_packet_type: int
@staticmethod
def from_bytes(packet: bytes) -> HCI_Packet:
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_Packet:
packet_type = packet[0]
if packet_type == HCI_COMMAND_PACKET:
@@ -2398,7 +2426,7 @@ class HCI_Packet:
return HCI_CustomPacket(packet)
def __init__(self, name):
def __init__(self, name: str) -> None:
self.name = name
def __bytes__(self) -> bytes:
@@ -2410,7 +2438,7 @@ class HCI_Packet:
# -----------------------------------------------------------------------------
class HCI_CustomPacket(HCI_Packet):
def __init__(self, payload):
def __init__(self, payload: bytes) -> None:
super().__init__('HCI_CUSTOM_PACKET')
self.hci_packet_type = payload[0]
self.payload = payload
@@ -3746,7 +3774,7 @@ class HCI_Write_Extended_Inquiry_Response_Command(
'''
fec_required: int = field(metadata=metadata(1))
extended_inquiry_response: int = field(
extended_inquiry_response: bytes = field(
metadata=metadata({'size': 240, 'serializer': lambda x: padded_bytes(x, 240)})
)
@@ -5796,7 +5824,25 @@ class HCI_LE_Subrate_Request_Command(HCI_AsyncCommand):
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HHCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters(
class HCI_LE_Read_All_Local_Supported_Features_ReturnParameters(
HCI_StatusReturnParameters
):
max_page: int = field(metadata=metadata(1))
le_features: bytes = field(metadata=metadata(248))
@HCI_SyncCommand.sync_command(HCI_LE_Read_All_Local_Supported_Features_ReturnParameters)
class HCI_LE_Read_All_Local_Supported_Features_Command(
HCI_SyncCommand[HCI_LE_Read_All_Local_Supported_Features_ReturnParameters]
):
'''
See Bluetooth spec @ 7.8.128 LE Read All Local Supported Features Command
'''
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters(
HCI_StatusReturnParameters
):
num_config_supported: int = field(metadata=metadata(1))
@@ -5822,11 +5868,11 @@ class HHCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters(
@HCI_SyncCommand.sync_command(
HHCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters
HCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters
)
@dataclasses.dataclass
class HCI_LE_CS_Read_Local_Supported_Capabilities_Command(
HCI_SyncCommand[HHCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters]
HCI_SyncCommand[HCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters]
):
'''
See Bluetooth spec @ 7.8.130 LE CS Read Local Supported Capabilities command
@@ -6073,6 +6119,92 @@ class HCI_LE_CS_Test_End_Command(HCI_AsyncCommand):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
class HCI_LE_Frame_Space_Update_Command(HCI_AsyncCommand):
'''
See Bluetooth spec @ 7.8.151 LE Frame Space Update command
'''
class SpacingType(SpecableFlag):
T_IFS_ACL_CP = 1 << 0
T_IFS_ACL_PC = 1 << 1
T_MCES = 1 << 2
T_IFS_CIS = 1 << 3
T_MSS_CIS = 1 << 4
connection_handle: int = field(metadata=metadata(2))
frame_space_min: int = field(metadata=metadata(2))
frame_space_max: int = field(metadata=metadata(2))
phys: int = field(metadata=PhyBit.type_metadata(1))
spacing_types: int = field(metadata=SpacingType.type_metadata(2))
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
class HCI_LE_Connection_Rate_Request_Command(HCI_AsyncCommand):
'''
See Bluetooth spec @ 7.8.154 LE Connection Rate Request command
'''
connection_handle: int = field(metadata=metadata(2))
connection_interval_min: int = field(metadata=metadata(2))
connection_interval_max: int = field(metadata=metadata(2))
subrate_min: int = field(metadata=metadata(2))
subrate_max: int = field(metadata=metadata(2))
max_latency: int = field(metadata=metadata(2))
continuation_number: int = field(metadata=metadata(2))
supervision_timeout: int = field(metadata=metadata(2))
min_ce_length: int = field(metadata=metadata(2))
max_ce_length: int = field(metadata=metadata(2))
# -----------------------------------------------------------------------------
@HCI_SyncCommand.sync_command(HCI_StatusReturnParameters)
@dataclasses.dataclass
class HCI_LE_Set_Default_Rate_Parameters_Command(
HCI_SyncCommand[HCI_StatusReturnParameters]
):
'''
See Bluetooth spec @ 7.8.155 LE Set Default Rate Parameters command
'''
connection_interval_min: int = field(metadata=metadata(2))
connection_interval_max: int = field(metadata=metadata(2))
subrate_min: int = field(metadata=metadata(2))
subrate_max: int = field(metadata=metadata(2))
max_latency: int = field(metadata=metadata(2))
continuation_number: int = field(metadata=metadata(2))
supervision_timeout: int = field(metadata=metadata(2))
min_ce_length: int = field(metadata=metadata(2))
max_ce_length: int = field(metadata=metadata(2))
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_LE_Read_Minimum_Supported_Connection_Interval_ReturnParameters(
HCI_StatusReturnParameters
):
minimum_supported_connection_interval: int = field(metadata=metadata(1))
group_min: Sequence[int] = field(metadata=metadata(2, list_begin=True))
group_max: Sequence[int] = field(metadata=metadata(2))
group_stride: Sequence[int] = field(metadata=metadata(2, list_end=True))
@HCI_SyncCommand.sync_command(
HCI_LE_Read_Minimum_Supported_Connection_Interval_ReturnParameters
)
@dataclasses.dataclass
class HCI_LE_Read_Minimum_Supported_Connection_Interval_Command(
HCI_SyncCommand[HCI_LE_Read_Minimum_Supported_Connection_Interval_ReturnParameters]
):
'''
See Bluetooth spec @ 7.8.156 LE Read Minimum Supported Connection Interval command
'''
# -----------------------------------------------------------------------------
# HCI Events
# -----------------------------------------------------------------------------
@@ -7142,6 +7274,23 @@ class HCI_LE_CS_Test_End_Complete_Event(HCI_LE_Meta_Event):
status: int = field(metadata=metadata(STATUS_SPEC))
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event
@dataclasses.dataclass
class HCI_LE_Connection_Rate_Change_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.50 LE Connection Rate Change event
'''
status: int = field(metadata=metadata(STATUS_SPEC))
connection_handle: int = field(metadata=metadata(2))
connection_interval: int = field(metadata=metadata(2))
subrate_factor: int = field(metadata=metadata(2))
peripheral_latency: int = field(metadata=metadata(2))
continuation_number: int = field(metadata=metadata(2))
supervision_timeout: int = field(metadata=metadata(2))
# -----------------------------------------------------------------------------
@HCI_Event.event
@dataclasses.dataclass
@@ -7805,6 +7954,7 @@ class HCI_Vendor_Event(HCI_Event):
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_AclDataPacket(HCI_Packet):
'''
See Bluetooth spec @ 5.4.2 HCI ACL Data Packets
@@ -7812,8 +7962,14 @@ class HCI_AclDataPacket(HCI_Packet):
hci_packet_type = HCI_ACL_DATA_PACKET
@staticmethod
def from_bytes(packet: bytes) -> HCI_AclDataPacket:
connection_handle: int
pb_flag: int
bc_flag: int
data_total_length: int
data: bytes
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_AclDataPacket:
# Read the header
h, data_total_length = struct.unpack_from('<HH', packet, 1)
connection_handle = h & 0xFFF
@@ -7822,25 +7978,22 @@ class HCI_AclDataPacket(HCI_Packet):
data = packet[5:]
if len(data) != data_total_length:
raise InvalidPacketError('invalid packet length')
return HCI_AclDataPacket(
connection_handle, pb_flag, bc_flag, data_total_length, data
return cls(
connection_handle=connection_handle,
pb_flag=pb_flag,
bc_flag=bc_flag,
data_total_length=data_total_length,
data=data,
)
def __bytes__(self):
def __bytes__(self) -> bytes:
h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle
return (
struct.pack('<BHH', HCI_ACL_DATA_PACKET, h, self.data_total_length)
+ self.data
)
def __init__(self, connection_handle, pb_flag, bc_flag, data_total_length, data):
self.connection_handle = connection_handle
self.pb_flag = pb_flag
self.bc_flag = bc_flag
self.data_total_length = data_total_length
self.data = data
def __str__(self):
def __str__(self) -> str:
return (
f'{color("ACL", "blue")}: '
f'handle=0x{self.connection_handle:04x}, '
@@ -7851,6 +8004,7 @@ class HCI_AclDataPacket(HCI_Packet):
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_SynchronousDataPacket(HCI_Packet):
'''
See Bluetooth spec @ 5.4.3 HCI SCO Data Packets
@@ -7858,8 +8012,13 @@ class HCI_SynchronousDataPacket(HCI_Packet):
hci_packet_type = HCI_SYNCHRONOUS_DATA_PACKET
@staticmethod
def from_bytes(packet: bytes) -> HCI_SynchronousDataPacket:
connection_handle: int
packet_status: int
data_total_length: int
data: bytes
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_SynchronousDataPacket:
# Read the header
h, data_total_length = struct.unpack_from('<HB', packet, 1)
connection_handle = h & 0xFFF
@@ -7869,8 +8028,11 @@ class HCI_SynchronousDataPacket(HCI_Packet):
raise InvalidPacketError(
f'invalid packet length {len(data)} != {data_total_length}'
)
return HCI_SynchronousDataPacket(
connection_handle, packet_status, data_total_length, data
return cls(
connection_handle=connection_handle,
packet_status=packet_status,
data_total_length=data_total_length,
data=data,
)
def __bytes__(self) -> bytes:
@@ -7880,18 +8042,6 @@ class HCI_SynchronousDataPacket(HCI_Packet):
+ self.data
)
def __init__(
self,
connection_handle: int,
packet_status: int,
data_total_length: int,
data: bytes,
) -> None:
self.connection_handle = connection_handle
self.packet_status = packet_status
self.data_total_length = data_total_length
self.data = data
def __str__(self) -> str:
return (
f'{color("SCO", "blue")}: '
@@ -7909,7 +8059,7 @@ class HCI_IsoDataPacket(HCI_Packet):
See Bluetooth spec @ 5.4.5 HCI ISO Data Packets
'''
hci_packet_type: ClassVar[int] = HCI_ISO_DATA_PACKET
hci_packet_type = HCI_ISO_DATA_PACKET
connection_handle: int
data_total_length: int
+2 -2
View File
@@ -312,11 +312,11 @@ class HID(ABC, utils.EventEmitter):
def send_pdu_on_ctrl(self, msg: bytes) -> None:
assert self.l2cap_ctrl_channel
self.l2cap_ctrl_channel.send_pdu(msg)
self.l2cap_ctrl_channel.write(msg)
def send_pdu_on_intr(self, msg: bytes) -> None:
assert self.l2cap_intr_channel
self.l2cap_intr_channel.send_pdu(msg)
self.l2cap_intr_channel.write(msg)
def send_data(self, data: bytes) -> None:
if self.role == HID.Role.HOST:
+76 -22
View File
@@ -21,7 +21,6 @@ import asyncio
import collections
import dataclasses
import logging
import struct
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Any, TypeVar, cast, overload
@@ -278,7 +277,7 @@ class Host(utils.EventEmitter):
hci.HCI_Read_Local_Version_Information_ReturnParameters | None
) = None
self.local_supported_commands = 0
self.local_le_features = 0
self.local_le_features = hci.LeFeatureMask(0) # LE features
self.local_lmp_features = hci.LmpFeatureMask(0) # Classic LMP features
self.suggested_max_tx_octets = 251 # Max allowed
self.suggested_max_tx_time = 2120 # Max allowed
@@ -348,17 +347,26 @@ class Host(utils.EventEmitter):
response1.supported_commands, 'little'
)
if self.supports_command(hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response2 = await self.send_sync_command(
hci.HCI_LE_Read_Local_Supported_Features_Command()
)
self.local_le_features = struct.unpack('<Q', response2.le_features)[0]
if self.supports_command(hci.HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
self.local_version = await self.send_sync_command(
hci.HCI_Read_Local_Version_Information_Command()
)
if self.supports_command(hci.HCI_LE_READ_ALL_LOCAL_SUPPORTED_FEATURES_COMMAND):
response2 = await self.send_sync_command(
hci.HCI_LE_Read_All_Local_Supported_Features_Command()
)
self.local_le_features = hci.LeFeatureMask(
int.from_bytes(response2.le_features, 'little')
)
elif self.supports_command(hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response3 = await self.send_sync_command(
hci.HCI_LE_Read_Local_Supported_Features_Command()
)
self.local_le_features = hci.LeFeatureMask(
int.from_bytes(response3.le_features, 'little')
)
if self.supports_command(hci.HCI_READ_LOCAL_EXTENDED_FEATURES_COMMAND):
max_page_number = 0
page_number = 0
@@ -375,7 +383,6 @@ class Host(utils.EventEmitter):
max_page_number = response4.maximum_page_number
page_number += 1
self.local_lmp_features = hci.LmpFeatureMask(lmp_features)
elif self.supports_command(hci.HCI_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response5 = await self.send_sync_command(
hci.HCI_Read_Local_Supported_Features_Command()
@@ -494,12 +501,17 @@ class Host(utils.EventEmitter):
hci.HCI_LE_TRANSMIT_POWER_REPORTING_EVENT,
hci.HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_SUBRATE_CHANGE_EVENT,
hci.HCI_LE_READ_ALL_REMOTE_FEATURES_COMPLETE_EVENT,
hci.HCI_LE_CS_READ_REMOTE_SUPPORTED_CAPABILITIES_COMPLETE_EVENT,
hci.HCI_LE_CS_PROCEDURE_ENABLE_COMPLETE_EVENT,
hci.HCI_LE_CS_SECURITY_ENABLE_COMPLETE_EVENT,
hci.HCI_LE_CS_CONFIG_COMPLETE_EVENT,
hci.HCI_LE_CS_SUBEVENT_RESULT_EVENT,
hci.HCI_LE_CS_SUBEVENT_RESULT_CONTINUE_EVENT,
hci.HCI_LE_MONITORED_ADVERTISERS_REPORT_EVENT,
hci.HCI_LE_FRAME_SPACE_UPDATE_COMPLETE_EVENT,
hci.HCI_LE_UTP_RECEIVE_EVENT,
hci.HCI_LE_CONNECTION_RATE_CHANGE_EVENT,
]
)
@@ -795,6 +807,16 @@ class Host(utils.EventEmitter):
)
packet_queue.enqueue(acl_packet, connection_handle)
def send_sco_sdu(self, connection_handle: int, sdu: bytes) -> None:
self.send_hci_packet(
hci.HCI_SynchronousDataPacket(
connection_handle=connection_handle,
packet_status=0,
data_total_length=len(sdu),
data=sdu,
)
)
def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None:
self.send_acl_sdu(connection_handle, bytes(L2CAP_PDU(cid, pdu)))
@@ -879,16 +901,18 @@ class Host(utils.EventEmitter):
if self.local_supported_commands & mask
)
def supports_le_features(self, feature: hci.LeFeatureMask) -> bool:
return (self.local_le_features & feature) == feature
def supports_le_features(self, features: hci.LeFeatureMask) -> bool:
return (self.local_le_features & features) == features
def supports_lmp_features(self, feature: hci.LmpFeatureMask) -> bool:
return self.local_lmp_features & (feature) == feature
def supports_lmp_features(self, features: hci.LmpFeatureMask) -> bool:
return self.local_lmp_features & (features) == features
@property
def supported_le_features(self):
def supported_le_features(self) -> list[hci.LeFeature]:
return [
feature for feature in range(64) if self.local_le_features & (1 << feature)
feature
for feature in hci.LeFeature
if self.local_le_features & (1 << feature)
]
# Packet Sink protocol (packets coming from the controller via HCI)
@@ -1169,7 +1193,7 @@ class Host(utils.EventEmitter):
self, event: hci.HCI_LE_Connection_Update_Complete_Event
):
if (connection := self.connections.get(event.connection_handle)) is None:
logger.warning('!!! CONNECTION PARAMETERS UPDATE COMPLETE: unknown handle')
logger.warning('!!! CONNECTION UPDATE COMPLETE: unknown handle')
return
# Notify the client
@@ -1186,6 +1210,29 @@ class Host(utils.EventEmitter):
'connection_parameters_update_failure', connection.handle, event.status
)
def on_hci_le_connection_rate_change_event(
self, event: hci.HCI_LE_Connection_Rate_Change_Event
):
if (connection := self.connections.get(event.connection_handle)) is None:
logger.warning('!!! CONNECTION RATE CHANGE: unknown handle')
return
# Notify the client
if event.status == hci.HCI_SUCCESS:
self.emit(
'le_connection_rate_change',
connection.handle,
event.connection_interval,
event.subrate_factor,
event.peripheral_latency,
event.continuation_number,
event.supervision_timeout,
)
else:
self.emit(
'le_connection_rate_change_failure', connection.handle, event.status
)
def on_hci_le_phy_update_complete_event(
self, event: hci.HCI_LE_PHY_Update_Complete_Event
):
@@ -1745,12 +1792,13 @@ class Host(utils.EventEmitter):
self.emit(
'le_remote_features_failure', event.connection_handle, event.status
)
else:
self.emit(
'le_remote_features',
event.connection_handle,
int.from_bytes(event.le_features, 'little'),
)
return
self.emit(
'le_remote_features',
event.connection_handle,
hci.LeFeatureMask(int.from_bytes(event.le_features, 'little')),
)
def on_hci_le_cs_read_remote_supported_capabilities_complete_event(
self, event: hci.HCI_LE_CS_Read_Remote_Supported_Capabilities_Complete_Event
@@ -1783,6 +1831,12 @@ class Host(utils.EventEmitter):
self.emit('cs_subevent_result_continue', event)
def on_hci_le_subrate_change_event(self, event: hci.HCI_LE_Subrate_Change_Event):
if event.status != hci.HCI_SUCCESS:
self.emit(
'le_subrate_change_failure', event.connection_handle, event.status
)
return
self.emit(
'le_subrate_change',
event.connection_handle,
+3 -1
View File
@@ -1647,7 +1647,9 @@ class LeCreditBasedChannel(utils.EventEmitter):
self.connection_result = None
self.disconnection_result = None
self.drained = asyncio.Event()
self.att_mtu = 0 # Filled by GATT client or server later.
# Core Specification Vol 3, Part G, 5.3.1 ATT_MTU
# ATT_MTU shall be set to the minimum of the MTU field values of the two devices.
self.att_mtu = min(mtu, peer_mtu)
self.drained.set()
+1 -1
View File
@@ -278,7 +278,7 @@ class L2CAPService(L2CAPServicer):
if not l2cap_channel:
return SendResponse(error=COMMAND_NOT_UNDERSTOOD)
if isinstance(l2cap_channel, ClassicChannel):
l2cap_channel.send_pdu(request.data)
l2cap_channel.write(request.data)
else:
l2cap_channel.write(request.data)
return SendResponse(success=empty_pb2.Empty())
+1 -1
View File
@@ -800,7 +800,7 @@ class Multiplexer(utils.EventEmitter):
def send_frame(self, frame: RFCOMM_Frame) -> None:
logger.debug(f'>>> Multiplexer sending {frame}')
self.l2cap_channel.send_pdu(frame)
self.l2cap_channel.write(bytes(frame))
def on_pdu(self, pdu: bytes) -> None:
frame = RFCOMM_Frame.from_bytes(pdu)
+2 -2
View File
@@ -847,7 +847,7 @@ class Client:
self.pending_request = request
try:
self.channel.send_pdu(bytes(request))
self.channel.write(bytes(request))
return await self.pending_response
finally:
self.pending_request = None
@@ -1061,7 +1061,7 @@ class Server:
def send_response(self, response):
logger.debug(f'{color(">>> Sending SDP Response", "blue")}: {response}')
self.channel.send_pdu(response)
self.channel.write(response)
def match_services(self, search_pattern: DataElement) -> dict[int, Service]:
# Find the services for which the attributes in the pattern is a subset of the
+201
View File
@@ -0,0 +1,201 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
from collections.abc import Callable
import bumble.logging
from bumble.core import BaseError
from bumble.device import Connection, Device
from bumble.hci import Address, LeFeatureMask
from bumble.transport import open_transport
# -----------------------------------------------------------------------------
DEFAULT_CENTRAL_ADDRESS = Address("F0:F0:F0:F0:F0:F0")
DEFAULT_PERIPHERAL_ADDRESS = Address("F1:F1:F1:F1:F1:F1")
# -----------------------------------------------------------------------------
async def run_as_central(
device: Device,
scenario: Callable | None,
) -> None:
# Connect to the peripheral
print(f'=== Connecting to {DEFAULT_PERIPHERAL_ADDRESS}...')
connection = await device.connect(DEFAULT_PERIPHERAL_ADDRESS)
print("=== Connected")
if scenario is not None:
await asyncio.sleep(1)
await scenario(connection)
await asyncio.get_running_loop().create_future()
async def run_as_peripheral(device: Device, scenario: Callable | None) -> None:
# Wait for a connection from the central
print(f'=== Advertising as {DEFAULT_PERIPHERAL_ADDRESS}...')
await device.start_advertising(auto_restart=True)
async def on_connection(connection: Connection) -> None:
assert scenario is not None
await asyncio.sleep(1)
await scenario(connection)
if scenario is not None:
device.on(Device.EVENT_CONNECTION, on_connection)
await asyncio.get_running_loop().create_future()
async def change_parameters(
connection: Connection,
parameter_request_procedure_supported: bool,
subrating_supported: bool,
shorter_connection_intervals_supported: bool,
) -> None:
if parameter_request_procedure_supported:
try:
print(">>> update_parameters(7.5, 200, 0, 4000)")
await connection.update_parameters(7.5, 200, 0, 4000)
await asyncio.sleep(3)
except BaseError as error:
print(f"Error: {error}")
if subrating_supported:
try:
print(">>> update_subrate(1, 2, 2, 1, 4000)")
await connection.update_subrate(1, 2, 2, 1, 4000)
await asyncio.sleep(3)
except BaseError as error:
print(f"Error: {error}")
if shorter_connection_intervals_supported:
try:
print(
">>> update_parameters_with_subrate(7.5, 200, 1, 1, 0, 0, 4000, 5, 1000)"
)
await connection.update_parameters_with_subrate(
7.5, 200, 1, 1, 0, 0, 4000, 5, 1000
)
await asyncio.sleep(3)
except BaseError as error:
print(f"Error: {error}")
try:
print(
">>> update_parameters_with_subrate(0.750, 5, 1, 1, 0, 0, 4000, 0.125, 1000)"
)
await connection.update_parameters_with_subrate(
0.750, 5, 1, 1, 0, 0, 4000, 0.125, 1000
)
await asyncio.sleep(3)
except BaseError as error:
print(f"Error: {error}")
print(">>> done")
def on_connection(connection: Connection) -> None:
print(f"+++ Connection established: {connection}")
def on_le_remote_features_change() -> None:
print(f'... LE Remote Features change: {connection.peer_le_features.name}')
connection.on(
connection.EVENT_LE_REMOTE_FEATURES_CHANGE, on_le_remote_features_change
)
def on_connection_parameters_change() -> None:
print(f'... LE Connection Parameters change: {connection.parameters}')
connection.on(
connection.EVENT_CONNECTION_PARAMETERS_UPDATE, on_connection_parameters_change
)
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_connection_updates.py <transport-spec> '
'central|peripheral initiator|responder'
)
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')
role = sys.argv[2]
direction = sys.argv[3]
device = Device.with_hci(
role,
(
DEFAULT_CENTRAL_ADDRESS
if role == "central"
else DEFAULT_PERIPHERAL_ADDRESS
),
hci_transport.source,
hci_transport.sink,
)
device.le_subrate_enabled = True
device.le_shorter_connection_intervals_enabled = True
await device.power_on()
parameter_request_procedure_supported = device.supports_le_features(
LeFeatureMask.CONNECTION_PARAMETERS_REQUEST_PROCEDURE
)
print(
"Parameters Request Procedure supported: "
f"{parameter_request_procedure_supported}"
)
subrating_supported = device.supports_le_features(
LeFeatureMask.CONNECTION_SUBRATING
)
print(f"Subrating supported: {subrating_supported}")
shorter_connection_intervals_supported = device.supports_le_features(
LeFeatureMask.SHORTER_CONNECTION_INTERVALS
)
print(
"Shorter Connection Intervals supported: "
f"{shorter_connection_intervals_supported}"
)
device.on(Device.EVENT_CONNECTION, on_connection)
async def run(connection: Connection) -> None:
await change_parameters(
connection,
parameter_request_procedure_supported,
subrating_supported,
shorter_connection_intervals_supported,
)
scenario = run if direction == "initiator" else None
if role == "central":
await run_as_central(device, scenario)
else:
await run_as_peripheral(device, scenario)
# -----------------------------------------------------------------------------
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+3 -7
View File
@@ -100,13 +100,9 @@ def on_sco_packet(packet: hci.HCI_SynchronousDataPacket):
if source_file and (pcm_data := source_file.read(packet.data_total_length)):
assert ag_protocol
host = ag_protocol.dlc.multiplexer.l2cap_channel.connection.device.host
host.send_hci_packet(
hci.HCI_SynchronousDataPacket(
connection_handle=packet.connection_handle,
packet_status=0,
data_total_length=len(pcm_data),
data=pcm_data,
)
host.send_sco_sdu(
connection_handle=packet.connection_handle,
sdu=pcm_data,
)
+7 -2
View File
@@ -15,15 +15,20 @@ dependencies = [
"aiohttp ~= 3.8; platform_system!='Emscripten'",
"appdirs >= 1.4; platform_system!='Emscripten'",
"click >= 8.1.3; platform_system!='Emscripten'",
"cryptography >= 44.0.3; platform_system!='Emscripten'",
"cryptography >= 44.0.3; platform_system!='Emscripten' and platform_system!='Android'",
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the
# versions available on PyPI. Relax the version requirement since it's better than being
# completely unable to import the package in case of version mismatch.
"cryptography >= 44.0.3; platform_system=='Emscripten'",
# Android wheels for cryptography are not yet available on PyPI, so chaquopy uses
# the builds from https://chaquo.com/pypi-13.1/cryptography/. But these are not regually
# updated. Relax the version requirement since it's better than being completely unable
# to import the package in case of version mismatch.
"cryptography >= 42.0.8; platform_system=='Android'",
"grpcio >= 1.62.1; platform_system!='Emscripten'",
"humanize >= 4.6.0; platform_system!='Emscripten'",
"libusb1 >= 2.0.1; platform_system!='Emscripten'",
"libusb-package == 1.0.26.1; platform_system!='Emscripten'",
"libusb-package == 1.0.26.1; platform_system!='Emscripten' and platform_system!='Android'",
"platformdirs >= 3.10.0; platform_system!='Emscripten'",
"prompt_toolkit >= 3.0.16; platform_system!='Emscripten'",
"prettytable >= 3.6.0; platform_system!='Emscripten'",
+3 -1
View File
@@ -619,7 +619,9 @@ async def test_le_request_subrate():
def on_le_subrate_change():
q.put_nowait(lambda: None)
devices.connections[0].on(Connection.EVENT_LE_SUBRATE_CHANGE, on_le_subrate_change)
devices.connections[0].on(
Connection.EVENT_CONNECTION_PARAMETERS_UPDATE, on_le_subrate_change
)
await devices[0].send_command(
hci.HCI_LE_Subrate_Request_Command(
+99 -1
View File
@@ -28,7 +28,7 @@ from unittest.mock import ANY, AsyncMock, Mock
import pytest
from typing_extensions import Self
from bumble import gatt_client, l2cap
from bumble import att, gatt_client, l2cap
from bumble.att import (
ATT_ATTRIBUTE_NOT_FOUND_ERROR,
ATT_PDU,
@@ -1638,6 +1638,104 @@ async def test_eatt_connection_failure():
await gatt_client.Client.connect_eatt(devices.connections[0])
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_multiple() -> None:
devices = await TwoDevices.create_with_connection()
characteristic1 = Characteristic(
'0001', Characteristic.Properties.READ, Characteristic.READABLE, b'1234'
)
characteristic2 = Characteristic(
'0002',
Characteristic.Properties.READ,
Characteristic.READABLE,
b'5678',
)
service = Service('0000', [characteristic1, characteristic2])
devices[1].add_service(service)
client = devices.connections[0].gatt_client
server = devices[1].gatt_server
await client.discover_services()
characteristics = await client.discover_characteristics(
[characteristic1.uuid, characteristic2.uuid], None
)
response = await client.send_request(
att.ATT_Read_Multiple_Request(
set_of_handles=[c.handle for c in characteristics]
)
)
assert isinstance(response, att.ATT_Read_Multiple_Response)
assert response.set_of_values == b'12345678'
response = await client.send_request(
att.ATT_Read_Multiple_Request(
set_of_handles=[
next(
handle
for handle in range(0x0001, 0xFFFF)
if not server.get_attribute(handle)
)
]
)
)
assert isinstance(response, att.ATT_Error_Response)
assert response.error_code == att.ATT_ATTRIBUTE_NOT_FOUND_ERROR
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_multiple_variable() -> None:
devices = await TwoDevices.create_with_connection()
characteristic1 = Characteristic(
'0001', Characteristic.Properties.READ, Characteristic.READABLE, b'1234'
)
characteristic2 = Characteristic(
'0002',
Characteristic.Properties.READ,
Characteristic.READABLE,
b'99',
)
service = Service('0000', [characteristic1, characteristic2])
devices[1].add_service(service)
client = devices.connections[0].gatt_client
server = devices[1].gatt_server
await client.discover_services()
characteristics = await client.discover_characteristics(
[characteristic1.uuid, characteristic2.uuid], None
)
response = await client.send_request(
att.ATT_Read_Multiple_Variable_Request(
set_of_handles=[c.handle for c in characteristics]
)
)
assert isinstance(response, att.ATT_Read_Multiple_Variable_Response)
assert response.length_value_tuple_list == [(4, b'1234'), (2, b'99')]
response = await client.send_request(
att.ATT_Read_Multiple_Variable_Request(
set_of_handles=[
next(
handle
for handle in range(0x0001, 0xFFFF)
if not server.get_attribute(handle)
)
]
)
)
assert isinstance(response, att.ATT_Error_Response)
assert response.error_code == att.ATT_ATTRIBUTE_NOT_FOUND_ERROR
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())