Compare commits

...

22 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 7a58f36020 add auracast doc 2025-02-24 09:10:03 -08:00
Gilles Boccon-Gibod ed0eb912c5 Merge pull request #650 from google/gbg/gatt-adapter-typing
new GATT adapter classes with proper typing support
2025-02-23 18:06:16 -08:00
Gilles Boccon-Gibod 752ce6c830 Merge pull request #657 from google/gbg/auracast-iso-data-path-refactor
use bis link API
2025-02-23 07:42:13 -08:00
Gilles Boccon-Gibod 8e509c18c9 remove unused import 2025-02-22 13:34:57 -08:00
Gilles Boccon-Gibod cc21ed27c7 use bis link API 2025-02-22 13:32:58 -08:00
Gilles Boccon-Gibod b932bafe6d Merge pull request #655 from markusjellitsch/fix/acl_packet_queue
FIX - acl_packet_queue.flush
2025-02-22 11:33:37 -08:00
markus 4e35aba033 fix acl_packet_queue flush when controller does not support HCI_READ_BUFFER_SIZE_COMMAND 2025-02-22 08:37:12 +01:00
zxzxwu 0060ee8ee2 Merge pull request #646 from zxzxwu/ad
Improve AdvertisingData type annotations
2025-02-22 02:52:09 +08:00
zxzxwu 3263d71f54 Merge pull request #654 from zxzxwu/init
Fix mutable default values
2025-02-22 02:51:11 +08:00
Josh Wu f321143837 Improve AdvertisingData type annotations
* Add overloads to provide better return type hints
* Make advertising data type enum so they can be considered constants
2025-02-21 17:12:14 +08:00
Josh Wu bac6f5baaf Fix mutable default values 2025-02-21 16:00:18 +08:00
Gilles Boccon-Gibod e027bcb57a Merge pull request #651 from google/gbg/update-lc3-and-rootcanal
update rootcanal and lc3 dependencies
2025-02-18 14:08:42 -08:00
Gilles Boccon-Gibod eeb9de31ed update dependencies 2025-02-18 12:52:57 -08:00
Gilles Boccon-Gibod 2c3af5b2bb Merge pull request #649 from google/gbg/async-read-phy
make connection phy async
2025-02-18 07:25:06 -08:00
Gilles Boccon-Gibod dfb92e8ed1 fix pandora connection waiting 2025-02-17 19:50:57 -08:00
Gilles Boccon-Gibod 73d2b54e30 make connection phy async 2025-02-17 19:24:18 -08:00
Gilles Boccon-Gibod 8315a60f24 Merge pull request #648 from pstrueb/fix/auracast_transmit
Add setup_data_path for iso queues
2025-02-17 17:22:47 -08:00
pstruebi 185d5fd577 reformatting 2025-02-17 20:11:17 +01:00
pstrueb ae5f9cf690 Remove little accident 2025-02-17 18:21:16 +01:00
pstrueb 4b66a38fe6 Refractoring again 2025-02-17 09:49:09 +01:00
pstrueb f526f549ee refractoring 2025-02-17 08:51:28 +01:00
pstrueb 8761129677 Add setup_data_path for iso queues 2025-02-16 14:38:05 +01:00
11 changed files with 610 additions and 283 deletions
+21 -32
View File
@@ -27,7 +27,6 @@ import logging
import os
import struct
from typing import (
cast,
Any,
AsyncGenerator,
Coroutine,
@@ -127,11 +126,9 @@ class BroadcastScanner(pyee.EventEmitter):
def update(self, advertisement: bumble.device.Advertisement) -> None:
self.rssi = advertisement.rssi
for service_data in advertisement.data.get_all(
core.AdvertisingData.SERVICE_DATA
core.AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID
):
assert isinstance(service_data, tuple)
service_uuid, data = service_data
assert isinstance(data, bytes)
if service_uuid == gatt.GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE:
self.public_broadcast_announcement = (
@@ -145,16 +142,14 @@ class BroadcastScanner(pyee.EventEmitter):
)
continue
self.appearance = advertisement.data.get( # type: ignore[assignment]
core.AdvertisingData.APPEARANCE
self.appearance = advertisement.data.get(
core.AdvertisingData.Type.APPEARANCE
)
if manufacturer_data := advertisement.data.get(
core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA
core.AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA
):
assert isinstance(manufacturer_data, tuple)
company_id = cast(int, manufacturer_data[0])
data = cast(bytes, manufacturer_data[1])
company_id, data = manufacturer_data
self.manufacturer_data = (
company_ids.COMPANY_IDENTIFIERS.get(
company_id, f'0x{company_id:04X}'
@@ -271,11 +266,9 @@ class BroadcastScanner(pyee.EventEmitter):
return
for service_data in advertisement.data.get_all(
core.AdvertisingData.SERVICE_DATA
core.AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID
):
assert isinstance(service_data, tuple)
service_uuid, data = service_data
assert isinstance(data, bytes)
if service_uuid == gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE:
self.basic_audio_announcement = (
@@ -316,24 +309,23 @@ class BroadcastScanner(pyee.EventEmitter):
def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None:
if not (
ads := advertisement.data.get_all(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID
core.AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID
)
) or not (
broadcast_audio_announcement := next(
(
ad
for ad in ads
if isinstance(ad, tuple)
and ad[0] == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
if ad[0] == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
),
None,
)
):
return
broadcast_name = advertisement.data.get(core.AdvertisingData.BROADCAST_NAME)
assert isinstance(broadcast_name, str) or broadcast_name is None
assert isinstance(broadcast_audio_announcement[1], bytes)
broadcast_name = advertisement.data.get_all(
core.AdvertisingData.Type.BROADCAST_NAME
)
if broadcast := self.broadcasts.get(advertisement.address):
broadcast.update(advertisement)
@@ -341,7 +333,7 @@ class BroadcastScanner(pyee.EventEmitter):
bumble.utils.AsyncRunner.spawn(
self.on_new_broadcast(
broadcast_name,
broadcast_name[0] if broadcast_name else None,
advertisement,
bap.BroadcastAudioAnnouncement.from_bytes(
broadcast_audio_announcement[1]
@@ -795,16 +787,8 @@ async def run_receive(
for i, bis_link in enumerate(big_sync.bis_links):
print(f'Setup ISO for BIS {bis_link.handle}')
bis_link.sink = functools.partial(sink, lc3_queues[i])
await device.send_command(
hci.HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=bis_link.handle,
data_path_direction=hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.CONTROLLER_TO_HOST,
data_path_id=0,
codec_id=hci.CodingFormat(codec_id=hci.CodecID.TRANSPARENT),
controller_delay=0,
codec_configuration=b'',
),
check_result=True,
await bis_link.setup_data_path(
direction=bis_link.Direction.CONTROLLER_TO_HOST
)
terminated = asyncio.Event()
@@ -964,10 +948,15 @@ async def run_transmit(
),
),
)
for bis_link in big.bis_links:
print(f'Setup ISO for BIS {bis_link.handle}')
await bis_link.setup_data_path(
direction=bis_link.Direction.HOST_TO_CONTROLLER
)
iso_queues = [
bumble.device.IsoPacketStream(big.bis_links[0], 64),
bumble.device.IsoPacketStream(big.bis_links[1], 64),
bumble.device.IsoPacketStream(bis_link, 64)
for bis_link in big.bis_links
]
def on_flow():
+13 -10
View File
@@ -104,15 +104,16 @@ def le_phy_name(phy_id):
)
def print_connection_phy(phy):
logging.info(
color('@@@ PHY: ', 'yellow') + f'TX:{le_phy_name(phy.tx_phy)}/'
f'RX:{le_phy_name(phy.rx_phy)}'
)
def print_connection(connection):
params = []
if connection.transport == BT_LE_TRANSPORT:
params.append(
'PHY='
f'TX:{le_phy_name(connection.phy.tx_phy)}/'
f'RX:{le_phy_name(connection.phy.rx_phy)}'
)
params.append(
'DL=('
f'TX:{connection.data_length[0]}/{connection.data_length[1]},'
@@ -1288,6 +1289,8 @@ class Central(Connection.Listener):
logging.info(color('### Connected', 'cyan'))
self.connection.listener = self
print_connection(self.connection)
phy = await self.connection.get_phy()
print_connection_phy(phy)
# Switch roles if needed.
if self.role_switch:
@@ -1345,8 +1348,8 @@ class Central(Connection.Listener):
def on_connection_parameters_update(self):
print_connection(self.connection)
def on_connection_phy_update(self):
print_connection(self.connection)
def on_connection_phy_update(self, phy):
print_connection_phy(phy)
def on_connection_att_mtu_update(self):
print_connection(self.connection)
@@ -1472,8 +1475,8 @@ class Peripheral(Device.Listener, Connection.Listener):
def on_connection_parameters_update(self):
print_connection(self.connection)
def on_connection_phy_update(self):
print_connection(self.connection)
def on_connection_phy_update(self, phy):
print_connection_phy(phy)
def on_connection_att_mtu_update(self):
print_connection(self.connection)
+19 -12
View File
@@ -22,7 +22,6 @@
import asyncio
import logging
import os
import random
import re
import humanize
from typing import Optional, Union
@@ -57,7 +56,13 @@ from bumble import __version__
import bumble.core
from bumble import colors
from bumble.core import UUID, AdvertisingData, BT_LE_TRANSPORT
from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
from bumble.device import (
ConnectionParametersPreferences,
ConnectionPHY,
Device,
Connection,
Peer,
)
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor
@@ -125,6 +130,7 @@ def parse_phys(phys):
# -----------------------------------------------------------------------------
class ConsoleApp:
connected_peer: Optional[Peer]
connection_phy: Optional[ConnectionPHY]
def __init__(self):
self.known_addresses = set()
@@ -132,6 +138,7 @@ class ConsoleApp:
self.known_local_attributes = []
self.device = None
self.connected_peer = None
self.connection_phy = None
self.top_tab = 'device'
self.monitor_rssi = False
self.connection_rssi = None
@@ -332,10 +339,10 @@ class ConsoleApp:
f'{connection.parameters.peripheral_latency}/'
f'{connection.parameters.supervision_timeout}'
)
if connection.transport == BT_LE_TRANSPORT:
if self.connection_phy is not None:
phy_state = (
f' RX={le_phy_name(connection.phy.rx_phy)}/'
f'TX={le_phy_name(connection.phy.tx_phy)}'
f' RX={le_phy_name(self.connection_phy.rx_phy)}/'
f'TX={le_phy_name(self.connection_phy.tx_phy)}'
)
else:
phy_state = ''
@@ -654,11 +661,12 @@ class ConsoleApp:
self.append_to_output('connecting...')
try:
await self.device.connect(
connection = await self.device.connect(
params[0],
connection_parameters_preferences=connection_parameters_preferences,
timeout=DEFAULT_CONNECTION_TIMEOUT,
)
self.connection_phy = await connection.get_phy()
self.top_tab = 'services'
except bumble.core.TimeoutError:
self.show_error('connection timed out')
@@ -838,8 +846,8 @@ class ConsoleApp:
phy = await self.connected_peer.connection.get_phy()
self.append_to_output(
f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, '
f'TX={HCI_Constant.le_phy_name(phy[1])}'
f'PHY: RX={HCI_Constant.le_phy_name(phy.rx_phy)}, '
f'TX={HCI_Constant.le_phy_name(phy.tx_phy)}'
)
async def do_request_mtu(self, params):
@@ -1076,10 +1084,9 @@ class DeviceListener(Device.Listener, Connection.Listener):
f'{self.app.connected_peer.connection.parameters}'
)
def on_connection_phy_update(self):
self.app.append_to_output(
f'connection phy update: {self.app.connected_peer.connection.phy}'
)
def on_connection_phy_update(self, phy):
self.app.connection_phy = phy
self.app.append_to_output(f'connection phy update: {phy}')
def on_connection_att_mtu_update(self):
self.app.append_to_output(
+300 -154
View File
@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,10 +16,10 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import dataclasses
import enum
import struct
from typing import List, Optional, Tuple, Union, cast, Dict
from typing import cast, overload, Literal, Union, Optional
from typing_extensions import Self
from bumble.company_ids import COMPANY_IDENTIFIERS
@@ -57,7 +57,7 @@ def bit_flags_to_strings(bits, bit_flag_names):
return names
def name_or_number(dictionary: Dict[int, str], number: int, width: int = 2) -> str:
def name_or_number(dictionary: dict[int, str], number: int, width: int = 2) -> str:
name = dictionary.get(number)
if name is not None:
return name
@@ -200,7 +200,7 @@ class UUID:
'''
BASE_UUID = bytes.fromhex('00001000800000805F9B34FB')[::-1] # little-endian
UUIDS: List[UUID] = [] # Registry of all instances created
UUIDS: list[UUID] = [] # Registry of all instances created
uuid_bytes: bytes
name: Optional[str]
@@ -259,11 +259,11 @@ class UUID:
return cls.from_bytes(struct.pack('<I', uuid_32), name)
@classmethod
def parse_uuid(cls, uuid_as_bytes: bytes, offset: int) -> Tuple[int, UUID]:
def parse_uuid(cls, uuid_as_bytes: bytes, offset: int) -> tuple[int, UUID]:
return len(uuid_as_bytes), cls.from_bytes(uuid_as_bytes[offset:])
@classmethod
def parse_uuid_2(cls, uuid_as_bytes: bytes, offset: int) -> Tuple[int, UUID]:
def parse_uuid_2(cls, uuid_as_bytes: bytes, offset: int) -> tuple[int, UUID]:
return offset + 2, cls.from_bytes(uuid_as_bytes[offset : offset + 2])
def to_bytes(self, force_128: bool = False) -> bytes:
@@ -1280,13 +1280,13 @@ class Appearance:
# Advertising Data
# -----------------------------------------------------------------------------
AdvertisingDataObject = Union[
List[UUID],
Tuple[UUID, bytes],
list[UUID],
tuple[UUID, bytes],
bytes,
str,
int,
Tuple[int, int],
Tuple[int, bytes],
tuple[int, int],
tuple[int, bytes],
Appearance,
]
@@ -1295,116 +1295,116 @@ class AdvertisingData:
# fmt: off
# pylint: disable=line-too-long
FLAGS = 0x01
INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x02
COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x03
INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x04
COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x05
INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x06
COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x07
SHORTENED_LOCAL_NAME = 0x08
COMPLETE_LOCAL_NAME = 0x09
TX_POWER_LEVEL = 0x0A
CLASS_OF_DEVICE = 0x0D
SIMPLE_PAIRING_HASH_C = 0x0E
SIMPLE_PAIRING_HASH_C_192 = 0x0E
SIMPLE_PAIRING_RANDOMIZER_R = 0x0F
SIMPLE_PAIRING_RANDOMIZER_R_192 = 0x0F
DEVICE_ID = 0x10
SECURITY_MANAGER_TK_VALUE = 0x10
SECURITY_MANAGER_OUT_OF_BAND_FLAGS = 0x11
PERIPHERAL_CONNECTION_INTERVAL_RANGE = 0x12
LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = 0x14
LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = 0x15
SERVICE_DATA = 0x16
SERVICE_DATA_16_BIT_UUID = 0x16
PUBLIC_TARGET_ADDRESS = 0x17
RANDOM_TARGET_ADDRESS = 0x18
APPEARANCE = 0x19
ADVERTISING_INTERVAL = 0x1A
LE_BLUETOOTH_DEVICE_ADDRESS = 0x1B
LE_ROLE = 0x1C
SIMPLE_PAIRING_HASH_C_256 = 0x1D
SIMPLE_PAIRING_RANDOMIZER_R_256 = 0x1E
LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS = 0x1F
SERVICE_DATA_32_BIT_UUID = 0x20
SERVICE_DATA_128_BIT_UUID = 0x21
LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE = 0x22
LE_SECURE_CONNECTIONS_RANDOM_VALUE = 0x23
URI = 0x24
INDOOR_POSITIONING = 0x25
TRANSPORT_DISCOVERY_DATA = 0x26
LE_SUPPORTED_FEATURES = 0x27
CHANNEL_MAP_UPDATE_INDICATION = 0x28
PB_ADV = 0x29
MESH_MESSAGE = 0x2A
MESH_BEACON = 0x2B
BIGINFO = 0x2C
BROADCAST_CODE = 0x2D
RESOLVABLE_SET_IDENTIFIER = 0x2E
ADVERTISING_INTERVAL_LONG = 0x2F
BROADCAST_NAME = 0x30
ENCRYPTED_ADVERTISING_DATA = 0X31
PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION = 0X32
ELECTRONIC_SHELF_LABEL = 0X34
THREE_D_INFORMATION_DATA = 0x3D
MANUFACTURER_SPECIFIC_DATA = 0xFF
class Type(OpenIntEnum):
FLAGS = 0x01
INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x02
COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x03
INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x04
COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x05
INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x06
COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x07
SHORTENED_LOCAL_NAME = 0x08
COMPLETE_LOCAL_NAME = 0x09
TX_POWER_LEVEL = 0x0A
CLASS_OF_DEVICE = 0x0D
SIMPLE_PAIRING_HASH_C = 0x0E
SIMPLE_PAIRING_HASH_C_192 = 0x0E
SIMPLE_PAIRING_RANDOMIZER_R = 0x0F
SIMPLE_PAIRING_RANDOMIZER_R_192 = 0x0F
DEVICE_ID = 0x10
SECURITY_MANAGER_TK_VALUE = 0x10
SECURITY_MANAGER_OUT_OF_BAND_FLAGS = 0x11
PERIPHERAL_CONNECTION_INTERVAL_RANGE = 0x12
LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = 0x14
LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = 0x15
SERVICE_DATA_16_BIT_UUID = 0x16
PUBLIC_TARGET_ADDRESS = 0x17
RANDOM_TARGET_ADDRESS = 0x18
APPEARANCE = 0x19
ADVERTISING_INTERVAL = 0x1A
LE_BLUETOOTH_DEVICE_ADDRESS = 0x1B
LE_ROLE = 0x1C
SIMPLE_PAIRING_HASH_C_256 = 0x1D
SIMPLE_PAIRING_RANDOMIZER_R_256 = 0x1E
LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS = 0x1F
SERVICE_DATA_32_BIT_UUID = 0x20
SERVICE_DATA_128_BIT_UUID = 0x21
LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE = 0x22
LE_SECURE_CONNECTIONS_RANDOM_VALUE = 0x23
URI = 0x24
INDOOR_POSITIONING = 0x25
TRANSPORT_DISCOVERY_DATA = 0x26
LE_SUPPORTED_FEATURES = 0x27
CHANNEL_MAP_UPDATE_INDICATION = 0x28
PB_ADV = 0x29
MESH_MESSAGE = 0x2A
MESH_BEACON = 0x2B
BIGINFO = 0x2C
BROADCAST_CODE = 0x2D
RESOLVABLE_SET_IDENTIFIER = 0x2E
ADVERTISING_INTERVAL_LONG = 0x2F
BROADCAST_NAME = 0x30
ENCRYPTED_ADVERTISING_DATA = 0x31
PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION = 0x32
ELECTRONIC_SHELF_LABEL = 0x34
THREE_D_INFORMATION_DATA = 0x3D
MANUFACTURER_SPECIFIC_DATA = 0xFF
AD_TYPE_NAMES = {
FLAGS: 'FLAGS',
INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS',
COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS',
INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS',
COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS',
INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS',
COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS',
SHORTENED_LOCAL_NAME: 'SHORTENED_LOCAL_NAME',
COMPLETE_LOCAL_NAME: 'COMPLETE_LOCAL_NAME',
TX_POWER_LEVEL: 'TX_POWER_LEVEL',
CLASS_OF_DEVICE: 'CLASS_OF_DEVICE',
SIMPLE_PAIRING_HASH_C: 'SIMPLE_PAIRING_HASH_C',
SIMPLE_PAIRING_HASH_C_192: 'SIMPLE_PAIRING_HASH_C_192',
SIMPLE_PAIRING_RANDOMIZER_R: 'SIMPLE_PAIRING_RANDOMIZER_R',
SIMPLE_PAIRING_RANDOMIZER_R_192: 'SIMPLE_PAIRING_RANDOMIZER_R_192',
DEVICE_ID: 'DEVICE_ID',
SECURITY_MANAGER_TK_VALUE: 'SECURITY_MANAGER_TK_VALUE',
SECURITY_MANAGER_OUT_OF_BAND_FLAGS: 'SECURITY_MANAGER_OUT_OF_BAND_FLAGS',
PERIPHERAL_CONNECTION_INTERVAL_RANGE: 'PERIPHERAL_CONNECTION_INTERVAL_RANGE',
LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS',
LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS',
SERVICE_DATA_16_BIT_UUID: 'SERVICE_DATA_16_BIT_UUID',
PUBLIC_TARGET_ADDRESS: 'PUBLIC_TARGET_ADDRESS',
RANDOM_TARGET_ADDRESS: 'RANDOM_TARGET_ADDRESS',
APPEARANCE: 'APPEARANCE',
ADVERTISING_INTERVAL: 'ADVERTISING_INTERVAL',
LE_BLUETOOTH_DEVICE_ADDRESS: 'LE_BLUETOOTH_DEVICE_ADDRESS',
LE_ROLE: 'LE_ROLE',
SIMPLE_PAIRING_HASH_C_256: 'SIMPLE_PAIRING_HASH_C_256',
SIMPLE_PAIRING_RANDOMIZER_R_256: 'SIMPLE_PAIRING_RANDOMIZER_R_256',
LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS',
SERVICE_DATA_32_BIT_UUID: 'SERVICE_DATA_32_BIT_UUID',
SERVICE_DATA_128_BIT_UUID: 'SERVICE_DATA_128_BIT_UUID',
LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE: 'LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE',
LE_SECURE_CONNECTIONS_RANDOM_VALUE: 'LE_SECURE_CONNECTIONS_RANDOM_VALUE',
URI: 'URI',
INDOOR_POSITIONING: 'INDOOR_POSITIONING',
TRANSPORT_DISCOVERY_DATA: 'TRANSPORT_DISCOVERY_DATA',
LE_SUPPORTED_FEATURES: 'LE_SUPPORTED_FEATURES',
CHANNEL_MAP_UPDATE_INDICATION: 'CHANNEL_MAP_UPDATE_INDICATION',
PB_ADV: 'PB_ADV',
MESH_MESSAGE: 'MESH_MESSAGE',
MESH_BEACON: 'MESH_BEACON',
BIGINFO: 'BIGINFO',
BROADCAST_CODE: 'BROADCAST_CODE',
RESOLVABLE_SET_IDENTIFIER: 'RESOLVABLE_SET_IDENTIFIER',
ADVERTISING_INTERVAL_LONG: 'ADVERTISING_INTERVAL_LONG',
BROADCAST_NAME: 'BROADCAST_NAME',
ENCRYPTED_ADVERTISING_DATA: 'ENCRYPTED_ADVERTISING_DATA',
PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION: 'PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION',
ELECTRONIC_SHELF_LABEL: 'ELECTRONIC_SHELF_LABEL',
THREE_D_INFORMATION_DATA: 'THREE_D_INFORMATION_DATA',
MANUFACTURER_SPECIFIC_DATA: 'MANUFACTURER_SPECIFIC_DATA'
}
# For backward-compatibility
FLAGS = Type.FLAGS
INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS
COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS
INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS
COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS
INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS
COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS
SHORTENED_LOCAL_NAME = Type.SHORTENED_LOCAL_NAME
COMPLETE_LOCAL_NAME = Type.COMPLETE_LOCAL_NAME
TX_POWER_LEVEL = Type.TX_POWER_LEVEL
CLASS_OF_DEVICE = Type.CLASS_OF_DEVICE
SIMPLE_PAIRING_HASH_C = Type.SIMPLE_PAIRING_HASH_C
SIMPLE_PAIRING_HASH_C_192 = Type.SIMPLE_PAIRING_HASH_C_192
SIMPLE_PAIRING_RANDOMIZER_R = Type.SIMPLE_PAIRING_RANDOMIZER_R
SIMPLE_PAIRING_RANDOMIZER_R_192 = Type.SIMPLE_PAIRING_RANDOMIZER_R_192
DEVICE_ID = Type.DEVICE_ID
SECURITY_MANAGER_TK_VALUE = Type.SECURITY_MANAGER_TK_VALUE
SECURITY_MANAGER_OUT_OF_BAND_FLAGS = Type.SECURITY_MANAGER_OUT_OF_BAND_FLAGS
PERIPHERAL_CONNECTION_INTERVAL_RANGE = Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE
LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS
LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS
SERVICE_DATA = Type.SERVICE_DATA_16_BIT_UUID
SERVICE_DATA_16_BIT_UUID = Type.SERVICE_DATA_16_BIT_UUID
PUBLIC_TARGET_ADDRESS = Type.PUBLIC_TARGET_ADDRESS
RANDOM_TARGET_ADDRESS = Type.RANDOM_TARGET_ADDRESS
APPEARANCE = Type.APPEARANCE
ADVERTISING_INTERVAL = Type.ADVERTISING_INTERVAL
LE_BLUETOOTH_DEVICE_ADDRESS = Type.LE_BLUETOOTH_DEVICE_ADDRESS
LE_ROLE = Type.LE_ROLE
SIMPLE_PAIRING_HASH_C_256 = Type.SIMPLE_PAIRING_HASH_C_256
SIMPLE_PAIRING_RANDOMIZER_R_256 = Type.SIMPLE_PAIRING_RANDOMIZER_R_256
LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS = Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS
SERVICE_DATA_32_BIT_UUID = Type.SERVICE_DATA_32_BIT_UUID
SERVICE_DATA_128_BIT_UUID = Type.SERVICE_DATA_128_BIT_UUID
LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE = Type.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE
LE_SECURE_CONNECTIONS_RANDOM_VALUE = Type.LE_SECURE_CONNECTIONS_RANDOM_VALUE
URI = Type.URI
INDOOR_POSITIONING = Type.INDOOR_POSITIONING
TRANSPORT_DISCOVERY_DATA = Type.TRANSPORT_DISCOVERY_DATA
LE_SUPPORTED_FEATURES = Type.LE_SUPPORTED_FEATURES
CHANNEL_MAP_UPDATE_INDICATION = Type.CHANNEL_MAP_UPDATE_INDICATION
PB_ADV = Type.PB_ADV
MESH_MESSAGE = Type.MESH_MESSAGE
MESH_BEACON = Type.MESH_BEACON
BIGINFO = Type.BIGINFO
BROADCAST_CODE = Type.BROADCAST_CODE
RESOLVABLE_SET_IDENTIFIER = Type.RESOLVABLE_SET_IDENTIFIER
ADVERTISING_INTERVAL_LONG = Type.ADVERTISING_INTERVAL_LONG
BROADCAST_NAME = Type.BROADCAST_NAME
ENCRYPTED_ADVERTISING_DATA = Type.ENCRYPTED_ADVERTISING_DATA
PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION = Type.PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION
ELECTRONIC_SHELF_LABEL = Type.ELECTRONIC_SHELF_LABEL
THREE_D_INFORMATION_DATA = Type.THREE_D_INFORMATION_DATA
MANUFACTURER_SPECIFIC_DATA = Type.MANUFACTURER_SPECIFIC_DATA
LE_LIMITED_DISCOVERABLE_MODE_FLAG = 0x01
LE_GENERAL_DISCOVERABLE_MODE_FLAG = 0x02
@@ -1412,12 +1412,12 @@ class AdvertisingData:
BR_EDR_CONTROLLER_FLAG = 0x08
BR_EDR_HOST_FLAG = 0x10
ad_structures: List[Tuple[int, bytes]]
ad_structures: list[tuple[int, bytes]]
# fmt: on
# pylint: enable=line-too-long
def __init__(self, ad_structures: Optional[List[Tuple[int, bytes]]] = None) -> None:
def __init__(self, ad_structures: Optional[list[tuple[int, bytes]]] = None) -> None:
if ad_structures is None:
ad_structures = []
self.ad_structures = ad_structures[:]
@@ -1444,7 +1444,7 @@ class AdvertisingData:
return ','.join(bit_flags_to_strings(flags, flag_names))
@staticmethod
def uuid_list_to_objects(ad_data: bytes, uuid_size: int) -> List[UUID]:
def uuid_list_to_objects(ad_data: bytes, uuid_size: int) -> list[UUID]:
uuids = []
offset = 0
while (offset + uuid_size) <= len(ad_data):
@@ -1461,8 +1461,8 @@ class AdvertisingData:
]
)
@staticmethod
def ad_data_to_string(ad_type, ad_data):
@classmethod
def ad_data_to_string(cls, ad_type: int, ad_data: bytes) -> str:
if ad_type == AdvertisingData.FLAGS:
ad_type_str = 'Flags'
ad_data_str = AdvertisingData.flags_to_string(ad_data[0], short=True)
@@ -1521,72 +1521,72 @@ class AdvertisingData:
ad_type_str = 'Broadcast Name'
ad_data_str = ad_data.decode('utf-8')
else:
ad_type_str = AdvertisingData.AD_TYPE_NAMES.get(ad_type, f'0x{ad_type:02X}')
ad_type_str = AdvertisingData.Type(ad_type).name
ad_data_str = ad_data.hex()
return f'[{ad_type_str}]: {ad_data_str}'
# pylint: disable=too-many-return-statements
@staticmethod
def ad_data_to_object(ad_type: int, ad_data: bytes) -> AdvertisingDataObject:
@classmethod
def ad_data_to_object(cls, ad_type: int, ad_data: bytes) -> AdvertisingDataObject:
if ad_type in (
AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
):
return AdvertisingData.uuid_list_to_objects(ad_data, 2)
if ad_type in (
AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
):
return AdvertisingData.uuid_list_to_objects(ad_data, 4)
if ad_type in (
AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
):
return AdvertisingData.uuid_list_to_objects(ad_data, 16)
if ad_type == AdvertisingData.SERVICE_DATA_16_BIT_UUID:
if ad_type == AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID:
return (UUID.from_bytes(ad_data[:2]), ad_data[2:])
if ad_type == AdvertisingData.SERVICE_DATA_32_BIT_UUID:
if ad_type == AdvertisingData.Type.SERVICE_DATA_32_BIT_UUID:
return (UUID.from_bytes(ad_data[:4]), ad_data[4:])
if ad_type == AdvertisingData.SERVICE_DATA_128_BIT_UUID:
if ad_type == AdvertisingData.Type.SERVICE_DATA_128_BIT_UUID:
return (UUID.from_bytes(ad_data[:16]), ad_data[16:])
if ad_type in (
AdvertisingData.SHORTENED_LOCAL_NAME,
AdvertisingData.COMPLETE_LOCAL_NAME,
AdvertisingData.URI,
AdvertisingData.BROADCAST_NAME,
AdvertisingData.Type.SHORTENED_LOCAL_NAME,
AdvertisingData.Type.COMPLETE_LOCAL_NAME,
AdvertisingData.Type.URI,
AdvertisingData.Type.BROADCAST_NAME,
):
return ad_data.decode("utf-8")
if ad_type in (AdvertisingData.TX_POWER_LEVEL, AdvertisingData.FLAGS):
if ad_type in (AdvertisingData.Type.TX_POWER_LEVEL, AdvertisingData.Type.FLAGS):
return cast(int, struct.unpack('B', ad_data)[0])
if ad_type in (AdvertisingData.ADVERTISING_INTERVAL,):
if ad_type in (AdvertisingData.Type.ADVERTISING_INTERVAL,):
return cast(int, struct.unpack('<H', ad_data)[0])
if ad_type == AdvertisingData.CLASS_OF_DEVICE:
if ad_type == AdvertisingData.Type.CLASS_OF_DEVICE:
return cast(int, struct.unpack('<I', bytes([*ad_data, 0]))[0])
if ad_type == AdvertisingData.PERIPHERAL_CONNECTION_INTERVAL_RANGE:
return cast(Tuple[int, int], struct.unpack('<HH', ad_data))
if ad_type == AdvertisingData.Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE:
return cast(tuple[int, int], struct.unpack('<HH', ad_data))
if ad_type == AdvertisingData.MANUFACTURER_SPECIFIC_DATA:
return (cast(int, struct.unpack_from('<H', ad_data, 0)[0]), ad_data[2:])
if ad_type == AdvertisingData.APPEARANCE:
if ad_type == AdvertisingData.Type.APPEARANCE:
return Appearance.from_int(
cast(int, struct.unpack_from('<H', ad_data, 0)[0])
)
if ad_type == AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA:
return (cast(int, struct.unpack_from('<H', ad_data, 0)[0]), ad_data[2:])
return ad_data
def append(self, data: bytes) -> None:
@@ -1600,7 +1600,80 @@ class AdvertisingData:
self.ad_structures.append((ad_type, ad_data))
offset += length
def get_all(self, type_id: int, raw: bool = False) -> List[AdvertisingDataObject]:
@overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
],
raw: Literal[False] = False,
) -> list[list[UUID]]: ...
@overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_32_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_128_BIT_UUID,
],
raw: Literal[False] = False,
) -> list[tuple[UUID, bytes]]: ...
@overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.SHORTENED_LOCAL_NAME,
AdvertisingData.Type.COMPLETE_LOCAL_NAME,
AdvertisingData.Type.URI,
AdvertisingData.Type.BROADCAST_NAME,
],
raw: Literal[False] = False,
) -> list[str]: ...
@overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.TX_POWER_LEVEL,
AdvertisingData.Type.FLAGS,
AdvertisingData.Type.ADVERTISING_INTERVAL,
AdvertisingData.Type.CLASS_OF_DEVICE,
],
raw: Literal[False] = False,
) -> list[int]: ...
@overload
def get_all(
self,
type_id: Literal[AdvertisingData.Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE,],
raw: Literal[False] = False,
) -> list[tuple[int, int]]: ...
@overload
def get_all(
self,
type_id: Literal[AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA,],
raw: Literal[False] = False,
) -> list[tuple[int, bytes]]: ...
@overload
def get_all(
self,
type_id: Literal[AdvertisingData.Type.APPEARANCE,],
raw: Literal[False] = False,
) -> list[Appearance]: ...
@overload
def get_all(self, type_id: int, raw: Literal[True]) -> list[bytes]: ...
@overload
def get_all(
self, type_id: int, raw: bool = False
) -> list[AdvertisingDataObject]: ...
def get_all(self, type_id: int, raw: bool = False) -> list[AdvertisingDataObject]: # type: ignore[misc]
'''
Get Advertising Data Structure(s) with a given type
@@ -1612,6 +1685,79 @@ class AdvertisingData:
return [process_ad_data(ad[1]) for ad in self.ad_structures if ad[0] == type_id]
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
],
raw: Literal[False] = False,
) -> Optional[list[UUID]]: ...
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_32_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_128_BIT_UUID,
],
raw: Literal[False] = False,
) -> Optional[tuple[UUID, bytes]]: ...
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.SHORTENED_LOCAL_NAME,
AdvertisingData.Type.COMPLETE_LOCAL_NAME,
AdvertisingData.Type.URI,
AdvertisingData.Type.BROADCAST_NAME,
],
raw: Literal[False] = False,
) -> Optional[Optional[str]]: ...
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.TX_POWER_LEVEL,
AdvertisingData.Type.FLAGS,
AdvertisingData.Type.ADVERTISING_INTERVAL,
AdvertisingData.Type.CLASS_OF_DEVICE,
],
raw: Literal[False] = False,
) -> Optional[int]: ...
@overload
def get(
self,
type_id: Literal[AdvertisingData.Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE,],
raw: Literal[False] = False,
) -> Optional[tuple[int, int]]: ...
@overload
def get(
self,
type_id: Literal[AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA,],
raw: Literal[False] = False,
) -> Optional[tuple[int, bytes]]: ...
@overload
def get(
self,
type_id: Literal[AdvertisingData.Type.APPEARANCE,],
raw: Literal[False] = False,
) -> Optional[Appearance]: ...
@overload
def get(self, type_id: int, raw: Literal[True]) -> Optional[bytes]: ...
@overload
def get(
self, type_id: int, raw: bool = False
) -> Optional[AdvertisingDataObject]: ...
def get(self, type_id: int, raw: bool = False) -> Optional[AdvertisingDataObject]:
'''
Get Advertising Data Structure(s) with a given type
+28 -50
View File
@@ -1569,8 +1569,8 @@ class Connection(CompositeEventEmitter):
gatt_client: gatt_client.Client
pairing_peer_io_capability: Optional[int]
pairing_peer_authentication_requirements: Optional[int]
cs_configs: dict[int, ChannelSoundingConfig] = {} # Config ID to Configuration
cs_procedures: dict[int, ChannelSoundingProcedure] = {} # Config ID to Procedures
cs_configs: dict[int, ChannelSoundingConfig] # Config ID to Configuration
cs_procedures: dict[int, ChannelSoundingProcedure] # Config ID to Procedures
@composite_listener
class Listener:
@@ -1586,7 +1586,7 @@ class Connection(CompositeEventEmitter):
def on_connection_data_length_change(self):
pass
def on_connection_phy_update(self):
def on_connection_phy_update(self, phy):
pass
def on_connection_phy_update_failure(self, error):
@@ -1612,7 +1612,6 @@ class Connection(CompositeEventEmitter):
peer_resolvable_address,
role,
parameters,
phy,
):
super().__init__()
self.device = device
@@ -1629,7 +1628,6 @@ class Connection(CompositeEventEmitter):
self.authenticated = False
self.sc = False
self.link_key_type = None
self.phy = phy
self.att_mtu = ATT_DEFAULT_MTU
self.data_length = DEVICE_DEFAULT_DATA_LENGTH
self.gatt_client = None # Per-connection client
@@ -1639,6 +1637,8 @@ class Connection(CompositeEventEmitter):
self.pairing_peer_io_capability = None
self.pairing_peer_authentication_requirements = None
self.peer_le_features = None
self.cs_configs = {}
self.cs_procedures = {}
# [Classic only]
@classmethod
@@ -1658,7 +1658,6 @@ class Connection(CompositeEventEmitter):
None,
role,
None,
None,
)
# [Classic only]
@@ -1774,12 +1773,12 @@ class Connection(CompositeEventEmitter):
async def set_phy(self, tx_phys=None, rx_phys=None, phy_options=None):
return await self.device.set_connection_phy(self, tx_phys, rx_phys, phy_options)
async def get_phy(self) -> ConnectionPHY:
return await self.device.get_connection_phy(self)
async def get_rssi(self):
return await self.device.get_connection_rssi(self)
async def get_phy(self):
return await self.device.get_connection_phy(self)
async def transfer_periodic_sync(
self, sync_handle: int, service_data: int = 0
) -> None:
@@ -2049,9 +2048,9 @@ class Device(CompositeEventEmitter):
legacy_advertiser: Optional[LegacyAdvertiser]
sco_links: Dict[int, ScoLink]
cis_links: Dict[int, CisLink]
bigs = dict[int, Big]()
bis_links = dict[int, BisLink]()
big_syncs = dict[int, BigSync]()
bigs: dict[int, Big]
bis_links: dict[int, BisLink]
big_syncs: dict[int, BigSync]
_pending_cis: Dict[int, tuple[int, int]]
gatt_service: gatt_service.GenericAttributeProfileService | None = None
@@ -2144,6 +2143,9 @@ class Device(CompositeEventEmitter):
self.sco_links = {} # ScoLinks, by connection handle (BR/EDR only)
self.cis_links = {} # CisLinks, by connection handle (LE only)
self._pending_cis = {} # (CIS_ID, CIG_ID), by CIS_handle
self.bigs = {}
self.bis_links = {}
self.big_syncs = {}
self.classic_enabled = False
self.inquiry_response = None
self.address_resolver = None
@@ -3937,12 +3939,14 @@ class Device(CompositeEventEmitter):
)
return result.return_parameters.rssi
async def get_connection_phy(self, connection):
async def get_connection_phy(self, connection: Connection) -> ConnectionPHY:
result = await self.send_command(
hci.HCI_LE_Read_PHY_Command(connection_handle=connection.handle),
check_result=True,
)
return (result.return_parameters.tx_phy, result.return_parameters.rx_phy)
return ConnectionPHY(
result.return_parameters.tx_phy, result.return_parameters.rx_phy
)
async def set_connection_phy(
self, connection, tx_phys=None, rx_phys=None, phy_options=None
@@ -4006,13 +4010,12 @@ class Device(CompositeEventEmitter):
# Create a future to wait for an address to be found
peer_address = asyncio.get_running_loop().create_future()
def on_peer_found(address, ad_data):
local_name = ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True)
if local_name is None:
local_name = ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True)
if local_name is not None:
if local_name.decode('utf-8') == name:
peer_address.set_result(address)
def on_peer_found(address: hci.Address, ad_data: AdvertisingData) -> None:
local_name = ad_data.get(
AdvertisingData.Type.COMPLETE_LOCAL_NAME
) or ad_data.get(AdvertisingData.Type.SHORTENED_LOCAL_NAME)
if local_name == name:
peer_address.set_result(address)
listener = None
was_scanning = self.scanning
@@ -5169,29 +5172,6 @@ class Device(CompositeEventEmitter):
lambda _: self.abort_on('flush', advertising_set.start()),
)
self._emit_le_connection(connection)
def _emit_le_connection(self, connection: Connection) -> None:
# If supported, read which PHY we're connected with before
# notifying listeners of the new connection.
if self.host.supports_command(hci.HCI_LE_READ_PHY_COMMAND):
async def read_phy():
result = await self.send_command(
hci.HCI_LE_Read_PHY_Command(connection_handle=connection.handle),
check_result=True,
)
connection.phy = ConnectionPHY(
result.return_parameters.tx_phy, result.return_parameters.rx_phy
)
# Emit an event to notify listeners of the new connection
self.emit('connection', connection)
# Do so asynchronously to not block the current event handler
connection.abort_on('disconnection', read_phy())
return
self.emit('connection', connection)
@host_event_handler
@@ -5290,7 +5270,6 @@ class Device(CompositeEventEmitter):
peer_resolvable_address,
role,
connection_parameters,
ConnectionPHY(hci.HCI_LE_1M_PHY, hci.HCI_LE_1M_PHY),
)
self.connections[connection_handle] = connection
@@ -5306,7 +5285,7 @@ class Device(CompositeEventEmitter):
if role == hci.HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising:
# We can emit now, we have all the info we need
self._emit_le_connection(connection)
self.emit('connection', connection)
return
if role == hci.HCI_PERIPHERAL_ROLE and self.supports_le_extended_advertising:
@@ -5860,14 +5839,13 @@ class Device(CompositeEventEmitter):
@host_event_handler
@with_connection_from_handle
def on_connection_phy_update(self, connection, connection_phy):
def on_connection_phy_update(self, connection, phy):
logger.debug(
f'*** Connection PHY Update: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, '
f'{connection_phy}'
f'{phy}'
)
connection.phy = connection_phy
connection.emit('connection_phy_update')
connection.emit('connection_phy_update', phy)
@host_event_handler
@with_connection_from_handle
+11 -5
View File
@@ -235,7 +235,7 @@ class Host(AbortableEventEmitter):
cis_links: Dict[int, IsoLink]
bis_links: Dict[int, IsoLink]
sco_links: Dict[int, ScoLink]
bigs: dict[int, set[int]] = {} # BIG Handle to BIS Handles
bigs: dict[int, set[int]]
acl_packet_queue: Optional[DataPacketQueue] = None
le_acl_packet_queue: Optional[DataPacketQueue] = None
iso_packet_queue: Optional[DataPacketQueue] = None
@@ -259,6 +259,7 @@ class Host(AbortableEventEmitter):
self.cis_links = {} # CIS links, by connection handle
self.bis_links = {} # BIS links, by connection handle
self.sco_links = {} # SCO links, by connection handle
self.bigs = {} # BIG Handle to BIS Handles
self.pending_command = None
self.pending_response: Optional[asyncio.Future[Any]] = None
self.number_of_supported_advertising_sets = 0
@@ -1061,8 +1062,10 @@ class Host(AbortableEventEmitter):
)
# Flush the data queues
self.acl_packet_queue.flush(handle)
self.le_acl_packet_queue.flush(handle)
if self.acl_packet_queue:
self.acl_packet_queue.flush(handle)
if self.le_acl_packet_queue:
self.le_acl_packet_queue.flush(handle)
if self.iso_packet_queue:
self.iso_packet_queue.flush(handle)
else:
@@ -1098,8 +1101,11 @@ class Host(AbortableEventEmitter):
# Notify the client
if event.status == hci.HCI_SUCCESS:
connection_phy = ConnectionPHY(event.tx_phy, event.rx_phy)
self.emit('connection_phy_update', connection.handle, connection_phy)
self.emit(
'connection_phy_update',
connection.handle,
ConnectionPHY(event.tx_phy, event.rx_phy),
)
else:
self.emit('connection_phy_update_failure', connection.handle, event.status)
+5 -5
View File
@@ -1,4 +1,4 @@
# Copyright 2021-2023 Google LLC
# Copyright 2021-2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -76,18 +76,18 @@ class OobData:
return instance
def to_ad(self) -> AdvertisingData:
ad_structures = []
ad_structures: list[tuple[int, bytes]] = []
if self.address is not None:
ad_structures.append(
(AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(self.address))
(AdvertisingData.Type.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(self.address))
)
if self.role is not None:
ad_structures.append((AdvertisingData.LE_ROLE, bytes([self.role])))
ad_structures.append((AdvertisingData.Type.LE_ROLE, bytes([self.role])))
if self.shared_data is not None:
ad_structures.extend(self.shared_data.to_ad().ad_structures)
if self.legacy_context is not None:
ad_structures.append(
(AdvertisingData.SECURITY_MANAGER_TK_VALUE, self.legacy_context.tk)
(AdvertisingData.Type.SECURITY_MANAGER_TK_VALUE, self.legacy_context.tk)
)
return AdvertisingData(ad_structures)
+7 -12
View File
@@ -371,9 +371,7 @@ class HostService(HostServicer):
scan_response_data=scan_response_data,
)
pending_connection: asyncio.Future[bumble.device.Connection] = (
asyncio.get_running_loop().create_future()
)
connections: asyncio.Queue[bumble.device.Connection] = asyncio.Queue()
if request.connectable:
@@ -382,7 +380,7 @@ class HostService(HostServicer):
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
):
pending_connection.set_result(connection)
connections.put_nowait(connection)
self.device.on('connection', on_connection)
@@ -397,8 +395,7 @@ class HostService(HostServicer):
await asyncio.sleep(1)
continue
connection = await pending_connection
pending_connection = asyncio.get_running_loop().create_future()
connection = await connections.get()
cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big'))
yield AdvertiseResponse(connection=Connection(cookie=cookie))
@@ -492,6 +489,8 @@ class HostService(HostServicer):
target = Address(target_bytes, Address.RANDOM_DEVICE_ADDRESS)
advertising_type = AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY
connections: asyncio.Queue[bumble.device.Connection] = asyncio.Queue()
if request.connectable:
def on_connection(connection: bumble.device.Connection) -> None:
@@ -499,7 +498,7 @@ class HostService(HostServicer):
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
):
pending_connection.set_result(connection)
connections.put_nowait(connection)
self.device.on('connection', on_connection)
@@ -517,12 +516,8 @@ class HostService(HostServicer):
await asyncio.sleep(1)
continue
pending_connection: asyncio.Future[bumble.device.Connection] = (
asyncio.get_running_loop().create_future()
)
self.log.debug('Wait for LE connection...')
connection = await pending_connection
connection = await connections.get()
self.log.debug(
f"Advertise: Connected to {connection.peer_address} (handle={connection.handle})"
+2 -1
View File
@@ -301,7 +301,7 @@ class AseStateMachine(gatt.Characteristic):
presentation_delay = 0
# Additional parameters in ENABLING, STREAMING, DISABLING State
metadata = le_audio.Metadata()
metadata: le_audio.Metadata
def __init__(
self,
@@ -313,6 +313,7 @@ class AseStateMachine(gatt.Characteristic):
self.ase_id = ase_id
self._state = AseStateMachine.State.IDLE
self.role = role
self.metadata = le_audio.Metadata()
uuid = (
gatt.GATT_SINK_ASE_CHARACTERISTIC
+202
View File
@@ -0,0 +1,202 @@
AURACAST TOOL
=============
The "auracast" tool implements commands that implement broadcasting, receiving
and controlling LE Audio broadcasts.
=== "Running as an installed package"
```
$ bumble-auracast
```
=== "Running from source"
```
$ python3 apps/auracast.py <args>
```
# Python Dependencies
Try installing the optional `[auracast]` dependencies:
=== "From source"
```bash
$ python3 -m pip install ".[auracast]"
```
=== "From PyPI"
```bash
$ python3 -m pip install "bumble[auracast]"
```
## LC3
The `auracast` app depends on the `lc3` python module, which is available
either as PyPI module (currently only available for Linux x86_64).
When installing Bumble with the optional `auracast` dependency, the `lc3`
module will be installed from the `lc3py` PyPI package if available.
If not, you will need to install it separately. This can be done with:
```bash
$ python3 -m pip install "git+https://github.com/google/liblc3.git"
```
## SoundDevice
The `sounddevice` module is required for audio output to the host's sound
output device(s) and/or input from the host's input device(s).
If not installed, the `auracast` app is still functional, but will be limited
to non-device inputs and output (files, external processes, ...)
On macOS and Windows, the `sounddevice` module gets installed with the
native PortAudio libraries included.
For Linux, however, PortAudio must be installed separately.
This is typically done with a command like:
```bash
$ sudo apt install libportaudio2
```
Visit the [sounddevice documentation](https://python-sounddevice.readthedocs.io/)
for details.
# General Usage
```
Usage: bumble-auracast [OPTIONS] COMMAND [ARGS]...
Options:
--help Show this message and exit.
Commands:
assist Scan for broadcasts on behalf of an audio server
pair Pair with an audio server
receive Receive a broadcast source
scan Scan for public broadcasts
transmit Transmit a broadcast source
```
Use `bumble-auracast <command> --help` to get more detailed usage information
for a specific `<command>`.
## `assist`
Act as a broadcast assistant.
Use `bumble-auracast assist --help` for details on the commands and options.
The assistant commands are:
### `monitor-state`
Subscribe to the state characteristic and monitor changes.
### `add-source`
Add a broadcast source. This will instruct the device to start
receiving a broadcast.
### `modify-source`
Modify a broadcast source.
### `remove-source`
Remote a broadcast source.
## `pair`
Pair with a device.
## `receive`
Receive a broadcast source.
The `--output` option specifies where to send the decoded audio samples.
The following outputs are supported:
### Sound Device
The `--output` argument is either `device`, to send the audio to the hosts's default sound device, or `device:<DEVICE_ID>` where `<DEVICE_ID>`
is the integer ID of one of the available sound devices.
When invoked with `--output "device:?"`, a list of available devices and
their IDs is printed out.
### Standard Output
With `--output stdout`, the decoded audio samples are written to the
standard output (currently always as float32 PCM samples)
### FFPlay
With `--output ffplay`, the decoded audio samples are piped to `ffplay`
in a child process. This option is only available if `ffplay` is a command that is available on the host.
### File
With `--output <filename>` or `--output file:<filename>`, the decoded audio
samples are written to a file (currently always as float32 PCM)
## `transmit`
Broadcast an audio source as a transmitter.
The `--input` and `--input-format` options specify what audio input
source to transmit.
The following inputs are supported:
### Sound Device
The `--input` argument is either `device`, to use the host's default sound
device (typically a builtin microphone), or `device:<DEVICE_ID>` where
`<DEVICE_ID>` is the integer ID of one of the available sound devices.
When invoked with `--input "device:?"`, a list of available devices and their
IDs is printed out.
### Standard Input
With `--input stdout`, the audio samples are read from the standard input.
(currently always as int16 PCM).
### File
With `--input <filename>` or `--input file:<filename>`, the audio samples
are read from a .wav or raw PCM file.
Use the `--input-format <FORMAT>` option to specify the format of the audio
samples in raw PCM files. `<FORMAT>` is expressed as:
`<sample-type>,<sample-rate>,<channels>`
(the only supported <sample-type> currently is 'int16le' for 16 bit signed integers with little-endian byte order)
## `scan`
Scan for public broadcasts.
A live display of the available broadcasts is displayed continuously.
# Compatibility With Some Products
The `auracast` app has been tested for compatibility with a few products.
The list is still very limited. Please let us know if there are products
that are not working well, or if there are specific instructions that should
be shared to allow better compatibiity with certain products.
## Transmitters
The `receive` command has been tested to successfully receive broadcasts from
the following transmitters:
* JBL GO 4
* Flairmesh FlooGoo FMA120
* Eppfun AK3040Pro Max
* HIGHGAZE BA-25T
* Nexum Audio VOCE and USB dongle
## Receivers
### Pixel Buds Pro 2
The Pixel Buds Pro 2 can be used as a broadcast receiver, controlled by the
`auracast assist` command, instructing the buds to receive a broadcast.
Use the `assist --command add-source` command to tell the buds to receive a
broadcast.
Use the `assist --command monitor-state` command to monitor the current sync/receive
state of the buds.
### JBL
The JBL GO 4 and other JBL products that support the Auracast feature can be used
as transmitters or receivers.
When running in receiver mode (pressing the Auracast button while not already playing),
the JBL speaker will scan for broadcast advertisements with a specific manufacturer data.
Use the `--manufacturer-data` option of the `transmit` command in order to include data
that will let the speaker recognize the broadcast as a compatible source.
The manufacturer ID for JBL is 87.
Using an option like `--manufacturer-data 87:00000000000000000000000000000000dffd` should work (tested on the
JBL GO 4. The `dffd` value at the end of the payload may be different on other models?).
### Others
* Nexum Audio VOCE and USB dongle
+2 -2
View File
@@ -57,7 +57,7 @@ development = [
]
avatar = [
"pandora-avatar == 0.0.10",
"rootcanal == 1.10.0 ; python_version>='3.10'",
"rootcanal == 1.11.1 ; python_version>='3.10'",
]
pandora = ["bt-test-interfaces >= 0.0.6"]
documentation = [
@@ -66,7 +66,7 @@ documentation = [
"mkdocstrings[python] >= 0.27.0",
]
auracast = [
"lc3py ; python_version>='3.10' and platform_system=='Linux' and platform_machine=='x86_64'",
"lc3py >= 1.1.3; python_version>='3.10' and ((platform_system=='Linux' and platform_machine=='x86_64') or (platform_system=='Darwin' and platform_machine=='arm64'))",
"sounddevice >= 0.5.1",
]