Compare commits

...

29 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 3495eb52ba reset parser before raising exception 2025-03-19 11:32:51 -04:00
Gilles Boccon-Gibod 5e55c0e358 add broadcast code encoding 2025-03-17 19:56:02 -04:00
zxzxwu b2d9541f8f Merge pull request #332 from zxzxwu/role
Enumify: PhysicalTransport, Role, AddressType
2025-03-10 00:04:18 +08:00
Josh Wu 637224d5bc Enum: PhysicalTransport, Role, AddressType 2025-03-09 23:34:01 +08:00
Gilles Boccon-Gibod 92ab171013 Merge pull request #659 from pcondoleon/le_scan_interval_fix
Fixed le_scan_interval incorrectly being set with scan_window
2025-02-28 15:04:03 -05:00
Peter Condoleon 592475e2ed Fixed le_scan_interval incorrectly being set with scan_window 2025-02-27 13:54:20 +10:00
Gilles Boccon-Gibod 12bcdb7770 Merge pull request #658 from google/gbg/auracast-doc
add auracast doc
2025-02-25 07:18:38 -08:00
Gilles Boccon-Gibod 7a58f36020 add auracast doc 2025-02-24 09:10:03 -08:00
Gilles Boccon-Gibod ed0eb912c5 Merge pull request #650 from google/gbg/gatt-adapter-typing
new GATT adapter classes with proper typing support
2025-02-23 18:06:16 -08:00
Gilles Boccon-Gibod 752ce6c830 Merge pull request #657 from google/gbg/auracast-iso-data-path-refactor
use bis link API
2025-02-23 07:42:13 -08:00
Gilles Boccon-Gibod 8e509c18c9 remove unused import 2025-02-22 13:34:57 -08:00
Gilles Boccon-Gibod cc21ed27c7 use bis link API 2025-02-22 13:32:58 -08:00
Gilles Boccon-Gibod b932bafe6d Merge pull request #655 from markusjellitsch/fix/acl_packet_queue
FIX - acl_packet_queue.flush
2025-02-22 11:33:37 -08:00
markus 4e35aba033 fix acl_packet_queue flush when controller does not support HCI_READ_BUFFER_SIZE_COMMAND 2025-02-22 08:37:12 +01:00
zxzxwu 0060ee8ee2 Merge pull request #646 from zxzxwu/ad
Improve AdvertisingData type annotations
2025-02-22 02:52:09 +08:00
zxzxwu 3263d71f54 Merge pull request #654 from zxzxwu/init
Fix mutable default values
2025-02-22 02:51:11 +08:00
Josh Wu f321143837 Improve AdvertisingData type annotations
* Add overloads to provide better return type hints
* Make advertising data type enum so they can be considered constants
2025-02-21 17:12:14 +08:00
Josh Wu bac6f5baaf Fix mutable default values 2025-02-21 16:00:18 +08:00
Gilles Boccon-Gibod e027bcb57a Merge pull request #651 from google/gbg/update-lc3-and-rootcanal
update rootcanal and lc3 dependencies
2025-02-18 14:08:42 -08:00
Gilles Boccon-Gibod eeb9de31ed update dependencies 2025-02-18 12:52:57 -08:00
Gilles Boccon-Gibod 2c3af5b2bb Merge pull request #649 from google/gbg/async-read-phy
make connection phy async
2025-02-18 07:25:06 -08:00
Gilles Boccon-Gibod dfb92e8ed1 fix pandora connection waiting 2025-02-17 19:50:57 -08:00
Gilles Boccon-Gibod 73d2b54e30 make connection phy async 2025-02-17 19:24:18 -08:00
Gilles Boccon-Gibod 8315a60f24 Merge pull request #648 from pstrueb/fix/auracast_transmit
Add setup_data_path for iso queues
2025-02-17 17:22:47 -08:00
pstruebi 185d5fd577 reformatting 2025-02-17 20:11:17 +01:00
pstrueb ae5f9cf690 Remove little accident 2025-02-17 18:21:16 +01:00
pstrueb 4b66a38fe6 Refractoring again 2025-02-17 09:49:09 +01:00
pstrueb f526f549ee refractoring 2025-02-17 08:51:28 +01:00
pstrueb 8761129677 Add setup_data_path for iso queues 2025-02-16 14:38:05 +01:00
23 changed files with 802 additions and 453 deletions
+55 -56
View File
@@ -27,7 +27,6 @@ import logging
import os
import struct
from typing import (
cast,
Any,
AsyncGenerator,
Coroutine,
@@ -100,6 +99,25 @@ def codec_config_string(
return '\n'.join(indent + line for line in lines)
def broadcast_code_bytes(broadcast_code: str) -> bytes:
"""
Convert a broadcast code string to a 16-byte value.
If `broadcast_code` is `0x` followed by 32 hex characters, it is interpreted as a
raw 16-byte raw broadcast code in big-endian byte order.
Otherwise, `broadcast_code` is converted to a 16-byte value as specified in
BLUETOOTH CORE SPECIFICATION Version 6.0 | Vol 3, Part C , section 3.2.6.3
"""
if broadcast_code.startswith("0x") and len(broadcast_code) == 34:
return bytes.fromhex(broadcast_code[2:])[::-1]
broadcast_code_utf8 = broadcast_code.encode("utf-8")
if len(broadcast_code_utf8) > 16:
raise ValueError("broadcast code must be <= 16 bytes in utf-8 encoding")
padding = bytes(16 - len(broadcast_code_utf8))
return broadcast_code_utf8 + padding
# -----------------------------------------------------------------------------
# Scan For Broadcasts
# -----------------------------------------------------------------------------
@@ -127,11 +145,9 @@ class BroadcastScanner(pyee.EventEmitter):
def update(self, advertisement: bumble.device.Advertisement) -> None:
self.rssi = advertisement.rssi
for service_data in advertisement.data.get_all(
core.AdvertisingData.SERVICE_DATA
core.AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID
):
assert isinstance(service_data, tuple)
service_uuid, data = service_data
assert isinstance(data, bytes)
if service_uuid == gatt.GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE:
self.public_broadcast_announcement = (
@@ -145,16 +161,14 @@ class BroadcastScanner(pyee.EventEmitter):
)
continue
self.appearance = advertisement.data.get( # type: ignore[assignment]
core.AdvertisingData.APPEARANCE
self.appearance = advertisement.data.get(
core.AdvertisingData.Type.APPEARANCE
)
if manufacturer_data := advertisement.data.get(
core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA
core.AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA
):
assert isinstance(manufacturer_data, tuple)
company_id = cast(int, manufacturer_data[0])
data = cast(bytes, manufacturer_data[1])
company_id, data = manufacturer_data
self.manufacturer_data = (
company_ids.COMPANY_IDENTIFIERS.get(
company_id, f'0x{company_id:04X}'
@@ -239,22 +253,14 @@ class BroadcastScanner(pyee.EventEmitter):
if self.biginfo:
print(color(' BIG:', 'cyan'))
print(
color(' Number of BIS:', 'magenta'),
self.biginfo.num_bis,
)
print(
color(' PHY: ', 'magenta'),
self.biginfo.phy.name,
)
print(
color(' Framed: ', 'magenta'),
self.biginfo.framed,
)
print(
color(' Encrypted: ', 'magenta'),
self.biginfo.encrypted,
)
print(color(' Number of BIS:', 'magenta'), self.biginfo.num_bis)
print(color(' ISO Interval: ', 'magenta'), self.biginfo.iso_interval)
print(color(' Max PDU: ', 'magenta'), self.biginfo.max_pdu)
print(color(' SDU Interval: ', 'magenta'), self.biginfo.sdu_interval)
print(color(' Max SDU: ', 'magenta'), self.biginfo.max_sdu)
print(color(' PHY: ', 'magenta'), self.biginfo.phy.name)
print(color(' Framed: ', 'magenta'), self.biginfo.framed)
print(color(' Encrypted: ', 'magenta'), self.biginfo.encrypted)
def on_sync_establishment(self) -> None:
self.emit('sync_establishment')
@@ -271,11 +277,9 @@ class BroadcastScanner(pyee.EventEmitter):
return
for service_data in advertisement.data.get_all(
core.AdvertisingData.SERVICE_DATA
core.AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID
):
assert isinstance(service_data, tuple)
service_uuid, data = service_data
assert isinstance(data, bytes)
if service_uuid == gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE:
self.basic_audio_announcement = (
@@ -316,24 +320,23 @@ class BroadcastScanner(pyee.EventEmitter):
def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None:
if not (
ads := advertisement.data.get_all(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID
core.AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID
)
) or not (
broadcast_audio_announcement := next(
(
ad
for ad in ads
if isinstance(ad, tuple)
and ad[0] == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
if ad[0] == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
),
None,
)
):
return
broadcast_name = advertisement.data.get(core.AdvertisingData.BROADCAST_NAME)
assert isinstance(broadcast_name, str) or broadcast_name is None
assert isinstance(broadcast_audio_announcement[1], bytes)
broadcast_name = advertisement.data.get_all(
core.AdvertisingData.Type.BROADCAST_NAME
)
if broadcast := self.broadcasts.get(advertisement.address):
broadcast.update(advertisement)
@@ -341,7 +344,7 @@ class BroadcastScanner(pyee.EventEmitter):
bumble.utils.AsyncRunner.spawn(
self.on_new_broadcast(
broadcast_name,
broadcast_name[0] if broadcast_name else None,
advertisement,
bap.BroadcastAudioAnnouncement.from_bytes(
broadcast_audio_announcement[1]
@@ -710,14 +713,13 @@ async def run_receive(
def on_change() -> None:
if (
broadcast.basic_audio_announcement
and not basic_audio_announcement_scanned.is_set()
):
broadcast.basic_audio_announcement and broadcast.biginfo
) and not basic_audio_announcement_scanned.is_set():
basic_audio_announcement_scanned.set()
broadcast.on('change', on_change)
if not broadcast.basic_audio_announcement:
print('Wait for Basic Audio Announcement...')
if not broadcast.basic_audio_announcement or not broadcast.biginfo:
print('Wait for Basic Audio Announcement and BIG Info...')
await basic_audio_announcement_scanned.wait()
print('Basic Audio Announcement found')
broadcast.print()
@@ -738,7 +740,7 @@ async def run_receive(
big_sync_timeout=0x4000,
bis=[bis.index for bis in subgroup.bis],
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
broadcast_code_bytes(broadcast_code) if broadcast_code else None
),
),
)
@@ -795,16 +797,8 @@ async def run_receive(
for i, bis_link in enumerate(big_sync.bis_links):
print(f'Setup ISO for BIS {bis_link.handle}')
bis_link.sink = functools.partial(sink, lc3_queues[i])
await device.send_command(
hci.HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=bis_link.handle,
data_path_direction=hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.CONTROLLER_TO_HOST,
data_path_id=0,
codec_id=hci.CodingFormat(codec_id=hci.CodecID.TRANSPARENT),
controller_delay=0,
codec_configuration=b'',
),
check_result=True,
await bis_link.setup_data_path(
direction=bis_link.Direction.CONTROLLER_TO_HOST
)
terminated = asyncio.Event()
@@ -960,14 +954,19 @@ async def run_transmit(
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
broadcast_code_bytes(broadcast_code) if broadcast_code else None
),
),
)
for bis_link in big.bis_links:
print(f'Setup ISO for BIS {bis_link.handle}')
await bis_link.setup_data_path(
direction=bis_link.Direction.HOST_TO_CONTROLLER
)
iso_queues = [
bumble.device.IsoPacketStream(big.bis_links[0], 64),
bumble.device.IsoPacketStream(big.bis_links[1], 64),
bumble.device.IsoPacketStream(bis_link, 64)
for bis_link in big.bis_links
]
def on_flow():
@@ -1099,7 +1098,7 @@ def pair(ctx, transport, address):
'--broadcast-code',
metavar='BROADCAST_CODE',
type=str,
help='Broadcast encryption code in hex format',
help='Broadcast encryption code (string or raw hex format prefixed with 0x)',
)
@click.option(
'--sync-timeout',
+13 -10
View File
@@ -104,15 +104,16 @@ def le_phy_name(phy_id):
)
def print_connection_phy(phy):
logging.info(
color('@@@ PHY: ', 'yellow') + f'TX:{le_phy_name(phy.tx_phy)}/'
f'RX:{le_phy_name(phy.rx_phy)}'
)
def print_connection(connection):
params = []
if connection.transport == BT_LE_TRANSPORT:
params.append(
'PHY='
f'TX:{le_phy_name(connection.phy.tx_phy)}/'
f'RX:{le_phy_name(connection.phy.rx_phy)}'
)
params.append(
'DL=('
f'TX:{connection.data_length[0]}/{connection.data_length[1]},'
@@ -1288,6 +1289,8 @@ class Central(Connection.Listener):
logging.info(color('### Connected', 'cyan'))
self.connection.listener = self
print_connection(self.connection)
phy = await self.connection.get_phy()
print_connection_phy(phy)
# Switch roles if needed.
if self.role_switch:
@@ -1345,8 +1348,8 @@ class Central(Connection.Listener):
def on_connection_parameters_update(self):
print_connection(self.connection)
def on_connection_phy_update(self):
print_connection(self.connection)
def on_connection_phy_update(self, phy):
print_connection_phy(phy)
def on_connection_att_mtu_update(self):
print_connection(self.connection)
@@ -1472,8 +1475,8 @@ class Peripheral(Device.Listener, Connection.Listener):
def on_connection_parameters_update(self):
print_connection(self.connection)
def on_connection_phy_update(self):
print_connection(self.connection)
def on_connection_phy_update(self, phy):
print_connection_phy(phy)
def on_connection_att_mtu_update(self):
print_connection(self.connection)
+19 -12
View File
@@ -22,7 +22,6 @@
import asyncio
import logging
import os
import random
import re
import humanize
from typing import Optional, Union
@@ -57,7 +56,13 @@ from bumble import __version__
import bumble.core
from bumble import colors
from bumble.core import UUID, AdvertisingData, BT_LE_TRANSPORT
from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
from bumble.device import (
ConnectionParametersPreferences,
ConnectionPHY,
Device,
Connection,
Peer,
)
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor
@@ -125,6 +130,7 @@ def parse_phys(phys):
# -----------------------------------------------------------------------------
class ConsoleApp:
connected_peer: Optional[Peer]
connection_phy: Optional[ConnectionPHY]
def __init__(self):
self.known_addresses = set()
@@ -132,6 +138,7 @@ class ConsoleApp:
self.known_local_attributes = []
self.device = None
self.connected_peer = None
self.connection_phy = None
self.top_tab = 'device'
self.monitor_rssi = False
self.connection_rssi = None
@@ -332,10 +339,10 @@ class ConsoleApp:
f'{connection.parameters.peripheral_latency}/'
f'{connection.parameters.supervision_timeout}'
)
if connection.transport == BT_LE_TRANSPORT:
if self.connection_phy is not None:
phy_state = (
f' RX={le_phy_name(connection.phy.rx_phy)}/'
f'TX={le_phy_name(connection.phy.tx_phy)}'
f' RX={le_phy_name(self.connection_phy.rx_phy)}/'
f'TX={le_phy_name(self.connection_phy.tx_phy)}'
)
else:
phy_state = ''
@@ -654,11 +661,12 @@ class ConsoleApp:
self.append_to_output('connecting...')
try:
await self.device.connect(
connection = await self.device.connect(
params[0],
connection_parameters_preferences=connection_parameters_preferences,
timeout=DEFAULT_CONNECTION_TIMEOUT,
)
self.connection_phy = await connection.get_phy()
self.top_tab = 'services'
except bumble.core.TimeoutError:
self.show_error('connection timed out')
@@ -838,8 +846,8 @@ class ConsoleApp:
phy = await self.connected_peer.connection.get_phy()
self.append_to_output(
f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, '
f'TX={HCI_Constant.le_phy_name(phy[1])}'
f'PHY: RX={HCI_Constant.le_phy_name(phy.rx_phy)}, '
f'TX={HCI_Constant.le_phy_name(phy.tx_phy)}'
)
async def do_request_mtu(self, params):
@@ -1076,10 +1084,9 @@ class DeviceListener(Device.Listener, Connection.Listener):
f'{self.app.connected_peer.connection.parameters}'
)
def on_connection_phy_update(self):
self.app.append_to_output(
f'connection phy update: {self.app.connected_peer.connection.phy}'
)
def on_connection_phy_update(self, phy):
self.app.connection_phy = phy
self.app.append_to_output(f'connection phy update: {phy}')
def on_connection_att_mtu_update(self):
self.app.append_to_output(
+7 -8
View File
@@ -25,8 +25,6 @@ import random
import struct
from bumble.colors import color
from bumble.core import (
BT_CENTRAL_ROLE,
BT_PERIPHERAL_ROLE,
BT_LE_TRANSPORT,
BT_BR_EDR_TRANSPORT,
)
@@ -47,6 +45,7 @@ from bumble.hci import (
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
HCI_VERSION_BLUETOOTH_CORE_5_0,
Address,
Role,
HCI_AclDataPacket,
HCI_AclDataPacketAssembler,
HCI_Command_Complete_Event,
@@ -98,7 +97,7 @@ class CisLink:
class Connection:
controller: Controller
handle: int
role: int
role: Role
peer_address: Address
link: Any
transport: int
@@ -390,7 +389,7 @@ class Controller:
connection = Connection(
controller=self,
handle=connection_handle,
role=BT_PERIPHERAL_ROLE,
role=Role.PERIPHERAL,
peer_address=peer_address,
link=self.link,
transport=BT_LE_TRANSPORT,
@@ -450,7 +449,7 @@ class Controller:
connection = Connection(
controller=self,
handle=connection_handle,
role=BT_CENTRAL_ROLE,
role=Role.CENTRAL,
peer_address=peer_address,
link=self.link,
transport=BT_LE_TRANSPORT,
@@ -469,7 +468,7 @@ class Controller:
HCI_LE_Connection_Complete_Event(
status=status,
connection_handle=connection.handle if connection else 0,
role=BT_CENTRAL_ROLE,
role=Role.CENTRAL,
peer_address_type=le_create_connection_command.peer_address_type,
peer_address=le_create_connection_command.peer_address,
connection_interval=le_create_connection_command.connection_interval_min,
@@ -693,7 +692,7 @@ class Controller:
controller=self,
handle=connection_handle,
# Role doesn't matter in Classic because they are managed by HCI_Role_Change and HCI_Role_Discovery
role=BT_CENTRAL_ROLE,
role=Role.CENTRAL,
peer_address=peer_address,
link=self.link,
transport=BT_BR_EDR_TRANSPORT,
@@ -761,7 +760,7 @@ class Controller:
controller=self,
handle=connection_handle,
# Role doesn't matter in SCO.
role=BT_CENTRAL_ROLE,
role=Role.CENTRAL,
peer_address=peer_address,
link=self.link,
transport=BT_BR_EDR_TRANSPORT,
+305 -158
View File
@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,10 +16,10 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import dataclasses
import enum
import struct
from typing import List, Optional, Tuple, Union, cast, Dict
from typing import cast, overload, Literal, Union, Optional
from typing_extensions import Self
from bumble.company_ids import COMPANY_IDENTIFIERS
@@ -31,11 +31,12 @@ from bumble.utils import OpenIntEnum
# -----------------------------------------------------------------------------
# fmt: off
BT_CENTRAL_ROLE = 0
BT_PERIPHERAL_ROLE = 1
class PhysicalTransport(enum.IntEnum):
BR_EDR = 0
LE = 1
BT_BR_EDR_TRANSPORT = 0
BT_LE_TRANSPORT = 1
BT_BR_EDR_TRANSPORT = PhysicalTransport.BR_EDR
BT_LE_TRANSPORT = PhysicalTransport.LE
# fmt: on
@@ -57,7 +58,7 @@ def bit_flags_to_strings(bits, bit_flag_names):
return names
def name_or_number(dictionary: Dict[int, str], number: int, width: int = 2) -> str:
def name_or_number(dictionary: dict[int, str], number: int, width: int = 2) -> str:
name = dictionary.get(number)
if name is not None:
return name
@@ -200,7 +201,7 @@ class UUID:
'''
BASE_UUID = bytes.fromhex('00001000800000805F9B34FB')[::-1] # little-endian
UUIDS: List[UUID] = [] # Registry of all instances created
UUIDS: list[UUID] = [] # Registry of all instances created
uuid_bytes: bytes
name: Optional[str]
@@ -259,11 +260,11 @@ class UUID:
return cls.from_bytes(struct.pack('<I', uuid_32), name)
@classmethod
def parse_uuid(cls, uuid_as_bytes: bytes, offset: int) -> Tuple[int, UUID]:
def parse_uuid(cls, uuid_as_bytes: bytes, offset: int) -> tuple[int, UUID]:
return len(uuid_as_bytes), cls.from_bytes(uuid_as_bytes[offset:])
@classmethod
def parse_uuid_2(cls, uuid_as_bytes: bytes, offset: int) -> Tuple[int, UUID]:
def parse_uuid_2(cls, uuid_as_bytes: bytes, offset: int) -> tuple[int, UUID]:
return offset + 2, cls.from_bytes(uuid_as_bytes[offset : offset + 2])
def to_bytes(self, force_128: bool = False) -> bytes:
@@ -1280,13 +1281,13 @@ class Appearance:
# Advertising Data
# -----------------------------------------------------------------------------
AdvertisingDataObject = Union[
List[UUID],
Tuple[UUID, bytes],
list[UUID],
tuple[UUID, bytes],
bytes,
str,
int,
Tuple[int, int],
Tuple[int, bytes],
tuple[int, int],
tuple[int, bytes],
Appearance,
]
@@ -1295,116 +1296,116 @@ class AdvertisingData:
# fmt: off
# pylint: disable=line-too-long
FLAGS = 0x01
INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x02
COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x03
INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x04
COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x05
INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x06
COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x07
SHORTENED_LOCAL_NAME = 0x08
COMPLETE_LOCAL_NAME = 0x09
TX_POWER_LEVEL = 0x0A
CLASS_OF_DEVICE = 0x0D
SIMPLE_PAIRING_HASH_C = 0x0E
SIMPLE_PAIRING_HASH_C_192 = 0x0E
SIMPLE_PAIRING_RANDOMIZER_R = 0x0F
SIMPLE_PAIRING_RANDOMIZER_R_192 = 0x0F
DEVICE_ID = 0x10
SECURITY_MANAGER_TK_VALUE = 0x10
SECURITY_MANAGER_OUT_OF_BAND_FLAGS = 0x11
PERIPHERAL_CONNECTION_INTERVAL_RANGE = 0x12
LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = 0x14
LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = 0x15
SERVICE_DATA = 0x16
SERVICE_DATA_16_BIT_UUID = 0x16
PUBLIC_TARGET_ADDRESS = 0x17
RANDOM_TARGET_ADDRESS = 0x18
APPEARANCE = 0x19
ADVERTISING_INTERVAL = 0x1A
LE_BLUETOOTH_DEVICE_ADDRESS = 0x1B
LE_ROLE = 0x1C
SIMPLE_PAIRING_HASH_C_256 = 0x1D
SIMPLE_PAIRING_RANDOMIZER_R_256 = 0x1E
LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS = 0x1F
SERVICE_DATA_32_BIT_UUID = 0x20
SERVICE_DATA_128_BIT_UUID = 0x21
LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE = 0x22
LE_SECURE_CONNECTIONS_RANDOM_VALUE = 0x23
URI = 0x24
INDOOR_POSITIONING = 0x25
TRANSPORT_DISCOVERY_DATA = 0x26
LE_SUPPORTED_FEATURES = 0x27
CHANNEL_MAP_UPDATE_INDICATION = 0x28
PB_ADV = 0x29
MESH_MESSAGE = 0x2A
MESH_BEACON = 0x2B
BIGINFO = 0x2C
BROADCAST_CODE = 0x2D
RESOLVABLE_SET_IDENTIFIER = 0x2E
ADVERTISING_INTERVAL_LONG = 0x2F
BROADCAST_NAME = 0x30
ENCRYPTED_ADVERTISING_DATA = 0X31
PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION = 0X32
ELECTRONIC_SHELF_LABEL = 0X34
THREE_D_INFORMATION_DATA = 0x3D
MANUFACTURER_SPECIFIC_DATA = 0xFF
class Type(OpenIntEnum):
FLAGS = 0x01
INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x02
COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x03
INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x04
COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x05
INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x06
COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x07
SHORTENED_LOCAL_NAME = 0x08
COMPLETE_LOCAL_NAME = 0x09
TX_POWER_LEVEL = 0x0A
CLASS_OF_DEVICE = 0x0D
SIMPLE_PAIRING_HASH_C = 0x0E
SIMPLE_PAIRING_HASH_C_192 = 0x0E
SIMPLE_PAIRING_RANDOMIZER_R = 0x0F
SIMPLE_PAIRING_RANDOMIZER_R_192 = 0x0F
DEVICE_ID = 0x10
SECURITY_MANAGER_TK_VALUE = 0x10
SECURITY_MANAGER_OUT_OF_BAND_FLAGS = 0x11
PERIPHERAL_CONNECTION_INTERVAL_RANGE = 0x12
LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = 0x14
LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = 0x15
SERVICE_DATA_16_BIT_UUID = 0x16
PUBLIC_TARGET_ADDRESS = 0x17
RANDOM_TARGET_ADDRESS = 0x18
APPEARANCE = 0x19
ADVERTISING_INTERVAL = 0x1A
LE_BLUETOOTH_DEVICE_ADDRESS = 0x1B
LE_ROLE = 0x1C
SIMPLE_PAIRING_HASH_C_256 = 0x1D
SIMPLE_PAIRING_RANDOMIZER_R_256 = 0x1E
LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS = 0x1F
SERVICE_DATA_32_BIT_UUID = 0x20
SERVICE_DATA_128_BIT_UUID = 0x21
LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE = 0x22
LE_SECURE_CONNECTIONS_RANDOM_VALUE = 0x23
URI = 0x24
INDOOR_POSITIONING = 0x25
TRANSPORT_DISCOVERY_DATA = 0x26
LE_SUPPORTED_FEATURES = 0x27
CHANNEL_MAP_UPDATE_INDICATION = 0x28
PB_ADV = 0x29
MESH_MESSAGE = 0x2A
MESH_BEACON = 0x2B
BIGINFO = 0x2C
BROADCAST_CODE = 0x2D
RESOLVABLE_SET_IDENTIFIER = 0x2E
ADVERTISING_INTERVAL_LONG = 0x2F
BROADCAST_NAME = 0x30
ENCRYPTED_ADVERTISING_DATA = 0x31
PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION = 0x32
ELECTRONIC_SHELF_LABEL = 0x34
THREE_D_INFORMATION_DATA = 0x3D
MANUFACTURER_SPECIFIC_DATA = 0xFF
AD_TYPE_NAMES = {
FLAGS: 'FLAGS',
INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS',
COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS',
INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS',
COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS',
INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS',
COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS',
SHORTENED_LOCAL_NAME: 'SHORTENED_LOCAL_NAME',
COMPLETE_LOCAL_NAME: 'COMPLETE_LOCAL_NAME',
TX_POWER_LEVEL: 'TX_POWER_LEVEL',
CLASS_OF_DEVICE: 'CLASS_OF_DEVICE',
SIMPLE_PAIRING_HASH_C: 'SIMPLE_PAIRING_HASH_C',
SIMPLE_PAIRING_HASH_C_192: 'SIMPLE_PAIRING_HASH_C_192',
SIMPLE_PAIRING_RANDOMIZER_R: 'SIMPLE_PAIRING_RANDOMIZER_R',
SIMPLE_PAIRING_RANDOMIZER_R_192: 'SIMPLE_PAIRING_RANDOMIZER_R_192',
DEVICE_ID: 'DEVICE_ID',
SECURITY_MANAGER_TK_VALUE: 'SECURITY_MANAGER_TK_VALUE',
SECURITY_MANAGER_OUT_OF_BAND_FLAGS: 'SECURITY_MANAGER_OUT_OF_BAND_FLAGS',
PERIPHERAL_CONNECTION_INTERVAL_RANGE: 'PERIPHERAL_CONNECTION_INTERVAL_RANGE',
LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS',
LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS',
SERVICE_DATA_16_BIT_UUID: 'SERVICE_DATA_16_BIT_UUID',
PUBLIC_TARGET_ADDRESS: 'PUBLIC_TARGET_ADDRESS',
RANDOM_TARGET_ADDRESS: 'RANDOM_TARGET_ADDRESS',
APPEARANCE: 'APPEARANCE',
ADVERTISING_INTERVAL: 'ADVERTISING_INTERVAL',
LE_BLUETOOTH_DEVICE_ADDRESS: 'LE_BLUETOOTH_DEVICE_ADDRESS',
LE_ROLE: 'LE_ROLE',
SIMPLE_PAIRING_HASH_C_256: 'SIMPLE_PAIRING_HASH_C_256',
SIMPLE_PAIRING_RANDOMIZER_R_256: 'SIMPLE_PAIRING_RANDOMIZER_R_256',
LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS',
SERVICE_DATA_32_BIT_UUID: 'SERVICE_DATA_32_BIT_UUID',
SERVICE_DATA_128_BIT_UUID: 'SERVICE_DATA_128_BIT_UUID',
LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE: 'LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE',
LE_SECURE_CONNECTIONS_RANDOM_VALUE: 'LE_SECURE_CONNECTIONS_RANDOM_VALUE',
URI: 'URI',
INDOOR_POSITIONING: 'INDOOR_POSITIONING',
TRANSPORT_DISCOVERY_DATA: 'TRANSPORT_DISCOVERY_DATA',
LE_SUPPORTED_FEATURES: 'LE_SUPPORTED_FEATURES',
CHANNEL_MAP_UPDATE_INDICATION: 'CHANNEL_MAP_UPDATE_INDICATION',
PB_ADV: 'PB_ADV',
MESH_MESSAGE: 'MESH_MESSAGE',
MESH_BEACON: 'MESH_BEACON',
BIGINFO: 'BIGINFO',
BROADCAST_CODE: 'BROADCAST_CODE',
RESOLVABLE_SET_IDENTIFIER: 'RESOLVABLE_SET_IDENTIFIER',
ADVERTISING_INTERVAL_LONG: 'ADVERTISING_INTERVAL_LONG',
BROADCAST_NAME: 'BROADCAST_NAME',
ENCRYPTED_ADVERTISING_DATA: 'ENCRYPTED_ADVERTISING_DATA',
PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION: 'PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION',
ELECTRONIC_SHELF_LABEL: 'ELECTRONIC_SHELF_LABEL',
THREE_D_INFORMATION_DATA: 'THREE_D_INFORMATION_DATA',
MANUFACTURER_SPECIFIC_DATA: 'MANUFACTURER_SPECIFIC_DATA'
}
# For backward-compatibility
FLAGS = Type.FLAGS
INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS
COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS
INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS
COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS
INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS
COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS
SHORTENED_LOCAL_NAME = Type.SHORTENED_LOCAL_NAME
COMPLETE_LOCAL_NAME = Type.COMPLETE_LOCAL_NAME
TX_POWER_LEVEL = Type.TX_POWER_LEVEL
CLASS_OF_DEVICE = Type.CLASS_OF_DEVICE
SIMPLE_PAIRING_HASH_C = Type.SIMPLE_PAIRING_HASH_C
SIMPLE_PAIRING_HASH_C_192 = Type.SIMPLE_PAIRING_HASH_C_192
SIMPLE_PAIRING_RANDOMIZER_R = Type.SIMPLE_PAIRING_RANDOMIZER_R
SIMPLE_PAIRING_RANDOMIZER_R_192 = Type.SIMPLE_PAIRING_RANDOMIZER_R_192
DEVICE_ID = Type.DEVICE_ID
SECURITY_MANAGER_TK_VALUE = Type.SECURITY_MANAGER_TK_VALUE
SECURITY_MANAGER_OUT_OF_BAND_FLAGS = Type.SECURITY_MANAGER_OUT_OF_BAND_FLAGS
PERIPHERAL_CONNECTION_INTERVAL_RANGE = Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE
LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS
LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS
SERVICE_DATA = Type.SERVICE_DATA_16_BIT_UUID
SERVICE_DATA_16_BIT_UUID = Type.SERVICE_DATA_16_BIT_UUID
PUBLIC_TARGET_ADDRESS = Type.PUBLIC_TARGET_ADDRESS
RANDOM_TARGET_ADDRESS = Type.RANDOM_TARGET_ADDRESS
APPEARANCE = Type.APPEARANCE
ADVERTISING_INTERVAL = Type.ADVERTISING_INTERVAL
LE_BLUETOOTH_DEVICE_ADDRESS = Type.LE_BLUETOOTH_DEVICE_ADDRESS
LE_ROLE = Type.LE_ROLE
SIMPLE_PAIRING_HASH_C_256 = Type.SIMPLE_PAIRING_HASH_C_256
SIMPLE_PAIRING_RANDOMIZER_R_256 = Type.SIMPLE_PAIRING_RANDOMIZER_R_256
LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS = Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS
SERVICE_DATA_32_BIT_UUID = Type.SERVICE_DATA_32_BIT_UUID
SERVICE_DATA_128_BIT_UUID = Type.SERVICE_DATA_128_BIT_UUID
LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE = Type.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE
LE_SECURE_CONNECTIONS_RANDOM_VALUE = Type.LE_SECURE_CONNECTIONS_RANDOM_VALUE
URI = Type.URI
INDOOR_POSITIONING = Type.INDOOR_POSITIONING
TRANSPORT_DISCOVERY_DATA = Type.TRANSPORT_DISCOVERY_DATA
LE_SUPPORTED_FEATURES = Type.LE_SUPPORTED_FEATURES
CHANNEL_MAP_UPDATE_INDICATION = Type.CHANNEL_MAP_UPDATE_INDICATION
PB_ADV = Type.PB_ADV
MESH_MESSAGE = Type.MESH_MESSAGE
MESH_BEACON = Type.MESH_BEACON
BIGINFO = Type.BIGINFO
BROADCAST_CODE = Type.BROADCAST_CODE
RESOLVABLE_SET_IDENTIFIER = Type.RESOLVABLE_SET_IDENTIFIER
ADVERTISING_INTERVAL_LONG = Type.ADVERTISING_INTERVAL_LONG
BROADCAST_NAME = Type.BROADCAST_NAME
ENCRYPTED_ADVERTISING_DATA = Type.ENCRYPTED_ADVERTISING_DATA
PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION = Type.PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION
ELECTRONIC_SHELF_LABEL = Type.ELECTRONIC_SHELF_LABEL
THREE_D_INFORMATION_DATA = Type.THREE_D_INFORMATION_DATA
MANUFACTURER_SPECIFIC_DATA = Type.MANUFACTURER_SPECIFIC_DATA
LE_LIMITED_DISCOVERABLE_MODE_FLAG = 0x01
LE_GENERAL_DISCOVERABLE_MODE_FLAG = 0x02
@@ -1412,12 +1413,12 @@ class AdvertisingData:
BR_EDR_CONTROLLER_FLAG = 0x08
BR_EDR_HOST_FLAG = 0x10
ad_structures: List[Tuple[int, bytes]]
ad_structures: list[tuple[int, bytes]]
# fmt: on
# pylint: enable=line-too-long
def __init__(self, ad_structures: Optional[List[Tuple[int, bytes]]] = None) -> None:
def __init__(self, ad_structures: Optional[list[tuple[int, bytes]]] = None) -> None:
if ad_structures is None:
ad_structures = []
self.ad_structures = ad_structures[:]
@@ -1444,7 +1445,7 @@ class AdvertisingData:
return ','.join(bit_flags_to_strings(flags, flag_names))
@staticmethod
def uuid_list_to_objects(ad_data: bytes, uuid_size: int) -> List[UUID]:
def uuid_list_to_objects(ad_data: bytes, uuid_size: int) -> list[UUID]:
uuids = []
offset = 0
while (offset + uuid_size) <= len(ad_data):
@@ -1461,8 +1462,8 @@ class AdvertisingData:
]
)
@staticmethod
def ad_data_to_string(ad_type, ad_data):
@classmethod
def ad_data_to_string(cls, ad_type: int, ad_data: bytes) -> str:
if ad_type == AdvertisingData.FLAGS:
ad_type_str = 'Flags'
ad_data_str = AdvertisingData.flags_to_string(ad_data[0], short=True)
@@ -1521,72 +1522,72 @@ class AdvertisingData:
ad_type_str = 'Broadcast Name'
ad_data_str = ad_data.decode('utf-8')
else:
ad_type_str = AdvertisingData.AD_TYPE_NAMES.get(ad_type, f'0x{ad_type:02X}')
ad_type_str = AdvertisingData.Type(ad_type).name
ad_data_str = ad_data.hex()
return f'[{ad_type_str}]: {ad_data_str}'
# pylint: disable=too-many-return-statements
@staticmethod
def ad_data_to_object(ad_type: int, ad_data: bytes) -> AdvertisingDataObject:
@classmethod
def ad_data_to_object(cls, ad_type: int, ad_data: bytes) -> AdvertisingDataObject:
if ad_type in (
AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
):
return AdvertisingData.uuid_list_to_objects(ad_data, 2)
if ad_type in (
AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
):
return AdvertisingData.uuid_list_to_objects(ad_data, 4)
if ad_type in (
AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
):
return AdvertisingData.uuid_list_to_objects(ad_data, 16)
if ad_type == AdvertisingData.SERVICE_DATA_16_BIT_UUID:
if ad_type == AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID:
return (UUID.from_bytes(ad_data[:2]), ad_data[2:])
if ad_type == AdvertisingData.SERVICE_DATA_32_BIT_UUID:
if ad_type == AdvertisingData.Type.SERVICE_DATA_32_BIT_UUID:
return (UUID.from_bytes(ad_data[:4]), ad_data[4:])
if ad_type == AdvertisingData.SERVICE_DATA_128_BIT_UUID:
if ad_type == AdvertisingData.Type.SERVICE_DATA_128_BIT_UUID:
return (UUID.from_bytes(ad_data[:16]), ad_data[16:])
if ad_type in (
AdvertisingData.SHORTENED_LOCAL_NAME,
AdvertisingData.COMPLETE_LOCAL_NAME,
AdvertisingData.URI,
AdvertisingData.BROADCAST_NAME,
AdvertisingData.Type.SHORTENED_LOCAL_NAME,
AdvertisingData.Type.COMPLETE_LOCAL_NAME,
AdvertisingData.Type.URI,
AdvertisingData.Type.BROADCAST_NAME,
):
return ad_data.decode("utf-8")
if ad_type in (AdvertisingData.TX_POWER_LEVEL, AdvertisingData.FLAGS):
if ad_type in (AdvertisingData.Type.TX_POWER_LEVEL, AdvertisingData.Type.FLAGS):
return cast(int, struct.unpack('B', ad_data)[0])
if ad_type in (AdvertisingData.ADVERTISING_INTERVAL,):
if ad_type in (AdvertisingData.Type.ADVERTISING_INTERVAL,):
return cast(int, struct.unpack('<H', ad_data)[0])
if ad_type == AdvertisingData.CLASS_OF_DEVICE:
if ad_type == AdvertisingData.Type.CLASS_OF_DEVICE:
return cast(int, struct.unpack('<I', bytes([*ad_data, 0]))[0])
if ad_type == AdvertisingData.PERIPHERAL_CONNECTION_INTERVAL_RANGE:
return cast(Tuple[int, int], struct.unpack('<HH', ad_data))
if ad_type == AdvertisingData.Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE:
return cast(tuple[int, int], struct.unpack('<HH', ad_data))
if ad_type == AdvertisingData.MANUFACTURER_SPECIFIC_DATA:
return (cast(int, struct.unpack_from('<H', ad_data, 0)[0]), ad_data[2:])
if ad_type == AdvertisingData.APPEARANCE:
if ad_type == AdvertisingData.Type.APPEARANCE:
return Appearance.from_int(
cast(int, struct.unpack_from('<H', ad_data, 0)[0])
)
if ad_type == AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA:
return (cast(int, struct.unpack_from('<H', ad_data, 0)[0]), ad_data[2:])
return ad_data
def append(self, data: bytes) -> None:
@@ -1600,7 +1601,80 @@ class AdvertisingData:
self.ad_structures.append((ad_type, ad_data))
offset += length
def get_all(self, type_id: int, raw: bool = False) -> List[AdvertisingDataObject]:
@overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
],
raw: Literal[False] = False,
) -> list[list[UUID]]: ...
@overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_32_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_128_BIT_UUID,
],
raw: Literal[False] = False,
) -> list[tuple[UUID, bytes]]: ...
@overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.SHORTENED_LOCAL_NAME,
AdvertisingData.Type.COMPLETE_LOCAL_NAME,
AdvertisingData.Type.URI,
AdvertisingData.Type.BROADCAST_NAME,
],
raw: Literal[False] = False,
) -> list[str]: ...
@overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.TX_POWER_LEVEL,
AdvertisingData.Type.FLAGS,
AdvertisingData.Type.ADVERTISING_INTERVAL,
AdvertisingData.Type.CLASS_OF_DEVICE,
],
raw: Literal[False] = False,
) -> list[int]: ...
@overload
def get_all(
self,
type_id: Literal[AdvertisingData.Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE,],
raw: Literal[False] = False,
) -> list[tuple[int, int]]: ...
@overload
def get_all(
self,
type_id: Literal[AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA,],
raw: Literal[False] = False,
) -> list[tuple[int, bytes]]: ...
@overload
def get_all(
self,
type_id: Literal[AdvertisingData.Type.APPEARANCE,],
raw: Literal[False] = False,
) -> list[Appearance]: ...
@overload
def get_all(self, type_id: int, raw: Literal[True]) -> list[bytes]: ...
@overload
def get_all(
self, type_id: int, raw: bool = False
) -> list[AdvertisingDataObject]: ...
def get_all(self, type_id: int, raw: bool = False) -> list[AdvertisingDataObject]: # type: ignore[misc]
'''
Get Advertising Data Structure(s) with a given type
@@ -1612,6 +1686,79 @@ class AdvertisingData:
return [process_ad_data(ad[1]) for ad in self.ad_structures if ad[0] == type_id]
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
],
raw: Literal[False] = False,
) -> Optional[list[UUID]]: ...
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_32_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_128_BIT_UUID,
],
raw: Literal[False] = False,
) -> Optional[tuple[UUID, bytes]]: ...
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.SHORTENED_LOCAL_NAME,
AdvertisingData.Type.COMPLETE_LOCAL_NAME,
AdvertisingData.Type.URI,
AdvertisingData.Type.BROADCAST_NAME,
],
raw: Literal[False] = False,
) -> Optional[Optional[str]]: ...
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.TX_POWER_LEVEL,
AdvertisingData.Type.FLAGS,
AdvertisingData.Type.ADVERTISING_INTERVAL,
AdvertisingData.Type.CLASS_OF_DEVICE,
],
raw: Literal[False] = False,
) -> Optional[int]: ...
@overload
def get(
self,
type_id: Literal[AdvertisingData.Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE,],
raw: Literal[False] = False,
) -> Optional[tuple[int, int]]: ...
@overload
def get(
self,
type_id: Literal[AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA,],
raw: Literal[False] = False,
) -> Optional[tuple[int, bytes]]: ...
@overload
def get(
self,
type_id: Literal[AdvertisingData.Type.APPEARANCE,],
raw: Literal[False] = False,
) -> Optional[Appearance]: ...
@overload
def get(self, type_id: int, raw: Literal[True]) -> Optional[bytes]: ...
@overload
def get(
self, type_id: int, raw: bool = False
) -> Optional[AdvertisingDataObject]: ...
def get(self, type_id: int, raw: bool = False) -> Optional[AdvertisingDataObject]:
'''
Get Advertising Data Structure(s) with a given type
+53 -76
View File
@@ -58,9 +58,7 @@ from .host import DataPacketQueue, Host
from .profiles.gap import GenericAccessService
from .core import (
BT_BR_EDR_TRANSPORT,
BT_CENTRAL_ROLE,
BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE,
AdvertisingData,
BaseBumbleError,
ConnectionParameterUpdateError,
@@ -1555,13 +1553,13 @@ class IsoPacketStream:
class Connection(CompositeEventEmitter):
device: Device
handle: int
transport: int
transport: core.PhysicalTransport
self_address: hci.Address
self_resolvable_address: Optional[hci.Address]
peer_address: hci.Address
peer_resolvable_address: Optional[hci.Address]
peer_le_features: Optional[hci.LeFeatureMask]
role: int
role: hci.Role
encryption: int
authenticated: bool
sc: bool
@@ -1569,8 +1567,8 @@ class Connection(CompositeEventEmitter):
gatt_client: gatt_client.Client
pairing_peer_io_capability: Optional[int]
pairing_peer_authentication_requirements: Optional[int]
cs_configs: dict[int, ChannelSoundingConfig] = {} # Config ID to Configuration
cs_procedures: dict[int, ChannelSoundingProcedure] = {} # Config ID to Procedures
cs_configs: dict[int, ChannelSoundingConfig] # Config ID to Configuration
cs_procedures: dict[int, ChannelSoundingProcedure] # Config ID to Procedures
@composite_listener
class Listener:
@@ -1586,7 +1584,7 @@ class Connection(CompositeEventEmitter):
def on_connection_data_length_change(self):
pass
def on_connection_phy_update(self):
def on_connection_phy_update(self, phy):
pass
def on_connection_phy_update_failure(self, error):
@@ -1612,7 +1610,6 @@ class Connection(CompositeEventEmitter):
peer_resolvable_address,
role,
parameters,
phy,
):
super().__init__()
self.device = device
@@ -1629,7 +1626,6 @@ class Connection(CompositeEventEmitter):
self.authenticated = False
self.sc = False
self.link_key_type = None
self.phy = phy
self.att_mtu = ATT_DEFAULT_MTU
self.data_length = DEVICE_DEFAULT_DATA_LENGTH
self.gatt_client = None # Per-connection client
@@ -1639,6 +1635,8 @@ class Connection(CompositeEventEmitter):
self.pairing_peer_io_capability = None
self.pairing_peer_authentication_requirements = None
self.peer_le_features = None
self.cs_configs = {}
self.cs_procedures = {}
# [Classic only]
@classmethod
@@ -1658,7 +1656,6 @@ class Connection(CompositeEventEmitter):
None,
role,
None,
None,
)
# [Classic only]
@@ -1675,9 +1672,9 @@ class Connection(CompositeEventEmitter):
def role_name(self):
if self.role is None:
return 'NOT-SET'
if self.role == BT_CENTRAL_ROLE:
if self.role == hci.Role.CENTRAL:
return 'CENTRAL'
if self.role == BT_PERIPHERAL_ROLE:
if self.role == hci.Role.PERIPHERAL:
return 'PERIPHERAL'
return f'UNKNOWN[{self.role}]'
@@ -1735,7 +1732,7 @@ class Connection(CompositeEventEmitter):
async def encrypt(self, enable: bool = True) -> None:
return await self.device.encrypt(self, enable)
async def switch_role(self, role: int) -> None:
async def switch_role(self, role: hci.Role) -> None:
return await self.device.switch_role(self, role)
async def sustain(self, timeout: Optional[float] = None) -> None:
@@ -1774,12 +1771,12 @@ class Connection(CompositeEventEmitter):
async def set_phy(self, tx_phys=None, rx_phys=None, phy_options=None):
return await self.device.set_connection_phy(self, tx_phys, rx_phys, phy_options)
async def get_phy(self) -> ConnectionPHY:
return await self.device.get_connection_phy(self)
async def get_rssi(self):
return await self.device.get_connection_rssi(self)
async def get_phy(self):
return await self.device.get_connection_phy(self)
async def transfer_periodic_sync(
self, sync_handle: int, service_data: int = 0
) -> None:
@@ -2049,9 +2046,9 @@ class Device(CompositeEventEmitter):
legacy_advertiser: Optional[LegacyAdvertiser]
sco_links: Dict[int, ScoLink]
cis_links: Dict[int, CisLink]
bigs = dict[int, Big]()
bis_links = dict[int, BisLink]()
big_syncs = dict[int, BigSync]()
bigs: dict[int, Big]
bis_links: dict[int, BisLink]
big_syncs: dict[int, BigSync]
_pending_cis: Dict[int, tuple[int, int]]
gatt_service: gatt_service.GenericAttributeProfileService | None = None
@@ -2144,6 +2141,9 @@ class Device(CompositeEventEmitter):
self.sco_links = {} # ScoLinks, by connection handle (BR/EDR only)
self.cis_links = {} # CisLinks, by connection handle (LE only)
self._pending_cis = {} # (CIS_ID, CIG_ID), by CIS_handle
self.bigs = {}
self.bis_links = {}
self.big_syncs = {}
self.classic_enabled = False
self.inquiry_response = None
self.address_resolver = None
@@ -2711,7 +2711,7 @@ class Device(CompositeEventEmitter):
if phy == hci.HCI_LE_1M_PHY:
return True
feature_map = {
feature_map: dict[int, hci.LeFeatureMask] = {
hci.HCI_LE_2M_PHY: hci.LeFeatureMask.LE_2M_PHY,
hci.HCI_LE_CODED_PHY: hci.LeFeatureMask.LE_CODED_PHY,
}
@@ -2732,7 +2732,7 @@ class Device(CompositeEventEmitter):
self,
advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
target: Optional[hci.Address] = None,
own_address_type: int = hci.OwnAddressType.RANDOM,
own_address_type: hci.OwnAddressType = hci.OwnAddressType.RANDOM,
auto_restart: bool = False,
advertising_data: Optional[bytes] = None,
scan_response_data: Optional[bytes] = None,
@@ -3013,7 +3013,7 @@ class Device(CompositeEventEmitter):
active: bool = True,
scan_interval: float = DEVICE_DEFAULT_SCAN_INTERVAL, # Scan interval in ms
scan_window: float = DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms
own_address_type: int = hci.OwnAddressType.RANDOM,
own_address_type: hci.OwnAddressType = hci.OwnAddressType.RANDOM,
filter_duplicates: bool = False,
scanning_phys: Sequence[int] = (hci.HCI_LE_1M_PHY, hci.HCI_LE_CODED_PHY),
) -> None:
@@ -3089,7 +3089,7 @@ class Device(CompositeEventEmitter):
# pylint: disable=line-too-long
hci.HCI_LE_Set_Scan_Parameters_Command(
le_scan_type=scan_type,
le_scan_interval=int(scan_window / 0.625),
le_scan_interval=int(scan_interval / 0.625),
le_scan_window=int(scan_window / 0.625),
own_address_type=own_address_type,
scanning_filter_policy=hci.HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY,
@@ -3379,11 +3379,11 @@ class Device(CompositeEventEmitter):
async def connect(
self,
peer_address: Union[hci.Address, str],
transport: int = BT_LE_TRANSPORT,
transport: core.PhysicalTransport = BT_LE_TRANSPORT,
connection_parameters_preferences: Optional[
Dict[int, ConnectionParametersPreferences]
dict[hci.Phy, ConnectionParametersPreferences]
] = None,
own_address_type: int = hci.OwnAddressType.RANDOM,
own_address_type: hci.OwnAddressType = hci.OwnAddressType.RANDOM,
timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
always_resolve: bool = False,
) -> Connection:
@@ -3431,6 +3431,7 @@ class Device(CompositeEventEmitter):
# Check parameters
if transport not in (BT_LE_TRANSPORT, BT_BR_EDR_TRANSPORT):
raise InvalidArgumentError('invalid transport')
transport = core.PhysicalTransport(transport)
# Adjust the transport automatically if we need to
if transport == BT_LE_TRANSPORT and not self.le_enabled:
@@ -3626,7 +3627,7 @@ class Device(CompositeEventEmitter):
else:
# Save pending connection
self.pending_connections[peer_address] = Connection.incomplete(
self, peer_address, BT_CENTRAL_ROLE
self, peer_address, hci.Role.CENTRAL
)
# TODO: allow passing other settings
@@ -3681,7 +3682,7 @@ class Device(CompositeEventEmitter):
async def accept(
self,
peer_address: Union[hci.Address, str] = hci.Address.ANY,
role: int = BT_PERIPHERAL_ROLE,
role: hci.Role = hci.Role.PERIPHERAL,
timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
) -> Connection:
'''
@@ -3767,12 +3768,12 @@ class Device(CompositeEventEmitter):
self.on('connection', on_connection)
self.on('connection_failure', on_connection_failure)
# Save pending connection, with the Peripheral role.
# Save pending connection, with the Peripheral hci.role.
# Even if we requested a role switch in the hci.HCI_Accept_Connection_Request
# command, this connection is still considered Peripheral until an eventual
# role change event.
self.pending_connections[peer_address] = Connection.incomplete(
self, peer_address, BT_PERIPHERAL_ROLE
self, peer_address, hci.Role.PERIPHERAL
)
try:
@@ -3901,7 +3902,7 @@ class Device(CompositeEventEmitter):
'''
if use_l2cap:
if connection.role != BT_PERIPHERAL_ROLE:
if connection.role != hci.Role.PERIPHERAL:
raise InvalidStateError(
'only peripheral can update connection parameters with l2cap'
)
@@ -3937,12 +3938,14 @@ class Device(CompositeEventEmitter):
)
return result.return_parameters.rssi
async def get_connection_phy(self, connection):
async def get_connection_phy(self, connection: Connection) -> ConnectionPHY:
result = await self.send_command(
hci.HCI_LE_Read_PHY_Command(connection_handle=connection.handle),
check_result=True,
)
return (result.return_parameters.tx_phy, result.return_parameters.rx_phy)
return ConnectionPHY(
result.return_parameters.tx_phy, result.return_parameters.rx_phy
)
async def set_connection_phy(
self, connection, tx_phys=None, rx_phys=None, phy_options=None
@@ -4006,13 +4009,12 @@ class Device(CompositeEventEmitter):
# Create a future to wait for an address to be found
peer_address = asyncio.get_running_loop().create_future()
def on_peer_found(address, ad_data):
local_name = ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True)
if local_name is None:
local_name = ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True)
if local_name is not None:
if local_name.decode('utf-8') == name:
peer_address.set_result(address)
def on_peer_found(address: hci.Address, ad_data: AdvertisingData) -> None:
local_name = ad_data.get(
AdvertisingData.Type.COMPLETE_LOCAL_NAME
) or ad_data.get(AdvertisingData.Type.SHORTENED_LOCAL_NAME)
if local_name == name:
peer_address.set_result(address)
listener = None
was_scanning = self.scanning
@@ -4145,10 +4147,10 @@ class Device(CompositeEventEmitter):
if keys.ltk:
return keys.ltk.value
if connection.role == BT_CENTRAL_ROLE and keys.ltk_central:
if connection.role == hci.Role.CENTRAL and keys.ltk_central:
return keys.ltk_central.value
if connection.role == BT_PERIPHERAL_ROLE and keys.ltk_peripheral:
if connection.role == hci.Role.PERIPHERAL and keys.ltk_peripheral:
return keys.ltk_peripheral.value
return None
@@ -4300,7 +4302,7 @@ class Device(CompositeEventEmitter):
self.emit('key_store_update')
# [Classic only]
async def switch_role(self, connection: Connection, role: int):
async def switch_role(self, connection: Connection, role: hci.Role):
pending_role_change = asyncio.get_running_loop().create_future()
def on_role_change(new_role):
@@ -5169,40 +5171,17 @@ class Device(CompositeEventEmitter):
lambda _: self.abort_on('flush', advertising_set.start()),
)
self._emit_le_connection(connection)
def _emit_le_connection(self, connection: Connection) -> None:
# If supported, read which PHY we're connected with before
# notifying listeners of the new connection.
if self.host.supports_command(hci.HCI_LE_READ_PHY_COMMAND):
async def read_phy():
result = await self.send_command(
hci.HCI_LE_Read_PHY_Command(connection_handle=connection.handle),
check_result=True,
)
connection.phy = ConnectionPHY(
result.return_parameters.tx_phy, result.return_parameters.rx_phy
)
# Emit an event to notify listeners of the new connection
self.emit('connection', connection)
# Do so asynchronously to not block the current event handler
connection.abort_on('disconnection', read_phy())
return
self.emit('connection', connection)
@host_event_handler
def on_connection(
self,
connection_handle: int,
transport: int,
transport: core.PhysicalTransport,
peer_address: hci.Address,
self_resolvable_address: Optional[hci.Address],
peer_resolvable_address: Optional[hci.Address],
role: int,
role: hci.Role,
connection_parameters: ConnectionParameters,
) -> None:
# Convert all-zeros addresses into None.
@@ -5245,7 +5224,7 @@ class Device(CompositeEventEmitter):
peer_address = resolved_address
self_address = None
own_address_type: Optional[int] = None
own_address_type: Optional[hci.OwnAddressType] = None
if role == hci.HCI_CENTRAL_ROLE:
own_address_type = self.connect_own_address_type
assert own_address_type is not None
@@ -5290,7 +5269,6 @@ class Device(CompositeEventEmitter):
peer_resolvable_address,
role,
connection_parameters,
ConnectionPHY(hci.HCI_LE_1M_PHY, hci.HCI_LE_1M_PHY),
)
self.connections[connection_handle] = connection
@@ -5306,7 +5284,7 @@ class Device(CompositeEventEmitter):
if role == hci.HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising:
# We can emit now, we have all the info we need
self._emit_le_connection(connection)
self.emit('connection', connection)
return
if role == hci.HCI_PERIPHERAL_ROLE and self.supports_le_extended_advertising:
@@ -5374,7 +5352,7 @@ class Device(CompositeEventEmitter):
elif self.classic_accept_any:
# Save pending connection
self.pending_connections[bd_addr] = Connection.incomplete(
self, bd_addr, BT_PERIPHERAL_ROLE
self, bd_addr, hci.Role.PERIPHERAL
)
self.host.send_command_sync(
@@ -5860,14 +5838,13 @@ class Device(CompositeEventEmitter):
@host_event_handler
@with_connection_from_handle
def on_connection_phy_update(self, connection, connection_phy):
def on_connection_phy_update(self, connection, phy):
logger.debug(
f'*** Connection PHY Update: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, '
f'{connection_phy}'
f'{phy}'
)
connection.phy = connection_phy
connection.emit('connection_phy_update')
connection.emit('connection_phy_update', phy)
@host_event_handler
@with_connection_from_handle
+76 -71
View File
@@ -24,6 +24,7 @@ import logging
import secrets
import struct
from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union, ClassVar
from typing_extensions import Self
from bumble import crypto
from bumble.colors import color
@@ -34,6 +35,7 @@ from bumble.core import (
InvalidArgumentError,
InvalidPacketError,
ProtocolError,
PhysicalTransport,
bit_flags_to_strings,
name_or_number,
padded_bytes,
@@ -94,7 +96,7 @@ def map_class_of_device(class_of_device):
)
def phy_list_to_bits(phys: Optional[Iterable[int]]) -> int:
def phy_list_to_bits(phys: Optional[Iterable[Phy]]) -> int:
if phys is None:
return 0
@@ -700,30 +702,22 @@ HCI_ERROR_NAMES[HCI_SUCCESS] = 'HCI_SUCCESS'
HCI_COMMAND_STATUS_PENDING = 0
class Phy(enum.IntEnum):
LE_1M = 1
LE_2M = 2
LE_CODED = 3
# ACL
HCI_ACL_PB_FIRST_NON_FLUSHABLE = 0
HCI_ACL_PB_CONTINUATION = 1
HCI_ACL_PB_FIRST_FLUSHABLE = 2
HCI_ACK_PB_COMPLETE_L2CAP = 3
# Roles
HCI_CENTRAL_ROLE = 0
HCI_PERIPHERAL_ROLE = 1
HCI_ROLE_NAMES = {
HCI_CENTRAL_ROLE: 'CENTRAL',
HCI_PERIPHERAL_ROLE: 'PERIPHERAL'
}
# LE PHY Types
HCI_LE_1M_PHY = 1
HCI_LE_2M_PHY = 2
HCI_LE_CODED_PHY = 3
HCI_LE_PHY_NAMES = {
HCI_LE_1M_PHY: 'LE 1M',
HCI_LE_2M_PHY: 'LE 2M',
HCI_LE_CODED_PHY: 'LE Coded'
HCI_LE_PHY_NAMES: dict[int,str] = {
Phy.LE_1M: 'LE 1M',
Phy.LE_2M: 'LE 2M',
Phy.LE_CODED: 'LE Coded'
}
HCI_LE_1M_PHY_BIT = 0
@@ -732,19 +726,13 @@ HCI_LE_CODED_PHY_BIT = 2
HCI_LE_PHY_BIT_NAMES = ['LE_1M_PHY', 'LE_2M_PHY', 'LE_CODED_PHY']
HCI_LE_PHY_TYPE_TO_BIT = {
HCI_LE_1M_PHY: HCI_LE_1M_PHY_BIT,
HCI_LE_2M_PHY: HCI_LE_2M_PHY_BIT,
HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_BIT
HCI_LE_PHY_TYPE_TO_BIT: dict[Phy, int] = {
Phy.LE_1M: HCI_LE_1M_PHY_BIT,
Phy.LE_2M: HCI_LE_2M_PHY_BIT,
Phy.LE_CODED: HCI_LE_CODED_PHY_BIT,
}
class Phy(enum.IntEnum):
LE_1M = HCI_LE_1M_PHY
LE_2M = HCI_LE_2M_PHY
LE_CODED = HCI_LE_CODED_PHY
class PhyBit(enum.IntFlag):
LE_1M = 1 << HCI_LE_1M_PHY_BIT
LE_2M = 1 << HCI_LE_2M_PHY_BIT
@@ -811,6 +799,19 @@ class CsSubeventAbortReason(OpenIntEnum):
SCHEDULING_CONFLICT_OR_LIMITED_RESOURCES = 0x03
UNSPECIFIED = 0x0F
class Role(enum.IntEnum):
CENTRAL = 0
PERIPHERAL = 1
# For Backward Compatibility.
HCI_CENTRAL_ROLE = Role.CENTRAL
HCI_PERIPHERAL_ROLE = Role.PERIPHERAL
HCI_LE_1M_PHY = Phy.LE_1M
HCI_LE_2M_PHY = Phy.LE_2M
HCI_LE_CODED_PHY = Phy.LE_CODED
# Connection Parameters
HCI_CONNECTION_INTERVAL_MS_PER_UNIT = 1.25
@@ -889,10 +890,15 @@ HCI_LINK_TYPE_NAMES = {
}
# Address types
HCI_PUBLIC_DEVICE_ADDRESS_TYPE = 0x00
HCI_RANDOM_DEVICE_ADDRESS_TYPE = 0x01
HCI_PUBLIC_IDENTITY_ADDRESS_TYPE = 0x02
HCI_RANDOM_IDENTITY_ADDRESS_TYPE = 0x03
class AddressType(OpenIntEnum):
PUBLIC_DEVICE = 0x00
RANDOM_DEVICE = 0x01
PUBLIC_IDENTITY = 0x02
RANDOM_IDENTITY = 0x03
# (Directed Only) Address is RPA, but controller cannot resolve.
UNABLE_TO_RESOLVE = 0xFE
# (Extended Only) No address.
ANONYMOUS = 0xFF
# Supported Commands Masks
# See Bluetooth spec @ 6.27 SUPPORTED COMMANDS
@@ -1582,8 +1588,8 @@ class HCI_Constant:
return HCI_ERROR_NAMES.get(status, f'0x{status:02X}')
@staticmethod
def role_name(role):
return HCI_ROLE_NAMES.get(role, str(role))
def role_name(role: int) -> str:
return Role(role).name
@staticmethod
def le_phy_name(phy):
@@ -1949,17 +1955,10 @@ class Address:
address[0] is the LSB of the address, address[5] is the MSB.
'''
PUBLIC_DEVICE_ADDRESS = 0x00
RANDOM_DEVICE_ADDRESS = 0x01
PUBLIC_IDENTITY_ADDRESS = 0x02
RANDOM_IDENTITY_ADDRESS = 0x03
ADDRESS_TYPE_NAMES = {
PUBLIC_DEVICE_ADDRESS: 'PUBLIC_DEVICE_ADDRESS',
RANDOM_DEVICE_ADDRESS: 'RANDOM_DEVICE_ADDRESS',
PUBLIC_IDENTITY_ADDRESS: 'PUBLIC_IDENTITY_ADDRESS',
RANDOM_IDENTITY_ADDRESS: 'RANDOM_IDENTITY_ADDRESS',
}
PUBLIC_DEVICE_ADDRESS = AddressType.PUBLIC_DEVICE
RANDOM_DEVICE_ADDRESS = AddressType.RANDOM_DEVICE
PUBLIC_IDENTITY_ADDRESS = AddressType.PUBLIC_IDENTITY
RANDOM_IDENTITY_ADDRESS = AddressType.RANDOM_IDENTITY
# Type declarations
NIL: Address
@@ -1969,40 +1968,44 @@ class Address:
# pylint: disable-next=unnecessary-lambda
ADDRESS_TYPE_SPEC = {'size': 1, 'mapper': lambda x: Address.address_type_name(x)}
@staticmethod
def address_type_name(address_type):
return name_or_number(Address.ADDRESS_TYPE_NAMES, address_type)
@classmethod
def address_type_name(cls: type[Self], address_type: int) -> str:
return AddressType(address_type).name
@staticmethod
def from_string_for_transport(string, transport):
@classmethod
def from_string_for_transport(
cls: type[Self], string: str, transport: PhysicalTransport
) -> Self:
if transport == BT_BR_EDR_TRANSPORT:
address_type = Address.PUBLIC_DEVICE_ADDRESS
else:
address_type = Address.RANDOM_DEVICE_ADDRESS
return Address(string, address_type)
return cls(string, address_type)
@staticmethod
def parse_address(data, offset):
@classmethod
def parse_address(cls: type[Self], data: bytes, offset: int) -> tuple[int, Self]:
# Fix the type to a default value. This is used for parsing type-less Classic
# addresses
return Address.parse_address_with_type(
data, offset, Address.PUBLIC_DEVICE_ADDRESS
)
return cls.parse_address_with_type(data, offset, Address.PUBLIC_DEVICE_ADDRESS)
@staticmethod
def parse_random_address(data, offset):
return Address.parse_address_with_type(
data, offset, Address.RANDOM_DEVICE_ADDRESS
)
@classmethod
def parse_random_address(
cls: type[Self], data: bytes, offset: int
) -> tuple[int, Self]:
return cls.parse_address_with_type(data, offset, Address.RANDOM_DEVICE_ADDRESS)
@staticmethod
def parse_address_with_type(data, offset, address_type):
return offset + 6, Address(data[offset : offset + 6], address_type)
@classmethod
def parse_address_with_type(
cls: type[Self], data: bytes, offset: int, address_type: AddressType
) -> tuple[int, Self]:
return offset + 6, cls(data[offset : offset + 6], address_type)
@staticmethod
def parse_address_preceded_by_type(data, offset):
address_type = data[offset - 1]
return Address.parse_address_with_type(data, offset, address_type)
@classmethod
def parse_address_preceded_by_type(
cls: type[Self], data: bytes, offset: int
) -> tuple[int, Self]:
address_type = AddressType(data[offset - 1])
return cls.parse_address_with_type(data, offset, address_type)
@classmethod
def generate_static_address(cls) -> Address:
@@ -2042,8 +2045,10 @@ class Address:
)
def __init__(
self, address: Union[bytes, str], address_type: int = RANDOM_DEVICE_ADDRESS
):
self,
address: Union[bytes, str],
address_type: AddressType = RANDOM_DEVICE_ADDRESS,
) -> None:
'''
Initialize an instance. `address` may be a byte array in little-endian
format, or a hex string in big-endian format (with optional ':'
+19 -8
View File
@@ -44,6 +44,7 @@ from bumble import hci
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
PhysicalTransport,
ConnectionPHY,
ConnectionParameters,
)
@@ -186,7 +187,11 @@ class DataPacketQueue(pyee.EventEmitter):
# -----------------------------------------------------------------------------
class Connection:
def __init__(
self, host: Host, handle: int, peer_address: hci.Address, transport: int
self,
host: Host,
handle: int,
peer_address: hci.Address,
transport: PhysicalTransport,
):
self.host = host
self.handle = handle
@@ -235,7 +240,7 @@ class Host(AbortableEventEmitter):
cis_links: Dict[int, IsoLink]
bis_links: Dict[int, IsoLink]
sco_links: Dict[int, ScoLink]
bigs: dict[int, set[int]] = {} # BIG Handle to BIS Handles
bigs: dict[int, set[int]]
acl_packet_queue: Optional[DataPacketQueue] = None
le_acl_packet_queue: Optional[DataPacketQueue] = None
iso_packet_queue: Optional[DataPacketQueue] = None
@@ -259,6 +264,7 @@ class Host(AbortableEventEmitter):
self.cis_links = {} # CIS links, by connection handle
self.bis_links = {} # BIS links, by connection handle
self.sco_links = {} # SCO links, by connection handle
self.bigs = {} # BIG Handle to BIS Handles
self.pending_command = None
self.pending_response: Optional[asyncio.Future[Any]] = None
self.number_of_supported_advertising_sets = 0
@@ -978,7 +984,7 @@ class Host(AbortableEventEmitter):
event.peer_address,
getattr(event, 'local_resolvable_private_address', None),
getattr(event, 'peer_resolvable_private_address', None),
event.role,
hci.Role(event.role),
connection_parameters,
)
else:
@@ -1061,8 +1067,10 @@ class Host(AbortableEventEmitter):
)
# Flush the data queues
self.acl_packet_queue.flush(handle)
self.le_acl_packet_queue.flush(handle)
if self.acl_packet_queue:
self.acl_packet_queue.flush(handle)
if self.le_acl_packet_queue:
self.le_acl_packet_queue.flush(handle)
if self.iso_packet_queue:
self.iso_packet_queue.flush(handle)
else:
@@ -1098,8 +1106,11 @@ class Host(AbortableEventEmitter):
# Notify the client
if event.status == hci.HCI_SUCCESS:
connection_phy = ConnectionPHY(event.tx_phy, event.rx_phy)
self.emit('connection_phy_update', connection.handle, connection_phy)
self.emit(
'connection_phy_update',
connection.handle,
ConnectionPHY(event.tx_phy, event.rx_phy),
)
else:
self.emit('connection_phy_update_failure', connection.handle, event.status)
@@ -1331,7 +1342,7 @@ class Host(AbortableEventEmitter):
f'role change for {event.bd_addr}: '
f'{hci.HCI_Constant.role_name(event.new_role)}'
)
self.emit('role_change', event.bd_addr, event.new_role)
self.emit('role_change', event.bd_addr, hci.Role(event.new_role))
else:
logger.debug(
f'role change for {event.bd_addr} failed: '
+2 -2
View File
@@ -42,7 +42,6 @@ from typing import (
from .utils import deprecated
from .colors import color
from .core import (
BT_CENTRAL_ROLE,
InvalidStateError,
InvalidArgumentError,
InvalidPacketError,
@@ -52,6 +51,7 @@ from .core import (
from .hci import (
HCI_LE_Connection_Update_Command,
HCI_Object,
Role,
key_with_value,
name_or_number,
)
@@ -1908,7 +1908,7 @@ class ChannelManager:
def on_l2cap_connection_parameter_update_request(
self, connection: Connection, cid: int, request
):
if connection.role == BT_CENTRAL_ROLE:
if connection.role == Role.CENTRAL:
self.send_control_frame(
connection,
cid,
+2 -2
View File
@@ -20,7 +20,6 @@ import asyncio
from functools import partial
from bumble.core import (
BT_PERIPHERAL_ROLE,
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
InvalidStateError,
@@ -28,6 +27,7 @@ from bumble.core import (
from bumble.colors import color
from bumble.hci import (
Address,
Role,
HCI_SUCCESS,
HCI_CONNECTION_ACCEPT_TIMEOUT_ERROR,
HCI_CONNECTION_TIMEOUT_ERROR,
@@ -292,7 +292,7 @@ class LocalLink:
return
async def task():
if responder_role != BT_PERIPHERAL_ROLE:
if responder_role != Role.PERIPHERAL:
initiator_controller.on_classic_role_change(
responder_controller.public_address, int(not (responder_role))
)
+5 -5
View File
@@ -1,4 +1,4 @@
# Copyright 2021-2023 Google LLC
# Copyright 2021-2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -76,18 +76,18 @@ class OobData:
return instance
def to_ad(self) -> AdvertisingData:
ad_structures = []
ad_structures: list[tuple[int, bytes]] = []
if self.address is not None:
ad_structures.append(
(AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(self.address))
(AdvertisingData.Type.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(self.address))
)
if self.role is not None:
ad_structures.append((AdvertisingData.LE_ROLE, bytes([self.role])))
ad_structures.append((AdvertisingData.Type.LE_ROLE, bytes([self.role])))
if self.shared_data is not None:
ad_structures.extend(self.shared_data.to_ad().ad_structures)
if self.legacy_context is not None:
ad_structures.append(
(AdvertisingData.SECURITY_MANAGER_TK_VALUE, self.legacy_context.tk)
(AdvertisingData.Type.SECURITY_MANAGER_TK_VALUE, self.legacy_context.tk)
)
return AdvertisingData(ad_structures)
+19 -23
View File
@@ -25,7 +25,6 @@ from .config import Config
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE,
UUID,
AdvertisingData,
Appearance,
@@ -47,6 +46,8 @@ from bumble.hci import (
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
Address,
Phy,
Role,
OwnAddressType,
)
from google.protobuf import any_pb2 # pytype: disable=pyi-error
from google.protobuf import empty_pb2 # pytype: disable=pyi-error
@@ -114,11 +115,11 @@ SECONDARY_PHY_TO_BUMBLE_PHY_MAP: Dict[SecondaryPhy, Phy] = {
SECONDARY_CODED: Phy.LE_CODED,
}
OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, bumble.hci.OwnAddressType] = {
host_pb2.PUBLIC: bumble.hci.OwnAddressType.PUBLIC,
host_pb2.RANDOM: bumble.hci.OwnAddressType.RANDOM,
host_pb2.RESOLVABLE_OR_PUBLIC: bumble.hci.OwnAddressType.RESOLVABLE_OR_PUBLIC,
host_pb2.RESOLVABLE_OR_RANDOM: bumble.hci.OwnAddressType.RESOLVABLE_OR_RANDOM,
OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, OwnAddressType] = {
host_pb2.PUBLIC: OwnAddressType.PUBLIC,
host_pb2.RANDOM: OwnAddressType.RANDOM,
host_pb2.RESOLVABLE_OR_PUBLIC: OwnAddressType.RESOLVABLE_OR_PUBLIC,
host_pb2.RESOLVABLE_OR_RANDOM: OwnAddressType.RESOLVABLE_OR_RANDOM,
}
@@ -250,7 +251,7 @@ class HostService(HostServicer):
connection = await self.device.connect(
address,
transport=BT_LE_TRANSPORT,
own_address_type=request.own_address_type,
own_address_type=OwnAddressType(request.own_address_type),
)
except ConnectionError as e:
if e.error_code == HCI_PAGE_TIMEOUT_ERROR:
@@ -371,18 +372,16 @@ class HostService(HostServicer):
scan_response_data=scan_response_data,
)
pending_connection: asyncio.Future[bumble.device.Connection] = (
asyncio.get_running_loop().create_future()
)
connections: asyncio.Queue[bumble.device.Connection] = asyncio.Queue()
if request.connectable:
def on_connection(connection: bumble.device.Connection) -> None:
if (
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
and connection.role == Role.PERIPHERAL
):
pending_connection.set_result(connection)
connections.put_nowait(connection)
self.device.on('connection', on_connection)
@@ -397,8 +396,7 @@ class HostService(HostServicer):
await asyncio.sleep(1)
continue
connection = await pending_connection
pending_connection = asyncio.get_running_loop().create_future()
connection = await connections.get()
cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big'))
yield AdvertiseResponse(connection=Connection(cookie=cookie))
@@ -492,14 +490,16 @@ class HostService(HostServicer):
target = Address(target_bytes, Address.RANDOM_DEVICE_ADDRESS)
advertising_type = AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY
connections: asyncio.Queue[bumble.device.Connection] = asyncio.Queue()
if request.connectable:
def on_connection(connection: bumble.device.Connection) -> None:
if (
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
and connection.role == Role.PERIPHERAL
):
pending_connection.set_result(connection)
connections.put_nowait(connection)
self.device.on('connection', on_connection)
@@ -510,19 +510,15 @@ class HostService(HostServicer):
await self.device.start_advertising(
target=target,
advertising_type=advertising_type,
own_address_type=request.own_address_type,
own_address_type=OwnAddressType(request.own_address_type),
)
if not request.connectable:
await asyncio.sleep(1)
continue
pending_connection: asyncio.Future[bumble.device.Connection] = (
asyncio.get_running_loop().create_future()
)
self.log.debug('Wait for LE connection...')
connection = await pending_connection
connection = await connections.get()
self.log.debug(
f"Advertise: Connected to {connection.peer_address} (handle={connection.handle})"
@@ -563,7 +559,7 @@ class HostService(HostServicer):
await self.device.start_scanning(
legacy=request.legacy,
active=not request.passive,
own_address_type=request.own_address_type,
own_address_type=OwnAddressType(request.own_address_type),
scan_interval=(
int(request.interval)
if request.interval
+2 -3
View File
@@ -24,11 +24,10 @@ from bumble import hci
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE,
ProtocolError,
)
from bumble.device import Connection as BumbleConnection, Device
from bumble.hci import HCI_Error
from bumble.hci import HCI_Error, Role
from bumble.utils import EventWatcher
from bumble.pairing import PairingConfig, PairingDelegate as BasePairingDelegate
from google.protobuf import any_pb2 # pytype: disable=pyi-error
@@ -318,7 +317,7 @@ class SecurityService(SecurityServicer):
if (
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
and connection.role == Role.PERIPHERAL
):
connection.request_pairing()
else:
+2 -2
View File
@@ -20,11 +20,11 @@ import inspect
import logging
from bumble.device import Device
from bumble.hci import Address
from bumble.hci import Address, AddressType
from google.protobuf.message import Message # pytype: disable=pyi-error
from typing import Any, Dict, Generator, MutableMapping, Optional, Tuple
ADDRESS_TYPES: Dict[str, int] = {
ADDRESS_TYPES: Dict[str, AddressType] = {
"public": Address.PUBLIC_DEVICE_ADDRESS,
"random": Address.RANDOM_DEVICE_ADDRESS,
"public_identity": Address.PUBLIC_IDENTITY_ADDRESS,
+2 -1
View File
@@ -301,7 +301,7 @@ class AseStateMachine(gatt.Characteristic):
presentation_delay = 0
# Additional parameters in ENABLING, STREAMING, DISABLING State
metadata = le_audio.Metadata()
metadata: le_audio.Metadata
def __init__(
self,
@@ -313,6 +313,7 @@ class AseStateMachine(gatt.Characteristic):
self.ase_id = ase_id
self._state = AseStateMachine.State.IDLE
self.role = role
self.metadata = le_audio.Metadata()
uuid = (
gatt.GATT_SINK_ASE_CHARACTERISTIC
+1 -1
View File
@@ -40,7 +40,7 @@ class PublicBroadcastAnnouncement:
def from_bytes(cls, data: bytes) -> Self:
features = cls.Features(data[0])
metadata_length = data[1]
metadata_ltv = data[1 : 1 + metadata_length]
metadata_ltv = data[2 : 2 + metadata_length]
return cls(
features=features, metadata=le_audio.Metadata.from_bytes(metadata_ltv)
)
+3 -3
View File
@@ -46,13 +46,13 @@ from pyee import EventEmitter
from .colors import color
from .hci import (
Address,
Role,
HCI_LE_Enable_Encryption_Command,
HCI_Object,
key_with_value,
)
from .core import (
BT_BR_EDR_TRANSPORT,
BT_CENTRAL_ROLE,
BT_LE_TRANSPORT,
AdvertisingData,
InvalidArgumentError,
@@ -1975,7 +1975,7 @@ class Manager(EventEmitter):
# Look for a session with this connection, and create one if none exists
if not (session := self.sessions.get(connection.handle)):
if connection.role == BT_CENTRAL_ROLE:
if connection.role == Role.CENTRAL:
logger.warning('Remote starts pairing as Peripheral!')
pairing_config = self.pairing_config_factory(connection)
session = self.session_proxy(
@@ -1995,7 +1995,7 @@ class Manager(EventEmitter):
async def pair(self, connection: Connection) -> None:
# TODO: check if there's already a session for this connection
if connection.role != BT_CENTRAL_ROLE:
if connection.role != Role.CENTRAL:
logger.warning('Start pairing as Peripheral!')
pairing_config = self.pairing_config_factory(connection)
session = self.session_proxy(
+5 -1
View File
@@ -139,6 +139,7 @@ class PacketParser:
packet_type
) or self.extended_packet_info.get(packet_type)
if self.packet_info is None:
self.reset()
raise core.InvalidPacketError(
f'invalid packet type {packet_type}'
)
@@ -302,7 +303,10 @@ class ParserSource(BaseSource):
# -----------------------------------------------------------------------------
class StreamPacketSource(asyncio.Protocol, ParserSource):
def data_received(self, data: bytes) -> None:
self.parser.feed_data(data)
try:
self.parser.feed_data(data)
except core.InvalidPacketError:
logger.warning("invalid packet, ignoring data")
# -----------------------------------------------------------------------------
+1 -3
View File
@@ -115,9 +115,7 @@ async def open_usb_transport(spec: str) -> Transport:
self.acl_out = acl_out
self.acl_out_transfer = device.getTransfer()
self.acl_out_transfer_ready = asyncio.Semaphore(1)
self.packets: asyncio.Queue[bytes] = (
asyncio.Queue()
) # Queue of packets waiting to be sent
self.packets = asyncio.Queue[bytes]() # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop()
self.queue_task = None
self.cancel_done = self.loop.create_future()
+202
View File
@@ -0,0 +1,202 @@
AURACAST TOOL
=============
The "auracast" tool implements commands that implement broadcasting, receiving
and controlling LE Audio broadcasts.
=== "Running as an installed package"
```
$ bumble-auracast
```
=== "Running from source"
```
$ python3 apps/auracast.py <args>
```
# Python Dependencies
Try installing the optional `[auracast]` dependencies:
=== "From source"
```bash
$ python3 -m pip install ".[auracast]"
```
=== "From PyPI"
```bash
$ python3 -m pip install "bumble[auracast]"
```
## LC3
The `auracast` app depends on the `lc3` python module, which is available
either as PyPI module (currently only available for Linux x86_64).
When installing Bumble with the optional `auracast` dependency, the `lc3`
module will be installed from the `lc3py` PyPI package if available.
If not, you will need to install it separately. This can be done with:
```bash
$ python3 -m pip install "git+https://github.com/google/liblc3.git"
```
## SoundDevice
The `sounddevice` module is required for audio output to the host's sound
output device(s) and/or input from the host's input device(s).
If not installed, the `auracast` app is still functional, but will be limited
to non-device inputs and output (files, external processes, ...)
On macOS and Windows, the `sounddevice` module gets installed with the
native PortAudio libraries included.
For Linux, however, PortAudio must be installed separately.
This is typically done with a command like:
```bash
$ sudo apt install libportaudio2
```
Visit the [sounddevice documentation](https://python-sounddevice.readthedocs.io/)
for details.
# General Usage
```
Usage: bumble-auracast [OPTIONS] COMMAND [ARGS]...
Options:
--help Show this message and exit.
Commands:
assist Scan for broadcasts on behalf of an audio server
pair Pair with an audio server
receive Receive a broadcast source
scan Scan for public broadcasts
transmit Transmit a broadcast source
```
Use `bumble-auracast <command> --help` to get more detailed usage information
for a specific `<command>`.
## `assist`
Act as a broadcast assistant.
Use `bumble-auracast assist --help` for details on the commands and options.
The assistant commands are:
### `monitor-state`
Subscribe to the state characteristic and monitor changes.
### `add-source`
Add a broadcast source. This will instruct the device to start
receiving a broadcast.
### `modify-source`
Modify a broadcast source.
### `remove-source`
Remote a broadcast source.
## `pair`
Pair with a device.
## `receive`
Receive a broadcast source.
The `--output` option specifies where to send the decoded audio samples.
The following outputs are supported:
### Sound Device
The `--output` argument is either `device`, to send the audio to the hosts's default sound device, or `device:<DEVICE_ID>` where `<DEVICE_ID>`
is the integer ID of one of the available sound devices.
When invoked with `--output "device:?"`, a list of available devices and
their IDs is printed out.
### Standard Output
With `--output stdout`, the decoded audio samples are written to the
standard output (currently always as float32 PCM samples)
### FFPlay
With `--output ffplay`, the decoded audio samples are piped to `ffplay`
in a child process. This option is only available if `ffplay` is a command that is available on the host.
### File
With `--output <filename>` or `--output file:<filename>`, the decoded audio
samples are written to a file (currently always as float32 PCM)
## `transmit`
Broadcast an audio source as a transmitter.
The `--input` and `--input-format` options specify what audio input
source to transmit.
The following inputs are supported:
### Sound Device
The `--input` argument is either `device`, to use the host's default sound
device (typically a builtin microphone), or `device:<DEVICE_ID>` where
`<DEVICE_ID>` is the integer ID of one of the available sound devices.
When invoked with `--input "device:?"`, a list of available devices and their
IDs is printed out.
### Standard Input
With `--input stdout`, the audio samples are read from the standard input.
(currently always as int16 PCM).
### File
With `--input <filename>` or `--input file:<filename>`, the audio samples
are read from a .wav or raw PCM file.
Use the `--input-format <FORMAT>` option to specify the format of the audio
samples in raw PCM files. `<FORMAT>` is expressed as:
`<sample-type>,<sample-rate>,<channels>`
(the only supported <sample-type> currently is 'int16le' for 16 bit signed integers with little-endian byte order)
## `scan`
Scan for public broadcasts.
A live display of the available broadcasts is displayed continuously.
# Compatibility With Some Products
The `auracast` app has been tested for compatibility with a few products.
The list is still very limited. Please let us know if there are products
that are not working well, or if there are specific instructions that should
be shared to allow better compatibiity with certain products.
## Transmitters
The `receive` command has been tested to successfully receive broadcasts from
the following transmitters:
* JBL GO 4
* Flairmesh FlooGoo FMA120
* Eppfun AK3040Pro Max
* HIGHGAZE BA-25T
* Nexum Audio VOCE and USB dongle
## Receivers
### Pixel Buds Pro 2
The Pixel Buds Pro 2 can be used as a broadcast receiver, controlled by the
`auracast assist` command, instructing the buds to receive a broadcast.
Use the `assist --command add-source` command to tell the buds to receive a
broadcast.
Use the `assist --command monitor-state` command to monitor the current sync/receive
state of the buds.
### JBL
The JBL GO 4 and other JBL products that support the Auracast feature can be used
as transmitters or receivers.
When running in receiver mode (pressing the Auracast button while not already playing),
the JBL speaker will scan for broadcast advertisements with a specific manufacturer data.
Use the `--manufacturer-data` option of the `transmit` command in order to include data
that will let the speaker recognize the broadcast as a compatible source.
The manufacturer ID for JBL is 87.
Using an option like `--manufacturer-data 87:00000000000000000000000000000000dffd` should work (tested on the
JBL GO 4. The `dffd` value at the end of the payload may be different on other models?).
### Others
* Nexum Audio VOCE and USB dongle
+2 -2
View File
@@ -57,7 +57,7 @@ development = [
]
avatar = [
"pandora-avatar == 0.0.10",
"rootcanal == 1.10.0 ; python_version>='3.10'",
"rootcanal == 1.11.1 ; python_version>='3.10'",
]
pandora = ["bt-test-interfaces >= 0.0.6"]
documentation = [
@@ -66,7 +66,7 @@ documentation = [
"mkdocstrings[python] >= 0.27.0",
]
auracast = [
"lc3py ; python_version>='3.10' and platform_system=='Linux' and platform_machine=='x86_64'",
"lc3py >= 1.1.3; python_version>='3.10' and ((platform_system=='Linux' and platform_machine=='x86_64') or (platform_system=='Darwin' and platform_machine=='arm64'))",
"sounddevice >= 0.5.1",
]
+4 -4
View File
@@ -24,7 +24,6 @@ import pytest
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE,
ConnectionParameters,
)
from bumble.device import (
@@ -43,6 +42,7 @@ from bumble.hci import (
HCI_CONNECTION_FAILED_TO_BE_ESTABLISHED_ERROR,
Address,
OwnAddressType,
Role,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_Connection_Complete_Event,
@@ -295,7 +295,7 @@ async def test_legacy_advertising_disconnection(auto_restart):
peer_address,
None,
None,
BT_PERIPHERAL_ROLE,
Role.PERIPHERAL,
ConnectionParameters(0, 0, 0),
)
@@ -353,7 +353,7 @@ async def test_extended_advertising_connection(own_address_type):
peer_address,
None,
None,
BT_PERIPHERAL_ROLE,
Role.PERIPHERAL,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
@@ -397,7 +397,7 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
Address('F0:F1:F2:F3:F4:F5'),
None,
None,
BT_PERIPHERAL_ROLE,
Role.PERIPHERAL,
ConnectionParameters(0, 0, 0),
)
+3 -2
View File
@@ -24,7 +24,7 @@ import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from bumble.controller import Controller
from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
from bumble.core import BT_BR_EDR_TRANSPORT, BT_LE_TRANSPORT
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
@@ -39,6 +39,7 @@ from bumble.smp import (
)
from bumble.core import ProtocolError
from bumble.keys import PairingKeys
from bumble.hci import Role
# -----------------------------------------------------------------------------
@@ -111,7 +112,7 @@ async def test_self_connection():
@pytest.mark.asyncio
@pytest.mark.parametrize(
'responder_role,',
(BT_CENTRAL_ROLE, BT_PERIPHERAL_ROLE),
(Role.CENTRAL, Role.PERIPHERAL),
)
async def test_self_classic_connection(responder_role):
# Create two devices, each with a controller, attached to the same link