Compare commits

...

72 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
859bb0609f fix support for float32 2025-02-08 18:12:45 -05:00
Gilles Boccon-Gibod
5f2d24570e larger queue size 2025-02-06 21:24:58 -05:00
Gilles Boccon-Gibod
dbf94c8f3e print encoding params 2025-02-06 18:23:31 -05:00
Gilles Boccon-Gibod
b6adc29365 python 3.9 compat 2025-02-06 18:17:13 -05:00
Gilles Boccon-Gibod
5caa7bfa90 fix type checker and linter errors 2025-02-06 17:05:56 -05:00
Gilles Boccon-Gibod
f39d706fa0 remove obsolete code 2025-02-06 16:45:37 -05:00
Gilles Boccon-Gibod
33435c2980 better docs and GATT fixes 2025-02-06 15:48:39 -05:00
Gilles Boccon-Gibod
26e87f09fe better error message 2025-02-05 22:28:05 -05:00
Gilles Boccon-Gibod
7f5e0d190e fix import checking 2025-02-05 22:19:39 -05:00
Gilles Boccon-Gibod
efae307b3d wip 2025-02-05 16:23:47 -05:00
Gilles Boccon-Gibod
9756572c93 add audio module 2025-02-04 17:58:54 -05:00
Gilles Boccon-Gibod
70141c0439 improvements 2025-02-03 17:58:09 -05:00
Gilles Boccon-Gibod
55d3fd90f5 wip 2025-01-25 21:04:59 -05:00
Gilles Boccon-Gibod
afee659ca6 Merge pull request #630 from google/gbg/iso-packet-queue
add support for ACL and ISO HCI packet queues
2025-01-24 15:59:19 -05:00
Gilles Boccon-Gibod
6fe7931d7d rename drain event to flow 2025-01-24 11:05:02 -05:00
Gilles Boccon-Gibod
cbd46adbcf add support for ACL and ISO HCI packet queues 2025-01-22 13:42:29 -05:00
Gilles Boccon-Gibod
af466c2970 Merge pull request #629 from google/gbg/sdp-enforce-mtu
SDP: enforce MTU limits
2025-01-21 12:29:18 -05:00
Gilles Boccon-Gibod
931e2de854 address PR comments 2025-01-21 12:18:06 -05:00
Gilles Boccon-Gibod
55eb7eb237 enforce MTU limits 2025-01-21 10:31:10 -05:00
zxzxwu
bade4502f9 Merge pull request #628 from zxzxwu/cs-hci
Channel Sounding HCI packet definitions
2025-01-19 16:14:08 +08:00
Josh Wu
9f952f202f Channel Sounding HCI packet definitions 2025-01-16 14:33:34 +08:00
Gilles Boccon-Gibod
5a477eb391 Merge pull request #626 from markusjellitsch/fix/set-ext-scan-param-cmd
Update device.py - Fix scan_interval param in hci.HCI_LE_Set_Extended_Scan_Parameters_Command
2025-01-14 11:04:15 -05:00
Markus Jellitsch
86cda8771d Update device.py 2025-01-14 10:43:49 +01:00
zxzxwu
c1ea0ddd35 Merge pull request #622 from markusjellitsch/main
Fix: _IsoLink.write() struct.exception
2025-01-13 16:21:41 +08:00
Markus Jellitsch
f567711a6c avoid struct.error exception when packet_sequence_number > 0xFFFF 2025-01-10 01:33:43 +01:00
Gilles Boccon-Gibod
509df4c676 Merge pull request #618 from google/gbg/hci-event-multi-vendor
support multiple event factories
2025-01-07 15:00:20 -05:00
Gilles Boccon-Gibod
b375ed07b4 add test 2025-01-07 14:54:59 -05:00
Gilles Boccon-Gibod
69d62d3dd1 support multiple event factories 2025-01-06 08:42:09 -05:00
zxzxwu
fe3fa3d505 Merge pull request #617 from zxzxwu/iso
Unify ISO methods
2025-01-06 14:31:47 +08:00
Josh Wu
27fcd43224 Unify ISO methods 2025-01-02 14:19:36 +08:00
zxzxwu
c3b2bb19d5 Merge pull request #589 from zxzxwu/auracast
Auracast support
2025-01-02 01:02:13 +08:00
Gilles Boccon-Gibod
34287177b9 Merge pull request #615 from google/gbg/bluetooth-6-constants
add bluetooth 6.0 constants
2024-12-23 08:46:13 -05:00
Josh Wu
d238dd4059 Use dynamic sample rate 2024-12-23 17:01:11 +08:00
Gilles Boccon-Gibod
865f3a249f add bluetooth 6.0 constants 2024-12-22 12:47:37 -05:00
Josh Wu
7324d322fe BIG 2024-12-20 13:45:12 +08:00
Gilles Boccon-Gibod
af148b476d Merge pull request #613 from google/gbg/update-cryptography-dependency
update cryptography dependency
2024-12-19 08:42:51 -05:00
zxzxwu
80d60aaf15 Merge pull request #612 from zxzxwu/lc3
Replace liblc3 wasm library
2024-12-19 15:06:22 +08:00
Gilles Boccon-Gibod
c80f89d20f update cryptography dependency 2024-12-18 22:01:42 -05:00
Josh Wu
a27f55a588 Replace liblc3 wasm library 2024-12-19 02:21:38 +08:00
Gilles Boccon-Gibod
62e4670a39 Merge pull request #606 from wpiet/gmap-wip
Add `Gaming Audio Profile`
2024-12-18 11:56:57 -05:00
zxzxwu
99695bb264 Merge pull request #610 from zxzxwu/cfg
Remove setup.py and setup.cfg
2024-12-19 00:53:12 +08:00
Josh Wu
eb54898106 Remove setup.py and setup.cfg 2024-12-19 00:45:13 +08:00
Gilles Boccon-Gibod
4f5ee204d2 Update code-check.yml
Hot fix because 3.13.1 somehow breaks the current version of pylint. Will revert to 3.13 without pining to 3.13.0 when pylint is fixed
2024-12-18 11:36:08 -05:00
Wojciech Pietraszewski
2552e21db1 Add characteristics initial values
Sets default values for characteristics if not specified explicitly
2024-12-04 17:00:29 +01:00
Wojciech Pietraszewski
6168f87e2f Add characteristics conditionally
Only adds a characteristic if the corresponding role has been set
2024-12-04 12:57:34 +01:00
Gilles Boccon-Gibod
ca7d2ca4df Merge pull request #607 from google/gbg/pandora-deps
move pandora deps to development
2024-12-03 09:42:44 -08:00
Gilles Boccon-Gibod
60723323e9 move pandora deps to development 2024-12-03 09:08:30 -08:00
Gilles Boccon-Gibod
3ce7b9255b Merge pull request #598 from google/gbg/gatt-class-adapter
Add a class-based GATT adapter
2024-12-03 08:46:30 -08:00
Gilles Boccon-Gibod
97fcfc2fa0 Merge pull request #604 from jmdietrich-gcx/add_encryption_key_size_to_pairing_config
Add maximum encryption key size to PairingDelegate
2024-12-03 08:30:53 -08:00
Wojciech Pietraszewski
19674e3758 Add Gaming Audio Profile
Adds initial support for `Gaming Audio Service`.
2024-12-02 11:15:10 +01:00
Jan-Marcel Dietrich
1130e1db8f Fix code formatting 2024-12-02 09:01:18 +01:00
Gilles Boccon-Gibod
37c7f3a58a Merge pull request #603 from google/gbg/fix-pair-oob
fix oob support in pair.py
2024-12-01 08:43:04 -08:00
Gilles Boccon-Gibod
0a12b2bf2e Merge pull request #585 from wpiet/vocs
Add `Volume Offset Control Service`
2024-11-29 10:41:30 -08:00
Gilles Boccon-Gibod
d014acbe63 Merge pull request #597 from google/gbg/intel-hci
intel hci
2024-11-29 10:41:10 -08:00
Jan-Marcel Dietrich
07f9997a49 Add maximum encryption key size to PairingDelegate
So far the maxmium encryption key size has been hardcoded to 16 bytes in
'send_pairing_request_command()' and 'send_pairing_response_comman()'. By
making this configurable via the PairingDelegate, one can test how devices
respond to smaller encryption key sizes. Default remains 16 bytes.
2024-11-28 14:15:51 +01:00
Gilles Boccon-Gibod
b9f91f695a fix oob support in pair.py 2024-11-27 12:58:03 -08:00
Gilles Boccon-Gibod
082d55af10 Merge pull request #599 from google/gbg/hfp-19
add super wide band constants
2024-11-25 07:47:40 -08:00
Gilles Boccon-Gibod
4c3fd5688d Merge pull request #600 from google/gbg/unify-to-bytes
only use `__bytes__` when not argument is needed.
2024-11-25 07:44:17 -08:00
Gilles Boccon-Gibod
9d3d5495ce only use __bytes__ when not argument is needed. 2024-11-23 15:56:14 -08:00
Gilles Boccon-Gibod
b3869f267c add super wide band constants 2024-11-23 09:27:03 -08:00
Gilles Boccon-Gibod
b57096abe2 Merge pull request #595 from wpiet/aics-opcode-fix
Amend Opcode value in `Audio Input Control Service`
2024-11-23 08:56:23 -08:00
Gilles Boccon-Gibod
48685c8587 improve vendor event support 2024-11-23 08:55:50 -08:00
Wojciech Pietraszewski
100bea6b41 Fix typos
Amends the typo in the `INACTIVE` field in `Audio Input Status` characteristic.
Amends the typo in the log message of `_set_gain_settings` method.
2024-11-21 18:29:44 +01:00
Wojciech Pietraszewski
63819bf9dd Amend Opcode value in Audio Input Control Service
Corrects the Audio Input Control Point
Opcode value for `Set Gain Setting` field.
2024-11-21 16:40:49 +01:00
Wojciech Pietraszewski
6e55390930 Add Volume Offset Control Service
Adds initial support for VOCS.
2024-11-21 11:56:14 +01:00
zxzxwu
e3fdab4175 Merge pull request #593 from zxzxwu/periodic
Support Periodic Advertising
2024-11-19 17:22:37 +08:00
Josh Wu
bbcd14dbf0 Support Periodic Advertising 2024-11-19 16:27:13 +08:00
zxzxwu
01dc0d574b Merge pull request #590 from SergeantSerk/parse-scan-response-data
Correctly parse scan response from device config
2024-11-17 15:39:11 +08:00
zxzxwu
5e959d638e Merge pull request #591 from zxzxwu/auracast_scan
Improve Broadcast Scanning
2024-11-16 04:10:27 +08:00
Gilles Boccon-Gibod
8d908288c8 Merge pull request #583 from google/gbg/more-gatt-tests
regression test for GATT unsubscription
2024-11-15 10:19:20 -08:00
Josh Wu
c88b32a406 Improve Broadcast Scanning 2024-11-16 02:02:28 +08:00
Serkan
d0990ee04d Correctly parse scan response from device config
Parses scan response data correctly just like advertising data
2024-11-07 21:49:33 +03:00
70 changed files with 6197 additions and 1162 deletions

View File

@@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13.0"]
fail-fast: false
steps:
@@ -33,7 +33,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install ".[build,test,development,pandora]"
python -m pip install ".[build,test,development]"
- name: Check
run: |
invoke project.pre-commit

View File

@@ -32,7 +32,7 @@ jobs:
- name: Install
run: |
python -m pip install --upgrade pip
python -m pip install .[avatar,pandora]
python -m pip install .[avatar]
- name: Rootcanal
run: nohup python -m rootcanal > rootcanal.log &
- name: Test

View File

@@ -14,9 +14,12 @@
"ASHA",
"asyncio",
"ATRAC",
"auracast",
"avctp",
"avdtp",
"avrcp",
"biginfo",
"bigs",
"bitpool",
"bitstruct",
"BSCP",
@@ -36,6 +39,7 @@
"deregistration",
"dhkey",
"diversifier",
"ediv",
"endianness",
"ESCO",
"Fitbit",
@@ -47,6 +51,7 @@
"libc",
"liblc",
"libusb",
"maxs",
"MITM",
"MSBC",
"NDIS",
@@ -54,8 +59,10 @@
"NONBLOCK",
"NONCONN",
"OXIMETER",
"PDUS",
"popleft",
"PRAND",
"prefs",
"protobuf",
"psms",
"pyee",

View File

@@ -1,4 +1,4 @@
# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,29 +16,50 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import asyncio.subprocess
import collections
import contextlib
import dataclasses
import functools
import logging
import os
from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple
import struct
from typing import (
cast,
Any,
AsyncGenerator,
Coroutine,
Deque,
Optional,
Tuple,
)
import click
import pyee
try:
import lc3 # type: ignore # pylint: disable=E0401
except ImportError as e:
raise ImportError(
"Try `python -m pip install \"git+https://github.com/google/liblc3.git\"`."
) from e
from bumble.audio import io as audio_io
from bumble.colors import color
import bumble.company_ids
import bumble.core
from bumble import company_ids
from bumble import core
from bumble import gatt
from bumble import hci
from bumble.profiles import bap
from bumble.profiles import le_audio
from bumble.profiles import pbp
from bumble.profiles import bass
import bumble.device
import bumble.gatt
import bumble.hci
import bumble.profiles.bap
import bumble.profiles.bass
import bumble.profiles.pbp
import bumble.transport
import bumble.utils
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
@@ -49,9 +70,34 @@ logger = logging.getLogger(__name__)
# Constants
# -----------------------------------------------------------------------------
AURACAST_DEFAULT_DEVICE_NAME = 'Bumble Auracast'
AURACAST_DEFAULT_DEVICE_ADDRESS = bumble.hci.Address('F0:F1:F2:F3:F4:F5')
AURACAST_DEFAULT_DEVICE_ADDRESS = hci.Address('F0:F1:F2:F3:F4:F5')
AURACAST_DEFAULT_SYNC_TIMEOUT = 5.0
AURACAST_DEFAULT_ATT_MTU = 256
AURACAST_DEFAULT_FRAME_DURATION = 10000
AURACAST_DEFAULT_SAMPLE_RATE = 48000
AURACAST_DEFAULT_TRANSMIT_BITRATE = 80000
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def codec_config_string(
codec_config: bap.CodecSpecificConfiguration, indent: str
) -> str:
lines = []
if codec_config.sampling_frequency is not None:
lines.append(f'Sampling Frequency: {codec_config.sampling_frequency.hz} hz')
if codec_config.frame_duration is not None:
lines.append(f'Frame Duration: {codec_config.frame_duration.us} µs')
if codec_config.octets_per_codec_frame is not None:
lines.append(f'Frame Size: {codec_config.octets_per_codec_frame} bytes')
if codec_config.codec_frames_per_sdu is not None:
lines.append(f'Frames Per SDU: {codec_config.codec_frames_per_sdu}')
if codec_config.audio_channel_allocation is not None:
lines.append(
f'Audio Location: {codec_config.audio_channel_allocation.name}'
)
return '\n'.join(indent + line for line in lines)
# -----------------------------------------------------------------------------
@@ -60,19 +106,14 @@ AURACAST_DEFAULT_ATT_MTU = 256
class BroadcastScanner(pyee.EventEmitter):
@dataclasses.dataclass
class Broadcast(pyee.EventEmitter):
name: str
name: str | None
sync: bumble.device.PeriodicAdvertisingSync
broadcast_id: int
rssi: int = 0
public_broadcast_announcement: Optional[
bumble.profiles.pbp.PublicBroadcastAnnouncement
] = None
broadcast_audio_announcement: Optional[
bumble.profiles.bap.BroadcastAudioAnnouncement
] = None
basic_audio_announcement: Optional[
bumble.profiles.bap.BasicAudioAnnouncement
] = None
appearance: Optional[bumble.core.Appearance] = None
public_broadcast_announcement: Optional[pbp.PublicBroadcastAnnouncement] = None
broadcast_audio_announcement: Optional[bap.BroadcastAudioAnnouncement] = None
basic_audio_announcement: Optional[bap.BasicAudioAnnouncement] = None
appearance: Optional[core.Appearance] = None
biginfo: Optional[bumble.device.BIGInfoAdvertisement] = None
manufacturer_data: Optional[Tuple[str, bytes]] = None
@@ -86,42 +127,36 @@ class BroadcastScanner(pyee.EventEmitter):
def update(self, advertisement: bumble.device.Advertisement) -> None:
self.rssi = advertisement.rssi
for service_data in advertisement.data.get_all(
bumble.core.AdvertisingData.SERVICE_DATA
core.AdvertisingData.SERVICE_DATA
):
assert isinstance(service_data, tuple)
service_uuid, data = service_data
assert isinstance(data, bytes)
if (
service_uuid
== bumble.gatt.GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE
):
if service_uuid == gatt.GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE:
self.public_broadcast_announcement = (
bumble.profiles.pbp.PublicBroadcastAnnouncement.from_bytes(data)
pbp.PublicBroadcastAnnouncement.from_bytes(data)
)
continue
if (
service_uuid
== bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
):
if service_uuid == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE:
self.broadcast_audio_announcement = (
bumble.profiles.bap.BroadcastAudioAnnouncement.from_bytes(data)
bap.BroadcastAudioAnnouncement.from_bytes(data)
)
continue
self.appearance = advertisement.data.get( # type: ignore[assignment]
bumble.core.AdvertisingData.APPEARANCE
core.AdvertisingData.APPEARANCE
)
if manufacturer_data := advertisement.data.get(
bumble.core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA
core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA
):
assert isinstance(manufacturer_data, tuple)
company_id = cast(int, manufacturer_data[0])
data = cast(bytes, manufacturer_data[1])
self.manufacturer_data = (
bumble.company_ids.COMPANY_IDENTIFIERS.get(
company_ids.COMPANY_IDENTIFIERS.get(
company_id, f'0x{company_id:04X}'
),
data,
@@ -135,7 +170,8 @@ class BroadcastScanner(pyee.EventEmitter):
self.sync.advertiser_address,
color(self.sync.state.name, 'green'),
)
print(f' {color("Name", "cyan")}: {self.name}')
if self.name is not None:
print(f' {color("Name", "cyan")}: {self.name}')
if self.appearance:
print(f' {color("Appearance", "cyan")}: {str(self.appearance)}')
print(f' {color("RSSI", "cyan")}: {self.rssi}')
@@ -156,25 +192,24 @@ class BroadcastScanner(pyee.EventEmitter):
if self.public_broadcast_announcement:
print(
f' {color("Features", "cyan")}: '
f'{self.public_broadcast_announcement.features}'
)
print(
f' {color("Metadata", "cyan")}: '
f'{self.public_broadcast_announcement.metadata}'
f'{self.public_broadcast_announcement.features.name}'
)
print(f' {color("Metadata", "cyan")}:')
print(self.public_broadcast_announcement.metadata.pretty_print(' '))
if self.basic_audio_announcement:
print(color(' Audio:', 'cyan'))
print(
color(' Presentation Delay:', 'magenta'),
self.basic_audio_announcement.presentation_delay,
"µs",
)
for subgroup in self.basic_audio_announcement.subgroups:
print(color(' Subgroup:', 'magenta'))
print(color(' Codec ID:', 'yellow'))
print(
color(' Coding Format: ', 'green'),
subgroup.codec_id.coding_format.name,
subgroup.codec_id.codec_id.name,
)
print(
color(' Company ID: ', 'green'),
@@ -184,17 +219,22 @@ class BroadcastScanner(pyee.EventEmitter):
color(' Vendor Specific Codec ID:', 'green'),
subgroup.codec_id.vendor_specific_codec_id,
)
print(color(' Codec Config:', 'yellow'))
print(
color(' Codec Config:', 'yellow'),
subgroup.codec_specific_configuration,
codec_config_string(
subgroup.codec_specific_configuration, ' '
),
)
print(color(' Metadata: ', 'yellow'), subgroup.metadata)
print(color(' Metadata: ', 'yellow'))
print(subgroup.metadata.pretty_print(' '))
for bis in subgroup.bis:
print(color(f' BIS [{bis.index}]:', 'yellow'))
print(color(' Codec Config:', 'green'))
print(
color(' Codec Config:', 'green'),
bis.codec_specific_configuration,
codec_config_string(
bis.codec_specific_configuration, ' '
),
)
if self.biginfo:
@@ -231,15 +271,15 @@ class BroadcastScanner(pyee.EventEmitter):
return
for service_data in advertisement.data.get_all(
bumble.core.AdvertisingData.SERVICE_DATA
core.AdvertisingData.SERVICE_DATA
):
assert isinstance(service_data, tuple)
service_uuid, data = service_data
assert isinstance(data, bytes)
if service_uuid == bumble.gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE:
if service_uuid == gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE:
self.basic_audio_announcement = (
bumble.profiles.bap.BasicAudioAnnouncement.from_bytes(data)
bap.BasicAudioAnnouncement.from_bytes(data)
)
break
@@ -261,7 +301,7 @@ class BroadcastScanner(pyee.EventEmitter):
self.device = device
self.filter_duplicates = filter_duplicates
self.sync_timeout = sync_timeout
self.broadcasts: Dict[bumble.hci.Address, BroadcastScanner.Broadcast] = {}
self.broadcasts = dict[hci.Address, BroadcastScanner.Broadcast]()
device.on('advertisement', self.on_advertisement)
async def start(self) -> None:
@@ -274,24 +314,46 @@ class BroadcastScanner(pyee.EventEmitter):
await self.device.stop_scanning()
def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None:
if (
broadcast_name := advertisement.data.get(
bumble.core.AdvertisingData.BROADCAST_NAME
if not (
ads := advertisement.data.get_all(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID
)
) is None:
) or not (
broadcast_audio_announcement := next(
(
ad
for ad in ads
if isinstance(ad, tuple)
and ad[0] == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
),
None,
)
):
return
assert isinstance(broadcast_name, str)
broadcast_name = advertisement.data.get(core.AdvertisingData.BROADCAST_NAME)
assert isinstance(broadcast_name, str) or broadcast_name is None
assert isinstance(broadcast_audio_announcement[1], bytes)
if broadcast := self.broadcasts.get(advertisement.address):
broadcast.update(advertisement)
return
bumble.utils.AsyncRunner.spawn(
self.on_new_broadcast(broadcast_name, advertisement)
self.on_new_broadcast(
broadcast_name,
advertisement,
bap.BroadcastAudioAnnouncement.from_bytes(
broadcast_audio_announcement[1]
).broadcast_id,
)
)
async def on_new_broadcast(
self, name: str, advertisement: bumble.device.Advertisement
self,
name: str | None,
advertisement: bumble.device.Advertisement,
broadcast_id: int,
) -> None:
periodic_advertising_sync = await self.device.create_periodic_advertising_sync(
advertiser_address=advertisement.address,
@@ -299,10 +361,7 @@ class BroadcastScanner(pyee.EventEmitter):
sync_timeout=self.sync_timeout,
filter_duplicates=self.filter_duplicates,
)
broadcast = self.Broadcast(
name,
periodic_advertising_sync,
)
broadcast = self.Broadcast(name, periodic_advertising_sync, broadcast_id)
broadcast.update(advertisement)
self.broadcasts[advertisement.address] = broadcast
periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast))
@@ -314,10 +373,11 @@ class BroadcastScanner(pyee.EventEmitter):
self.emit('broadcast_loss', broadcast)
class PrintingBroadcastScanner:
class PrintingBroadcastScanner(pyee.EventEmitter):
def __init__(
self, device: bumble.device.Device, filter_duplicates: bool, sync_timeout: float
) -> None:
super().__init__()
self.scanner = BroadcastScanner(device, filter_duplicates, sync_timeout)
self.scanner.on('new_broadcast', self.on_new_broadcast)
self.scanner.on('broadcast_loss', self.on_broadcast_loss)
@@ -452,27 +512,29 @@ async def run_assist(
await peer.request_mtu(mtu)
# Get the BASS service
bass = await peer.discover_service_and_create_proxy(
bumble.profiles.bass.BroadcastAudioScanServiceProxy
bass_client = await peer.discover_service_and_create_proxy(
bass.BroadcastAudioScanServiceProxy
)
# Check that the service was found
if not bass:
if not bass_client:
print(color('!!! Broadcast Audio Scan Service not found', 'red'))
return
# Subscribe to and read the broadcast receive state characteristics
for i, broadcast_receive_state in enumerate(bass.broadcast_receive_states):
for i, broadcast_receive_state in enumerate(
bass_client.broadcast_receive_states
):
try:
await broadcast_receive_state.subscribe(
lambda value, i=i: print(
f"{color(f'Broadcast Receive State Update [{i}]:', 'green')} {value}"
)
)
except bumble.core.ProtocolError as error:
except core.ProtocolError as error:
print(
color(
f'!!! Failed to subscribe to Broadcast Receive State characteristic:',
'!!! Failed to subscribe to Broadcast Receive State characteristic',
'red',
),
error,
@@ -488,7 +550,7 @@ async def run_assist(
if command == 'add-source':
# Find the requested broadcast
await bass.remote_scan_started()
await bass_client.remote_scan_started()
if broadcast_name:
print(color('Scanning for broadcast:', 'cyan'), broadcast_name)
else:
@@ -508,15 +570,15 @@ async def run_assist(
# Add the source
print(color('Adding source:', 'blue'), broadcast.sync.advertiser_address)
await bass.add_source(
await bass_client.add_source(
broadcast.sync.advertiser_address,
broadcast.sync.sid,
broadcast.broadcast_audio_announcement.broadcast_id,
bumble.profiles.bass.PeriodicAdvertisingSyncParams.SYNCHRONIZE_TO_PA_PAST_AVAILABLE,
bass.PeriodicAdvertisingSyncParams.SYNCHRONIZE_TO_PA_PAST_AVAILABLE,
0xFFFF,
[
bumble.profiles.bass.SubgroupInfo(
bumble.profiles.bass.SubgroupInfo.ANY_BIS,
bass.SubgroupInfo(
bass.SubgroupInfo.ANY_BIS,
bytes(broadcast.basic_audio_announcement.subgroups[0].metadata),
)
],
@@ -526,7 +588,7 @@ async def run_assist(
await broadcast.sync.transfer(peer.connection)
# Notify the sink that we're done scanning.
await bass.remote_scan_stopped()
await bass_client.remote_scan_stopped()
await peer.sustain()
return
@@ -537,7 +599,7 @@ async def run_assist(
return
# Find the requested broadcast
await bass.remote_scan_started()
await bass_client.remote_scan_started()
if broadcast_name:
print(color('Scanning for broadcast:', 'cyan'), broadcast_name)
else:
@@ -560,13 +622,13 @@ async def run_assist(
color('Modifying source:', 'blue'),
source_id,
)
await bass.modify_source(
await bass_client.modify_source(
source_id,
bumble.profiles.bass.PeriodicAdvertisingSyncParams.SYNCHRONIZE_TO_PA_PAST_NOT_AVAILABLE,
bass.PeriodicAdvertisingSyncParams.SYNCHRONIZE_TO_PA_PAST_NOT_AVAILABLE,
0xFFFF,
[
bumble.profiles.bass.SubgroupInfo(
bumble.profiles.bass.SubgroupInfo.ANY_BIS,
bass.SubgroupInfo(
bass.SubgroupInfo.ANY_BIS,
bytes(broadcast.basic_audio_announcement.subgroups[0].metadata),
)
],
@@ -581,7 +643,7 @@ async def run_assist(
# Remove the source
print(color('Removing source:', 'blue'), source_id)
await bass.remove_source(source_id)
await bass_client.remove_source(source_id)
await peer.sustain()
return
@@ -601,14 +663,342 @@ async def run_pair(transport: str, address: str) -> None:
print("+++ Paired")
async def run_receive(
transport: str,
broadcast_id: Optional[int],
output: str,
broadcast_code: str | None,
sync_timeout: float,
subgroup_index: int,
) -> None:
# Run a pre-flight check for the output.
try:
if not audio_io.check_audio_output(output):
return
except ValueError as error:
print(error)
return
async with create_device(transport) as device:
if not device.supports_le_periodic_advertising:
print(color('Periodic advertising not supported', 'red'))
return
scanner = BroadcastScanner(device, False, sync_timeout)
scan_result: asyncio.Future[BroadcastScanner.Broadcast] = (
asyncio.get_running_loop().create_future()
)
def on_new_broadcast(broadcast: BroadcastScanner.Broadcast) -> None:
if scan_result.done():
return
if broadcast_id is None or broadcast.broadcast_id == broadcast_id:
scan_result.set_result(broadcast)
scanner.on('new_broadcast', on_new_broadcast)
await scanner.start()
print('Start scanning...')
broadcast = await scan_result
print('Advertisement found:')
broadcast.print()
basic_audio_announcement_scanned = asyncio.Event()
def on_change() -> None:
if (
broadcast.basic_audio_announcement
and not basic_audio_announcement_scanned.is_set()
):
basic_audio_announcement_scanned.set()
broadcast.on('change', on_change)
if not broadcast.basic_audio_announcement:
print('Wait for Basic Audio Announcement...')
await basic_audio_announcement_scanned.wait()
print('Basic Audio Announcement found')
broadcast.print()
print('Stop scanning')
await scanner.stop()
print('Start sync to BIG')
assert broadcast.basic_audio_announcement
subgroup = broadcast.basic_audio_announcement.subgroups[subgroup_index]
configuration = subgroup.codec_specific_configuration
assert configuration
assert (sampling_frequency := configuration.sampling_frequency)
assert (frame_duration := configuration.frame_duration)
big_sync = await device.create_big_sync(
broadcast.sync,
bumble.device.BigSyncParameters(
big_sync_timeout=0x4000,
bis=[bis.index for bis in subgroup.bis],
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
),
),
)
num_bis = len(big_sync.bis_links)
decoder = lc3.Decoder(
frame_duration_us=frame_duration.us,
sample_rate_hz=sampling_frequency.hz,
num_channels=num_bis,
)
lc3_queues: list[Deque[bytes]] = [collections.deque() for i in range(num_bis)]
packet_stats = [0, 0]
audio_output = await audio_io.create_audio_output(output)
# This try should be replaced with contextlib.aclosing() when python 3.9 is no
# longer needed.
try:
await audio_output.open(
audio_io.PcmFormat(
audio_io.PcmFormat.Endianness.LITTLE,
audio_io.PcmFormat.SampleType.FLOAT32,
sampling_frequency.hz,
num_bis,
)
)
def sink(queue: Deque[bytes], packet: hci.HCI_IsoDataPacket):
# TODO: re-assemble fragments and detect errors
queue.append(packet.iso_sdu_fragment)
while all(lc3_queues):
# This assumes SDUs contain one LC3 frame each, which may not
# be correct for all cases. TODO: revisit this assumption.
frame = b''.join([lc3_queue.popleft() for lc3_queue in lc3_queues])
if not frame:
print(color('!!! received empty frame', 'red'))
continue
packet_stats[0] += len(frame)
packet_stats[1] += 1
print(
f'\rRECEIVED: {packet_stats[0]} bytes in '
f'{packet_stats[1]} packets',
end='',
)
try:
pcm = decoder.decode(frame).tobytes()
except lc3.BaseError as error:
print(color(f'!!! LC3 decoding error: {error}'))
continue
audio_output.write(pcm)
for i, bis_link in enumerate(big_sync.bis_links):
print(f'Setup ISO for BIS {bis_link.handle}')
bis_link.sink = functools.partial(sink, lc3_queues[i])
await device.send_command(
hci.HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=bis_link.handle,
data_path_direction=hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.CONTROLLER_TO_HOST,
data_path_id=0,
codec_id=hci.CodingFormat(codec_id=hci.CodecID.TRANSPARENT),
controller_delay=0,
codec_configuration=b'',
),
check_result=True,
)
terminated = asyncio.Event()
big_sync.on(big_sync.Event.TERMINATION, lambda _: terminated.set())
await terminated.wait()
finally:
await audio_output.aclose()
async def run_transmit(
transport: str,
broadcast_id: int,
broadcast_code: str | None,
broadcast_name: str,
bitrate: int,
manufacturer_data: tuple[int, bytes] | None,
input: str,
input_format: str,
) -> None:
# Run a pre-flight check for the input.
try:
if not audio_io.check_audio_input(input):
return
except ValueError as error:
print(error)
return
async with create_device(transport) as device:
if not device.supports_le_periodic_advertising:
print(color('Periodic advertising not supported', 'red'))
return
basic_audio_announcement = bap.BasicAudioAnnouncement(
presentation_delay=40000,
subgroups=[
bap.BasicAudioAnnouncement.Subgroup(
codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3),
codec_specific_configuration=bap.CodecSpecificConfiguration(
sampling_frequency=bap.SamplingFrequency.FREQ_48000,
frame_duration=bap.FrameDuration.DURATION_10000_US,
octets_per_codec_frame=100,
),
metadata=le_audio.Metadata(
[
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.LANGUAGE, data=b'eng'
),
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=b'Disco'
),
]
),
bis=[
bap.BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=bap.CodecSpecificConfiguration(
audio_channel_allocation=bap.AudioLocation.FRONT_LEFT
),
),
bap.BasicAudioAnnouncement.BIS(
index=2,
codec_specific_configuration=bap.CodecSpecificConfiguration(
audio_channel_allocation=bap.AudioLocation.FRONT_RIGHT
),
),
],
)
],
)
broadcast_audio_announcement = bap.BroadcastAudioAnnouncement(broadcast_id)
advertising_manufacturer_data = (
b''
if manufacturer_data is None
else bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA,
struct.pack('<H', manufacturer_data[0])
+ manufacturer_data[1],
)
]
)
)
)
advertising_set = await device.create_advertising_set(
advertising_parameters=bumble.device.AdvertisingParameters(
advertising_event_properties=bumble.device.AdvertisingEventProperties(
is_connectable=False
),
primary_advertising_interval_min=100,
primary_advertising_interval_max=200,
),
advertising_data=(
broadcast_audio_announcement.get_advertising_data()
+ bytes(
core.AdvertisingData(
[(core.AdvertisingData.BROADCAST_NAME, broadcast_name.encode())]
)
)
+ advertising_manufacturer_data
),
periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters(
periodic_advertising_interval_min=80,
periodic_advertising_interval_max=160,
),
periodic_advertising_data=basic_audio_announcement.get_advertising_data(),
auto_restart=True,
auto_start=True,
)
print('Start Periodic Advertising')
await advertising_set.start_periodic()
audio_input = await audio_io.create_audio_input(input, input_format)
pcm_format = await audio_input.open()
# This try should be replaced with contextlib.aclosing() when python 3.9 is no
# longer needed.
try:
if pcm_format.channels != 2:
print("Only 2 channels PCM configurations are supported")
return
if pcm_format.sample_type == audio_io.PcmFormat.SampleType.INT16:
pcm_bit_depth = 16
elif pcm_format.sample_type == audio_io.PcmFormat.SampleType.FLOAT32:
pcm_bit_depth = None
else:
print("Only INT16 and FLOAT32 sample types are supported")
return
encoder = lc3.Encoder(
frame_duration_us=AURACAST_DEFAULT_FRAME_DURATION,
sample_rate_hz=AURACAST_DEFAULT_SAMPLE_RATE,
num_channels=pcm_format.channels,
input_sample_rate_hz=pcm_format.sample_rate,
)
lc3_frame_samples = encoder.get_frame_samples()
lc3_frame_size = encoder.get_frame_bytes(bitrate)
print(
f'Encoding with {lc3_frame_samples} '
f'PCM samples per {lc3_frame_size} byte frame'
)
print('Setup BIG')
big = await device.create_big(
advertising_set,
parameters=bumble.device.BigParameters(
num_bis=pcm_format.channels,
sdu_interval=AURACAST_DEFAULT_FRAME_DURATION,
max_sdu=lc3_frame_size,
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
),
),
)
iso_queues = [
bumble.device.IsoPacketStream(big.bis_links[0], 64),
bumble.device.IsoPacketStream(big.bis_links[1], 64),
]
def on_flow():
data_packet_queue = iso_queues[0].data_packet_queue
print(
f'\rPACKETS: pending={data_packet_queue.pending}, '
f'queued={data_packet_queue.queued}, '
f'completed={data_packet_queue.completed}',
end='',
)
iso_queues[0].data_packet_queue.on('flow', on_flow)
frame_count = 0
async for pcm_frame in audio_input.frames(lc3_frame_samples):
lc3_frame = encoder.encode(
pcm_frame, num_bytes=2 * lc3_frame_size, bit_depth=pcm_bit_depth
)
mid = len(lc3_frame) // 2
await iso_queues[0].write(lc3_frame[:mid])
await iso_queues[1].write(lc3_frame[mid:])
frame_count += 1
finally:
await audio_input.aclose()
def run_async(async_command: Coroutine) -> None:
try:
asyncio.run(async_command)
except bumble.core.ProtocolError as error:
except core.ProtocolError as error:
if error.error_namespace == 'att' and error.error_code in list(
bumble.profiles.bass.ApplicationError
bass.ApplicationError
):
message = bumble.profiles.bass.ApplicationError(error.error_code).name
message = bass.ApplicationError(error.error_code).name
else:
message = str(error)
@@ -622,9 +1012,7 @@ def run_async(async_command: Coroutine) -> None:
# -----------------------------------------------------------------------------
@click.group()
@click.pass_context
def auracast(
ctx,
):
def auracast(ctx):
ctx.ensure_object(dict)
@@ -669,7 +1057,7 @@ def scan(ctx, filter_duplicates, sync_timeout, transport):
@click.argument('address')
@click.pass_context
def assist(ctx, broadcast_name, source_id, command, transport, address):
"""Scan for broadcasts on behalf of a audio server"""
"""Scan for broadcasts on behalf of an audio server"""
run_async(run_assist(broadcast_name, source_id, command, transport, address))
@@ -682,6 +1070,166 @@ def pair(ctx, transport, address):
run_async(run_pair(transport, address))
@auracast.command('receive')
@click.argument('transport')
@click.argument(
'broadcast_id',
type=int,
required=False,
)
@click.option(
'--output',
default='device',
help=(
"Audio output. "
"'device' -> use the host's default sound output device, "
"'device:<DEVICE_ID>' -> use one of the host's sound output device "
"(specify 'device:?' to get a list of available sound output devices), "
"'stdout' -> send audio to stdout, "
"'file:<filename> -> write audio to a raw float32 PCM file, "
"'ffplay' -> pipe the audio to ffplay"
),
)
@click.option(
'--broadcast-code',
metavar='BROADCAST_CODE',
type=str,
help='Broadcast encryption code in hex format',
)
@click.option(
'--sync-timeout',
metavar='SYNC_TIMEOUT',
type=float,
default=AURACAST_DEFAULT_SYNC_TIMEOUT,
help='Sync timeout (in seconds)',
)
@click.option(
'--subgroup',
metavar='SUBGROUP',
type=int,
default=0,
help='Index of Subgroup',
)
@click.pass_context
def receive(
ctx,
transport,
broadcast_id,
output,
broadcast_code,
sync_timeout,
subgroup,
):
"""Receive a broadcast source"""
run_async(
run_receive(
transport,
broadcast_id,
output,
broadcast_code,
sync_timeout,
subgroup,
)
)
@auracast.command('transmit')
@click.argument('transport')
@click.option(
'--input',
required=True,
help=(
"Audio input. "
"'device' -> use the host's default sound input device, "
"'device:<DEVICE_ID>' -> use one of the host's sound input devices "
"(specify 'device:?' to get a list of available sound input devices), "
"'stdin' -> receive audio from stdin as int16 PCM, "
"'file:<filename> -> read audio from a .wav or raw int16 PCM file. "
"(The file: prefix may be omitted if the file path does not start with "
"the substring 'device:' or 'file:' and is not 'stdin')"
),
)
@click.option(
'--input-format',
metavar="FORMAT",
default='auto',
help=(
"Audio input format. "
"Use 'auto' for .wav files, or for the default setting with the devices. "
"For other inputs, the format is specified as "
"<sample-type>,<sample-rate>,<channels> (supported <sample-type>: 'int16le' "
"for 16-bit signed integers with little-endian byte order or 'float32le' for "
"32-bit floating point with little-endian byte order)"
),
)
@click.option(
'--broadcast-id',
metavar='BROADCAST_ID',
type=int,
default=123456,
help='Broadcast ID',
)
@click.option(
'--broadcast-code',
metavar='BROADCAST_CODE',
help='Broadcast encryption code in hex format',
)
@click.option(
'--broadcast-name',
metavar='BROADCAST_NAME',
default='Bumble Auracast',
help='Broadcast name',
)
@click.option(
'--bitrate',
type=int,
default=AURACAST_DEFAULT_TRANSMIT_BITRATE,
help='Bitrate, per channel, in bps',
)
@click.option(
'--manufacturer-data',
metavar='VENDOR-ID:DATA-HEX',
help='Manufacturer data (specify as <vendor-id>:<data-hex>)',
)
@click.pass_context
def transmit(
ctx,
transport,
broadcast_id,
broadcast_code,
manufacturer_data,
broadcast_name,
bitrate,
input,
input_format,
):
"""Transmit a broadcast source"""
if manufacturer_data:
vendor_id_str, data_hex = manufacturer_data.split(':')
vendor_id = int(vendor_id_str)
data = bytes.fromhex(data_hex)
manufacturer_data_tuple = (vendor_id, data)
else:
manufacturer_data_tuple = None
if (input == 'device' or input.startswith('device:')) and input_format == 'auto':
# Use a default format for device inputs
input_format = 'int16le,48000,1'
run_async(
run_transmit(
transport=transport,
broadcast_id=broadcast_id,
broadcast_code=broadcast_code,
broadcast_name=broadcast_name,
bitrate=bitrate,
manufacturer_data=manufacturer_data_tuple,
input=input,
input_format=input_format,
)
)
def main():
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
auracast()

View File

@@ -199,7 +199,7 @@ def log_stats(title, stats, precision=2):
stats_min = min(stats)
stats_max = max(stats)
stats_avg = statistics.mean(stats)
stats_stdev = statistics.stdev(stats)
stats_stdev = statistics.stdev(stats) if len(stats) >= 2 else 0
logging.info(
color(
(
@@ -468,6 +468,7 @@ class Ping:
for run in range(self.repeat + 1):
self.done.clear()
self.ping_times = []
if run > 0 and self.repeat and self.repeat_delay:
logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green'))

View File

@@ -37,6 +37,8 @@ from bumble.hci import (
HCI_Command_Status_Event,
HCI_READ_BUFFER_SIZE_COMMAND,
HCI_Read_Buffer_Size_Command,
HCI_LE_READ_BUFFER_SIZE_V2_COMMAND,
HCI_LE_Read_Buffer_Size_V2_Command,
HCI_READ_BD_ADDR_COMMAND,
HCI_Read_BD_ADDR_Command,
HCI_READ_LOCAL_NAME_COMMAND,
@@ -75,7 +77,7 @@ async def get_classic_info(host: Host) -> None:
if command_succeeded(response):
print()
print(
color('Classic Address:', 'yellow'),
color('Public Address:', 'yellow'),
response.return_parameters.bd_addr.to_string(False),
)
@@ -147,7 +149,7 @@ async def get_le_info(host: Host) -> None:
# -----------------------------------------------------------------------------
async def get_acl_flow_control_info(host: Host) -> None:
async def get_flow_control_info(host: Host) -> None:
print()
if host.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
@@ -160,14 +162,28 @@ async def get_acl_flow_control_info(host: Host) -> None:
f'packets of size {response.return_parameters.hc_acl_data_packet_length}',
)
if host.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
if host.supports_command(HCI_LE_READ_BUFFER_SIZE_V2_COMMAND):
response = await host.send_command(
HCI_LE_Read_Buffer_Size_V2_Command(), check_result=True
)
print(
color('LE ACL Flow Control:', 'yellow'),
f'{response.return_parameters.total_num_le_acl_data_packets} '
f'packets of size {response.return_parameters.le_acl_data_packet_length}',
)
print(
color('LE ISO Flow Control:', 'yellow'),
f'{response.return_parameters.total_num_iso_data_packets} '
f'packets of size {response.return_parameters.iso_data_packet_length}',
)
elif host.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await host.send_command(
HCI_LE_Read_Buffer_Size_Command(), check_result=True
)
print(
color('LE ACL Flow Control:', 'yellow'),
f'{response.return_parameters.hc_total_num_le_acl_data_packets} '
f'packets of size {response.return_parameters.hc_le_acl_data_packet_length}',
f'{response.return_parameters.total_num_le_acl_data_packets} '
f'packets of size {response.return_parameters.le_acl_data_packet_length}',
)
@@ -274,8 +290,8 @@ async def async_main(latency_probes, transport):
# Get the LE info
await get_le_info(host)
# Print the ACL flow control info
await get_acl_flow_control_info(host)
# Print the flow control info
await get_flow_control_info(host)
# Get codec info
await get_codecs_info(host)

View File

@@ -29,7 +29,9 @@ from bumble.gatt import Service
from bumble.profiles.device_information_service import DeviceInformationServiceProxy
from bumble.profiles.battery_service import BatteryServiceProxy
from bumble.profiles.gap import GenericAccessServiceProxy
from bumble.profiles.pacs import PublishedAudioCapabilitiesServiceProxy
from bumble.profiles.tmap import TelephonyAndMediaAudioServiceProxy
from bumble.profiles.vcs import VolumeControlServiceProxy
from bumble.transport import open_transport_or_link
@@ -126,14 +128,52 @@ async def show_tmas(
print(color('### Telephony And Media Audio Service', 'yellow'))
if tmas.role:
print(
color(' Role:', 'green'),
await tmas.role.read_value(),
)
role = await tmas.role.read_value()
print(color(' Role:', 'green'), role)
print()
# -----------------------------------------------------------------------------
async def show_pacs(pacs: PublishedAudioCapabilitiesServiceProxy) -> None:
print(color('### Published Audio Capabilities Service', 'yellow'))
contexts = await pacs.available_audio_contexts.read_value()
print(color(' Available Audio Contexts:', 'green'), contexts)
contexts = await pacs.supported_audio_contexts.read_value()
print(color(' Supported Audio Contexts:', 'green'), contexts)
if pacs.sink_pac:
pac = await pacs.sink_pac.read_value()
print(color(' Sink PAC: ', 'green'), pac)
if pacs.sink_audio_locations:
audio_locations = await pacs.sink_audio_locations.read_value()
print(color(' Sink Audio Locations: ', 'green'), audio_locations)
if pacs.source_pac:
pac = await pacs.source_pac.read_value()
print(color(' Source PAC: ', 'green'), pac)
if pacs.source_audio_locations:
audio_locations = await pacs.source_audio_locations.read_value()
print(color(' Source Audio Locations: ', 'green'), audio_locations)
print()
# -----------------------------------------------------------------------------
async def show_vcs(vcs: VolumeControlServiceProxy) -> None:
print(color('### Volume Control Service', 'yellow'))
volume_state = await vcs.volume_state.read_value()
print(color(' Volume State:', 'green'), volume_state)
volume_flags = await vcs.volume_flags.read_value()
print(color(' Volume Flags:', 'green'), volume_flags)
# -----------------------------------------------------------------------------
async def show_device_info(peer, done: Optional[asyncio.Future]) -> None:
try:
@@ -161,6 +201,12 @@ async def show_device_info(peer, done: Optional[asyncio.Future]) -> None:
if tmas := peer.create_service_proxy(TelephonyAndMediaAudioServiceProxy):
await try_show(show_tmas, tmas)
if pacs := peer.create_service_proxy(PublishedAudioCapabilitiesServiceProxy):
await try_show(show_pacs, pacs)
if vcs := peer.create_service_proxy(VolumeControlServiceProxy):
await try_show(show_vcs, vcs)
if done is not None:
done.set_result(None)
except asyncio.CancelledError:

View File

@@ -83,7 +83,7 @@ async def async_main():
return_parameters=bytes([hci.HCI_SUCCESS]),
)
# Return a packet with 'respond to sender' set to True
return (response.to_bytes(), True)
return (bytes(response), True)
return None

View File

@@ -16,23 +16,22 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import datetime
import enum
import functools
from importlib import resources
import json
import os
import logging
import pathlib
from typing import Optional, List, cast
import weakref
import struct
import wave
import ctypes
import wasmtime
import wasmtime.loader
import liblc3 # type: ignore
try:
import lc3 # type: ignore # pylint: disable=E0401
except ImportError as e:
raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e
import click
import aiohttp.web
@@ -40,11 +39,12 @@ import aiohttp.web
import bumble
from bumble.core import AdvertisingData
from bumble.colors import color
from bumble.device import Device, DeviceConfiguration, AdvertisingParameters
from bumble.device import Device, DeviceConfiguration, AdvertisingParameters, CisLink
from bumble.transport import open_transport
from bumble.profiles import ascs, bap, pacs
from bumble.hci import Address, CodecID, CodingFormat, HCI_IsoDataPacket
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
@@ -54,6 +54,7 @@ logger = logging.getLogger(__name__)
# Constants
# -----------------------------------------------------------------------------
DEFAULT_UI_PORT = 7654
DEFAULT_PCM_BYTES_PER_SAMPLE = 2
def _sink_pac_record() -> pacs.PacRecord:
@@ -100,153 +101,8 @@ def _source_pac_record() -> pacs.PacRecord:
)
# -----------------------------------------------------------------------------
# WASM - liblc3
# -----------------------------------------------------------------------------
store = wasmtime.loader.store
_memory = cast(wasmtime.Memory, liblc3.memory)
STACK_POINTER = _memory.data_len(store)
_memory.grow(store, 1)
# Mapping wasmtime memory to linear address
memory = (ctypes.c_ubyte * _memory.data_len(store)).from_address(
ctypes.addressof(_memory.data_ptr(store).contents) # type: ignore
)
class Liblc3PcmFormat(enum.IntEnum):
S16 = 0
S24 = 1
S24_3LE = 2
FLOAT = 3
MAX_DECODER_SIZE = liblc3.lc3_decoder_size(10000, 48000)
MAX_ENCODER_SIZE = liblc3.lc3_encoder_size(10000, 48000)
DECODER_STACK_POINTER = STACK_POINTER
ENCODER_STACK_POINTER = DECODER_STACK_POINTER + MAX_DECODER_SIZE * 2
DECODE_BUFFER_STACK_POINTER = ENCODER_STACK_POINTER + MAX_ENCODER_SIZE * 2
ENCODE_BUFFER_STACK_POINTER = DECODE_BUFFER_STACK_POINTER + 8192
DEFAULT_PCM_SAMPLE_RATE = 48000
DEFAULT_PCM_FORMAT = Liblc3PcmFormat.S16
DEFAULT_PCM_BYTES_PER_SAMPLE = 2
encoders: List[int] = []
decoders: List[int] = []
def setup_encoders(
sample_rate_hz: int, frame_duration_us: int, num_channels: int
) -> None:
logger.info(
f"setup_encoders {sample_rate_hz}Hz {frame_duration_us}us {num_channels}channels"
)
encoders[:num_channels] = [
liblc3.lc3_setup_encoder(
frame_duration_us,
sample_rate_hz,
DEFAULT_PCM_SAMPLE_RATE, # Input sample rate
ENCODER_STACK_POINTER + MAX_ENCODER_SIZE * i,
)
for i in range(num_channels)
]
def setup_decoders(
sample_rate_hz: int, frame_duration_us: int, num_channels: int
) -> None:
logger.info(
f"setup_decoders {sample_rate_hz}Hz {frame_duration_us}us {num_channels}channels"
)
decoders[:num_channels] = [
liblc3.lc3_setup_decoder(
frame_duration_us,
sample_rate_hz,
DEFAULT_PCM_SAMPLE_RATE, # Output sample rate
DECODER_STACK_POINTER + MAX_DECODER_SIZE * i,
)
for i in range(num_channels)
]
def decode(
frame_duration_us: int,
num_channels: int,
input_bytes: bytes,
) -> bytes:
if not input_bytes:
return b''
input_buffer_offset = DECODE_BUFFER_STACK_POINTER
input_buffer_size = len(input_bytes)
input_bytes_per_frame = input_buffer_size // num_channels
# Copy into wasm
memory[input_buffer_offset : input_buffer_offset + input_buffer_size] = input_bytes # type: ignore
output_buffer_offset = input_buffer_offset + input_buffer_size
output_buffer_size = (
liblc3.lc3_frame_samples(frame_duration_us, DEFAULT_PCM_SAMPLE_RATE)
* DEFAULT_PCM_BYTES_PER_SAMPLE
* num_channels
)
for i in range(num_channels):
res = liblc3.lc3_decode(
decoders[i],
input_buffer_offset + input_bytes_per_frame * i,
input_bytes_per_frame,
DEFAULT_PCM_FORMAT,
output_buffer_offset + i * DEFAULT_PCM_BYTES_PER_SAMPLE,
num_channels, # Stride
)
if res != 0:
logging.error(f"Parsing failed, res={res}")
# Extract decoded data from the output buffer
return bytes(
memory[output_buffer_offset : output_buffer_offset + output_buffer_size]
)
def encode(
sdu_length: int,
num_channels: int,
stride: int,
input_bytes: bytes,
) -> bytes:
if not input_bytes:
return b''
input_buffer_offset = ENCODE_BUFFER_STACK_POINTER
input_buffer_size = len(input_bytes)
# Copy into wasm
memory[input_buffer_offset : input_buffer_offset + input_buffer_size] = input_bytes # type: ignore
output_buffer_offset = input_buffer_offset + input_buffer_size
output_buffer_size = sdu_length
output_frame_size = output_buffer_size // num_channels
for i in range(num_channels):
res = liblc3.lc3_encode(
encoders[i],
DEFAULT_PCM_FORMAT,
input_buffer_offset + DEFAULT_PCM_BYTES_PER_SAMPLE * i,
stride,
output_frame_size,
output_buffer_offset + output_frame_size * i,
)
if res != 0:
logging.error(f"Parsing failed, res={res}")
# Extract decoded data from the output buffer
return bytes(
memory[output_buffer_offset : output_buffer_offset + output_buffer_size]
)
decoder: lc3.Decoder | None = None
encoding_config: bap.CodecSpecificConfiguration | None = None
async def lc3_source_task(
@@ -254,44 +110,49 @@ async def lc3_source_task(
sdu_length: int,
frame_duration_us: int,
device: Device,
cis_handle: int,
cis_link: CisLink,
) -> None:
with open(filename, 'rb') as f:
header = f.read(44)
assert header[8:12] == b'WAVE'
logger.info(
"lc3_source_task filename=%s, sdu_length=%d, frame_duration=%.1f",
filename,
sdu_length,
frame_duration_us / 1000,
)
with wave.open(filename, 'rb') as wav:
bits_per_sample = wav.getsampwidth() * 8
pcm_num_channel, pcm_sample_rate, _byte_rate, _block_align, bits_per_sample = (
struct.unpack("<HIIHH", header[22:36])
)
assert pcm_sample_rate == DEFAULT_PCM_SAMPLE_RATE
assert bits_per_sample == DEFAULT_PCM_BYTES_PER_SAMPLE * 8
frame_bytes = (
liblc3.lc3_frame_samples(frame_duration_us, DEFAULT_PCM_SAMPLE_RATE)
* DEFAULT_PCM_BYTES_PER_SAMPLE
)
packet_sequence_number = 0
encoder: lc3.Encoder | None = None
while True:
next_round = datetime.datetime.now() + datetime.timedelta(
microseconds=frame_duration_us
)
pcm_data = f.read(frame_bytes)
sdu = encode(sdu_length, pcm_num_channel, pcm_num_channel, pcm_data)
if not encoder:
if (
encoding_config
and (frame_duration := encoding_config.frame_duration)
and (sampling_frequency := encoding_config.sampling_frequency)
and (
audio_channel_allocation := encoding_config.audio_channel_allocation
)
):
logger.info("Use %s", encoding_config)
encoder = lc3.Encoder(
frame_duration_us=frame_duration.us,
sample_rate_hz=sampling_frequency.hz,
num_channels=audio_channel_allocation.channel_count,
input_sample_rate_hz=wav.getframerate(),
)
else:
sdu = encoder.encode(
pcm=wav.readframes(encoder.get_frame_samples()),
num_bytes=sdu_length,
bit_depth=bits_per_sample,
)
cis_link.write(sdu)
iso_packet = HCI_IsoDataPacket(
connection_handle=cis_handle,
data_total_length=sdu_length + 4,
packet_sequence_number=packet_sequence_number,
pb_flag=0b10,
packet_status_flag=0,
iso_sdu_length=sdu_length,
iso_sdu_fragment=sdu,
)
device.host.send_hci_packet(iso_packet)
packet_sequence_number += 1
sleep_time = next_round - datetime.datetime.now()
await asyncio.sleep(sleep_time.total_seconds())
await asyncio.sleep(sleep_time.total_seconds() * 0.9)
# -----------------------------------------------------------------------------
@@ -410,7 +271,7 @@ class Speaker:
def __init__(
self,
device_config_path: Optional[str],
device_config_path: str | None,
ui_port: int,
transport: str,
lc3_input_file_path: str,
@@ -437,6 +298,7 @@ class Speaker:
advertising_interval_min=25,
advertising_interval_max=25,
address=Address('F1:F2:F3:F4:F5:F6'),
identity_address_type=Address.RANDOM_DEVICE_ADDRESS,
)
device_config.le_enabled = True
@@ -486,20 +348,31 @@ class Speaker:
def on_pdu(pdu: HCI_IsoDataPacket, ase: ascs.AseStateMachine):
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
pcm = decode(
codec_config.frame_duration.us,
codec_config.audio_channel_allocation.channel_count,
pdu.iso_sdu_fragment,
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or codec_config.frame_duration is None
or codec_config.audio_channel_allocation is None
or decoder is None
or not pdu.iso_sdu_fragment
):
return
pcm = decoder.decode(
pdu.iso_sdu_fragment, bit_depth=DEFAULT_PCM_BYTES_PER_SAMPLE * 8
)
self.device.abort_on('disconnection', self.ui_server.send_audio(pcm))
def on_ase_state_change(ase: ascs.AseStateMachine) -> None:
codec_config = ase.codec_specific_configuration
if ase.state == ascs.AseStateMachine.State.STREAMING:
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
assert ase.cis_link
if ase.role == ascs.AudioRole.SOURCE:
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or ase.cis_link is None
or codec_config.octets_per_codec_frame is None
or codec_config.frame_duration is None
or codec_config.codec_frames_per_sdu is None
):
return
ase.cis_link.abort_on(
'disconnection',
lc3_source_task(
@@ -510,25 +383,30 @@ class Speaker:
),
frame_duration_us=codec_config.frame_duration.us,
device=self.device,
cis_handle=ase.cis_link.handle,
cis_link=ase.cis_link,
),
)
else:
if not ase.cis_link:
return
ase.cis_link.sink = functools.partial(on_pdu, ase=ase)
elif ase.state == ascs.AseStateMachine.State.CODEC_CONFIGURED:
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or codec_config.sampling_frequency is None
or codec_config.frame_duration is None
or codec_config.audio_channel_allocation is None
):
return
if ase.role == ascs.AudioRole.SOURCE:
setup_encoders(
codec_config.sampling_frequency.hz,
codec_config.frame_duration.us,
codec_config.audio_channel_allocation.channel_count,
)
global encoding_config
encoding_config = codec_config
else:
setup_decoders(
codec_config.sampling_frequency.hz,
codec_config.frame_duration.us,
codec_config.audio_channel_allocation.channel_count,
global decoder
decoder = lc3.Decoder(
frame_duration_us=codec_config.frame_duration.us,
sample_rate_hz=codec_config.sampling_frequency.hz,
num_channels=codec_config.audio_channel_allocation.channel_count,
)
for ase in ascs_service.ase_state_machines.values():
@@ -567,7 +445,7 @@ def speaker(ui_port: int, device_config: str, transport: str, lc3_file: str) ->
# -----------------------------------------------------------------------------
def main():
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
speaker()

Binary file not shown.

View File

@@ -373,7 +373,9 @@ async def pair(
shared_data = (
None
if oob == '-'
else OobData.from_ad(AdvertisingData.from_bytes(bytes.fromhex(oob)))
else OobData.from_ad(
AdvertisingData.from_bytes(bytes.fromhex(oob))
).shared_data
)
legacy_context = OobLegacyContext()
oob_contexts = PairingConfig.OobConfig(
@@ -381,16 +383,19 @@ async def pair(
peer_data=shared_data,
legacy_context=legacy_context,
)
oob_data = OobData(
address=device.random_address,
shared_data=shared_data,
legacy_context=legacy_context,
)
print(color('@@@-----------------------------------', 'yellow'))
print(color('@@@ OOB Data:', 'yellow'))
print(color(f'@@@ {our_oob_context.share()}', 'yellow'))
if shared_data is None:
oob_data = OobData(
address=device.random_address, shared_data=our_oob_context.share()
)
print(
color(
f'@@@ SHARE: {bytes(oob_data.to_ad()).hex()}',
'yellow',
)
)
print(color(f'@@@ TK={legacy_context.tk.hex()}', 'yellow'))
print(color(f'@@@ HEX: ({bytes(oob_data.to_ad()).hex()})', 'yellow'))
print(color('@@@-----------------------------------', 'yellow'))
else:
oob_contexts = None

View File

@@ -144,18 +144,18 @@ class Printer:
help='Format of the input file',
)
@click.option(
'--vendors',
'--vendor',
type=click.Choice(['android', 'zephyr']),
multiple=True,
help='Support vendor-specific commands (list one or more)',
)
@click.argument('filename')
# pylint: disable=redefined-builtin
def main(format, vendors, filename):
for vendor in vendors:
if vendor == 'android':
def main(format, vendor, filename):
for vendor_name in vendor:
if vendor_name == 'android':
import bumble.vendor.android.hci
elif vendor == 'zephyr':
elif vendor_name == 'zephyr':
import bumble.vendor.zephyr.hci
input = open(filename, 'rb')
@@ -180,7 +180,7 @@ def main(format, vendors, filename):
else:
printer.print(color("[TRUNCATED]", "red"))
except Exception as error:
logger.exception()
logger.exception('')
print(color(f'!!! {error}', 'red'))

View File

@@ -57,6 +57,7 @@ if TYPE_CHECKING:
# pylint: disable=line-too-long
ATT_CID = 0x04
ATT_PSM = 0x001F
ATT_ERROR_RESPONSE = 0x01
ATT_EXCHANGE_MTU_REQUEST = 0x02
@@ -291,9 +292,6 @@ class ATT_PDU:
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
@property
def is_command(self):
return ((self.op_code >> 6) & 1) == 1
@@ -303,7 +301,7 @@ class ATT_PDU:
return ((self.op_code >> 7) & 1) == 1
def __bytes__(self):
return self.to_bytes()
return self.pdu
def __str__(self):
result = color(self.name, 'yellow')

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,6 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from setuptools import setup
setup()
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------

553
bumble/audio/io.py Normal file
View File

@@ -0,0 +1,553 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import abc
from concurrent.futures import ThreadPoolExecutor
import dataclasses
import enum
import logging
import pathlib
from typing import (
AsyncGenerator,
BinaryIO,
TYPE_CHECKING,
)
import sys
import wave
from bumble.colors import color
if TYPE_CHECKING:
import sounddevice # type: ignore[import-untyped]
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class PcmFormat:
class Endianness(enum.Enum):
LITTLE = 0
BIG = 1
class SampleType(enum.Enum):
FLOAT32 = 0
INT16 = 1
endianness: Endianness
sample_type: SampleType
sample_rate: int
channels: int
@classmethod
def from_str(cls, format_str: str) -> PcmFormat:
endianness = cls.Endianness.LITTLE # Others not yet supported.
sample_type_str, sample_rate_str, channels_str = format_str.split(',')
if sample_type_str == 'int16le':
sample_type = cls.SampleType.INT16
elif sample_type_str == 'float32le':
sample_type = cls.SampleType.FLOAT32
else:
raise ValueError(f'sample type {sample_type_str} not supported')
sample_rate = int(sample_rate_str)
channels = int(channels_str)
return cls(endianness, sample_type, sample_rate, channels)
@property
def bytes_per_sample(self) -> int:
return 2 if self.sample_type == self.SampleType.INT16 else 4
def check_audio_output(output: str) -> bool:
if output == 'device' or output.startswith('device:'):
try:
import sounddevice
except ImportError as exc:
raise ValueError(
'audio output not available (sounddevice python module not installed)'
) from exc
except OSError as exc:
raise ValueError(
'audio output not available '
'(sounddevice python module failed to load: '
f'{exc})'
) from exc
if output == 'device':
# Default device
return True
# Specific device
device = output[7:]
if device == '?':
print(color('Audio Devices:', 'yellow'))
for device_info in [
device_info
for device_info in sounddevice.query_devices()
if device_info['max_output_channels'] > 0
]:
device_index = device_info['index']
is_default = (
color(' [default]', 'green')
if sounddevice.default.device[1] == device_index
else ''
)
print(
f'{color(device_index, "cyan")}: {device_info["name"]}{is_default}'
)
return False
try:
device_info = sounddevice.query_devices(int(device))
except sounddevice.PortAudioError as exc:
raise ValueError('No such audio device') from exc
if device_info['max_output_channels'] < 1:
raise ValueError(
f'Device {device} ({device_info["name"]}) does not have an output'
)
return True
async def create_audio_output(output: str) -> AudioOutput:
if output == 'stdout':
return StreamAudioOutput(sys.stdout.buffer)
if output == 'device' or output.startswith('device:'):
device_name = '' if output == 'device' else output[7:]
return SoundDeviceAudioOutput(device_name)
if output == 'ffplay':
return SubprocessAudioOutput(
command=(
'ffplay -probesize 32 -fflags nobuffer -analyzeduration 0 '
'-ar {sample_rate} '
'-ch_layout {channel_layout} '
'-f f32le pipe:0'
)
)
if output.startswith('file:'):
return FileAudioOutput(output[5:])
raise ValueError('unsupported audio output')
class AudioOutput(abc.ABC):
"""Audio output to which PCM samples can be written."""
async def open(self, pcm_format: PcmFormat) -> None:
"""Start the output."""
@abc.abstractmethod
def write(self, pcm_samples: bytes) -> None:
"""Write PCM samples. Must not block."""
async def aclose(self) -> None:
"""Close the output."""
class ThreadedAudioOutput(AudioOutput):
"""Base class for AudioOutput classes that may need to call blocking functions.
The actual writing is performed in a thread, so as to ensure that calling write()
does not block the caller.
"""
def __init__(self) -> None:
self._thread_pool = ThreadPoolExecutor(1)
self._pcm_samples: asyncio.Queue[bytes] = asyncio.Queue()
self._write_task = asyncio.create_task(self._write_loop())
async def _write_loop(self) -> None:
while True:
pcm_samples = await self._pcm_samples.get()
await asyncio.get_running_loop().run_in_executor(
self._thread_pool, self._write, pcm_samples
)
@abc.abstractmethod
def _write(self, pcm_samples: bytes) -> None:
"""This method does the actual writing and can block."""
def write(self, pcm_samples: bytes) -> None:
self._pcm_samples.put_nowait(pcm_samples)
def _close(self) -> None:
"""This method does the actual closing and can block."""
async def aclose(self) -> None:
await asyncio.get_running_loop().run_in_executor(self._thread_pool, self._close)
self._write_task.cancel()
self._thread_pool.shutdown()
class SoundDeviceAudioOutput(ThreadedAudioOutput):
def __init__(self, device_name: str) -> None:
super().__init__()
self._device = int(device_name) if device_name else None
self._stream: sounddevice.RawOutputStream | None = None
async def open(self, pcm_format: PcmFormat) -> None:
import sounddevice # pylint: disable=import-error
self._stream = sounddevice.RawOutputStream(
samplerate=pcm_format.sample_rate,
device=self._device,
channels=pcm_format.channels,
dtype='float32',
)
self._stream.start()
def _write(self, pcm_samples: bytes) -> None:
if self._stream is None:
return
try:
self._stream.write(pcm_samples)
except Exception as error:
print(f'Sound device error: {error}')
raise
def _close(self):
self._stream.stop()
self._stream = None
class StreamAudioOutput(ThreadedAudioOutput):
"""AudioOutput where PCM samples are written to a stream that may block."""
def __init__(self, stream: BinaryIO) -> None:
super().__init__()
self._stream = stream
def _write(self, pcm_samples: bytes) -> None:
self._stream.write(pcm_samples)
self._stream.flush()
class FileAudioOutput(StreamAudioOutput):
"""AudioOutput where PCM samples are written to a file."""
def __init__(self, filename: str) -> None:
self._file = open(filename, "wb")
super().__init__(self._file)
async def shutdown(self):
self._file.close()
return await super().shutdown()
class SubprocessAudioOutput(AudioOutput):
"""AudioOutput where audio samples are written to a subprocess via stdin."""
def __init__(self, command: str) -> None:
self._command = command
self._subprocess: asyncio.subprocess.Process | None
async def open(self, pcm_format: PcmFormat) -> None:
if pcm_format.channels == 1:
channel_layout = 'mono'
elif pcm_format.channels == 2:
channel_layout = 'stereo'
else:
raise ValueError(f'{pcm_format.channels} channels not supported')
command = self._command.format(
sample_rate=pcm_format.sample_rate, channel_layout=channel_layout
)
self._subprocess = await asyncio.create_subprocess_shell(
command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
def write(self, pcm_samples: bytes) -> None:
if self._subprocess is None or self._subprocess.stdin is None:
return
self._subprocess.stdin.write(pcm_samples)
async def aclose(self):
if self._subprocess:
self._subprocess.terminate()
def check_audio_input(input: str) -> bool:
if input == 'device' or input.startswith('device:'):
try:
import sounddevice # pylint: disable=import-error
except ImportError as exc:
raise ValueError(
'audio input not available (sounddevice python module not installed)'
) from exc
except OSError as exc:
raise ValueError(
'audio input not available '
'(sounddevice python module failed to load: '
f'{exc})'
) from exc
if input == 'device':
# Default device
return True
# Specific device
device = input[7:]
if device == '?':
print(color('Audio Devices:', 'yellow'))
for device_info in [
device_info
for device_info in sounddevice.query_devices()
if device_info['max_input_channels'] > 0
]:
device_index = device_info["index"]
is_mono = device_info['max_input_channels'] == 1
max_channels = color(f'[{"mono" if is_mono else "stereo"}]', 'cyan')
is_default = (
color(' [default]', 'green')
if sounddevice.default.device[0] == device_index
else ''
)
print(
f'{color(device_index, "cyan")}: {device_info["name"]}'
f' {max_channels}{is_default}'
)
return False
try:
device_info = sounddevice.query_devices(int(device))
except sounddevice.PortAudioError as exc:
raise ValueError('No such audio device') from exc
if device_info['max_input_channels'] < 1:
raise ValueError(
f'Device {device} ({device_info["name"]}) does not have an input'
)
return True
async def create_audio_input(input: str, input_format: str) -> AudioInput:
pcm_format: PcmFormat | None
if input_format == 'auto':
pcm_format = None
else:
pcm_format = PcmFormat.from_str(input_format)
if input == 'stdin':
if not pcm_format:
raise ValueError('input format details required for stdin')
return StreamAudioInput(sys.stdin.buffer, pcm_format)
if input == 'device' or input.startswith('device:'):
if not pcm_format:
raise ValueError('input format details required for device')
device_name = '' if input == 'device' else input[7:]
return SoundDeviceAudioInput(device_name, pcm_format)
# If there's no file: prefix, check if we can assume it is a file.
if pathlib.Path(input).is_file():
input = 'file:' + input
if input.startswith('file:'):
filename = input[5:]
if filename.endswith('.wav'):
if input_format != 'auto':
raise ValueError(".wav file only supported with 'auto' format")
return WaveAudioInput(filename)
if pcm_format is None:
raise ValueError('input format details required for raw PCM files')
return FileAudioInput(filename, pcm_format)
raise ValueError('input not supported')
class AudioInput(abc.ABC):
"""Audio input that produces PCM samples."""
@abc.abstractmethod
async def open(self) -> PcmFormat:
"""Open the input."""
@abc.abstractmethod
def frames(self, frame_size: int) -> AsyncGenerator[bytes]:
"""Generate one frame of PCM samples. Must not block."""
async def aclose(self) -> None:
"""Close the input."""
class ThreadedAudioInput(AudioInput):
"""Base class for AudioInput implementation where reading samples may block."""
def __init__(self) -> None:
self._thread_pool = ThreadPoolExecutor(1)
self._pcm_samples: asyncio.Queue[bytes] = asyncio.Queue()
@abc.abstractmethod
def _read(self, frame_size: int) -> bytes:
pass
@abc.abstractmethod
def _open(self) -> PcmFormat:
pass
def _close(self) -> None:
pass
async def open(self) -> PcmFormat:
return await asyncio.get_running_loop().run_in_executor(
self._thread_pool, self._open
)
async def frames(self, frame_size: int) -> AsyncGenerator[bytes]:
while pcm_sample := await asyncio.get_running_loop().run_in_executor(
self._thread_pool, self._read, frame_size
):
yield pcm_sample
async def aclose(self) -> None:
await asyncio.get_running_loop().run_in_executor(self._thread_pool, self._close)
self._thread_pool.shutdown()
class WaveAudioInput(ThreadedAudioInput):
"""Audio input that reads PCM samples from a .wav file."""
def __init__(self, filename: str) -> None:
super().__init__()
self._filename = filename
self._wav: wave.Wave_read | None = None
self._bytes_read = 0
def _open(self) -> PcmFormat:
self._wav = wave.open(self._filename, 'rb')
if self._wav.getsampwidth() != 2:
raise ValueError('sample width not supported')
return PcmFormat(
PcmFormat.Endianness.LITTLE,
PcmFormat.SampleType.INT16,
self._wav.getframerate(),
self._wav.getnchannels(),
)
def _read(self, frame_size: int) -> bytes:
if not self._wav:
return b''
pcm_samples = self._wav.readframes(frame_size)
if not pcm_samples and self._bytes_read:
# Loop around.
self._wav.rewind()
self._bytes_read = 0
pcm_samples = self._wav.readframes(frame_size)
self._bytes_read += len(pcm_samples)
return pcm_samples
def _close(self) -> None:
if self._wav:
self._wav.close()
class StreamAudioInput(ThreadedAudioInput):
"""AudioInput where samples are read from a raw PCM stream that may block."""
def __init__(self, stream: BinaryIO, pcm_format: PcmFormat) -> None:
super().__init__()
self._stream = stream
self._pcm_format = pcm_format
def _open(self) -> PcmFormat:
return self._pcm_format
def _read(self, frame_size: int) -> bytes:
return self._stream.read(
frame_size * self._pcm_format.channels * self._pcm_format.bytes_per_sample
)
class FileAudioInput(StreamAudioInput):
"""AudioInput where PCM samples are read from a raw PCM file."""
def __init__(self, filename: str, pcm_format: PcmFormat) -> None:
self._stream = open(filename, "rb")
super().__init__(self._stream, pcm_format)
def _close(self) -> None:
self._stream.close()
class SoundDeviceAudioInput(ThreadedAudioInput):
def __init__(self, device_name: str, pcm_format: PcmFormat) -> None:
super().__init__()
self._device = int(device_name) if device_name else None
self._pcm_format = pcm_format
self._stream: sounddevice.RawInputStream | None = None
def _open(self) -> PcmFormat:
import sounddevice # pylint: disable=import-error
self._stream = sounddevice.RawInputStream(
samplerate=self._pcm_format.sample_rate,
device=self._device,
channels=self._pcm_format.channels,
dtype='int16',
)
self._stream.start()
return PcmFormat(
PcmFormat.Endianness.LITTLE,
PcmFormat.SampleType.INT16,
self._pcm_format.sample_rate,
2,
)
def _read(self, frame_size: int) -> bytes:
if not self._stream:
return b''
pcm_buffer, overflowed = self._stream.read(frame_size)
if overflowed:
logger.warning("input overflow")
# Convert the buffer to stereo if needed
if self._pcm_format.channels == 1:
stereo_buffer = bytearray()
for i in range(frame_size):
sample = pcm_buffer[i * 2 : i * 2 + 2]
stereo_buffer += sample + sample
return stereo_buffer
return bytes(pcm_buffer)
def _close(self):
self._stream.stop()
self._stream = None

View File

@@ -154,15 +154,17 @@ class Controller:
'0000000060000000'
) # BR/EDR Not Supported, LE Supported (Controller)
self.manufacturer_name = 0xFFFF
self.hc_data_packet_length = 27
self.hc_total_num_data_packets = 64
self.hc_le_data_packet_length = 27
self.hc_total_num_le_data_packets = 64
self.acl_data_packet_length = 27
self.total_num_acl_data_packets = 64
self.le_acl_data_packet_length = 27
self.total_num_le_acl_data_packets = 64
self.iso_data_packet_length = 960
self.total_num_iso_data_packets = 64
self.event_mask = 0
self.event_mask_page_2 = 0
self.supported_commands = bytes.fromhex(
'2000800000c000000000e4000000a822000000000000040000f7ffff7f000000'
'30f0f9ff01008004000000000000000000000000000000000000000000000000'
'30f0f9ff01008004002000000000000000000000000000000000000000000000'
)
self.le_event_mask = 0
self.advertising_parameters = None
@@ -314,7 +316,7 @@ class Controller:
f'{color("CONTROLLER -> HOST", "green")}: {packet}'
)
if self.host:
self.host.on_packet(packet.to_bytes())
self.host.on_packet(bytes(packet))
# This method allows the controller to emulate the same API as a transport source
async def wait_for_termination(self):
@@ -1181,9 +1183,9 @@ class Controller:
return struct.pack(
'<BHBHH',
HCI_SUCCESS,
self.hc_data_packet_length,
self.acl_data_packet_length,
0,
self.hc_total_num_data_packets,
self.total_num_acl_data_packets,
0,
)
@@ -1192,7 +1194,7 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command
'''
bd_addr = (
self._public_address.to_bytes()
bytes(self._public_address)
if self._public_address is not None
else bytes(6)
)
@@ -1212,8 +1214,21 @@ class Controller:
return struct.pack(
'<BHB',
HCI_SUCCESS,
self.hc_le_data_packet_length,
self.hc_total_num_le_data_packets,
self.le_acl_data_packet_length,
self.total_num_le_acl_data_packets,
)
def on_hci_le_read_buffer_size_v2_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.2 LE Read Buffer Size Command
'''
return struct.pack(
'<BHBHB',
HCI_SUCCESS,
self.le_acl_data_packet_length,
self.total_num_le_acl_data_packets,
self.iso_data_packet_length,
self.total_num_iso_data_packets,
)
def on_hci_le_read_local_supported_features_command(self, _command):
@@ -1543,6 +1558,41 @@ class Controller:
}
return bytes([HCI_SUCCESS])
def on_hci_le_set_advertising_set_random_address_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.52 LE Set Advertising Set Random Address
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_advertising_parameters_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.53 LE Set Extended Advertising Parameters
Command
'''
return bytes([HCI_SUCCESS, 0])
def on_hci_le_set_extended_advertising_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.54 LE Set Extended Advertising Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_scan_response_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.55 LE Set Extended Scan Response Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_advertising_enable_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.56 LE Set Extended Advertising Enable
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_maximum_advertising_data_length_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data
@@ -1557,6 +1607,27 @@ class Controller:
'''
return struct.pack('<BB', HCI_SUCCESS, 0xF0)
def on_hci_le_set_periodic_advertising_parameters_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.61 LE Set Periodic Advertising Parameters
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_periodic_advertising_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.62 LE Set Periodic Advertising Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_periodic_advertising_enable_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.63 LE Set Periodic Advertising Enable
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_transmit_power_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command

View File

@@ -1501,7 +1501,10 @@ class AdvertisingData:
ad_data_str = f'"{ad_data.decode("utf-8")}"'
elif ad_type == AdvertisingData.COMPLETE_LOCAL_NAME:
ad_type_str = 'Complete Local Name'
ad_data_str = f'"{ad_data.decode("utf-8")}"'
try:
ad_data_str = f'"{ad_data.decode("utf-8")}"'
except UnicodeDecodeError:
ad_data_str = ad_data.hex()
elif ad_type == AdvertisingData.TX_POWER_LEVEL:
ad_type_str = 'TX Power Level'
ad_data_str = str(ad_data[0])

View File

@@ -17,7 +17,8 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
from collections.abc import Iterable
import collections
from collections.abc import Iterable, Sequence
from contextlib import (
asynccontextmanager,
AsyncExitStack,
@@ -36,10 +37,9 @@ from typing import (
Any,
Callable,
ClassVar,
Deque,
Dict,
List,
Optional,
Tuple,
Type,
TypeVar,
Union,
@@ -54,7 +54,7 @@ from pyee import EventEmitter
from .colors import color
from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU
from .gatt import Characteristic, Descriptor, Service
from .host import Host
from .host import DataPacketQueue, Host
from .profiles.gap import GenericAccessService
from .core import (
BT_BR_EDR_TRANSPORT,
@@ -119,6 +119,8 @@ DEVICE_MIN_LE_RSSI = -127
DEVICE_MAX_LE_RSSI = 20
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE = 0x00
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE = 0xEF
DEVICE_MIN_BIG_HANDLE = 0x00
DEVICE_MAX_BIG_HANDLE = 0xEF
DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00'
DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms
@@ -557,8 +559,15 @@ class AdvertisingParameters:
# -----------------------------------------------------------------------------
@dataclass
class PeriodicAdvertisingParameters:
# TODO implement this class
pass
periodic_advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
periodic_advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
periodic_advertising_properties: (
hci.HCI_LE_Set_Periodic_Advertising_Parameters_Command.Properties
) = field(
default_factory=lambda: hci.HCI_LE_Set_Periodic_Advertising_Parameters_Command.Properties(
0
)
)
# -----------------------------------------------------------------------------
@@ -575,6 +584,7 @@ class AdvertisingSet(EventEmitter):
periodic_advertising_data: bytes
selected_tx_power: int = 0
enabled: bool = False
periodic_enabled: bool = False
def __post_init__(self) -> None:
super().__init__()
@@ -603,7 +613,7 @@ class AdvertisingSet(EventEmitter):
int(advertising_parameters.primary_advertising_interval_min / 0.625)
),
primary_advertising_interval_max=(
int(advertising_parameters.primary_advertising_interval_min / 0.625)
int(advertising_parameters.primary_advertising_interval_max / 0.625)
),
primary_advertising_channel_map=int(
advertising_parameters.primary_advertising_channel_map
@@ -671,10 +681,26 @@ class AdvertisingSet(EventEmitter):
async def set_periodic_advertising_parameters(
self, advertising_parameters: PeriodicAdvertisingParameters
) -> None:
await self.device.send_command(
hci.HCI_LE_Set_Periodic_Advertising_Parameters_Command(
advertising_handle=self.advertising_handle,
periodic_advertising_interval_min=advertising_parameters.periodic_advertising_interval_min,
periodic_advertising_interval_max=advertising_parameters.periodic_advertising_interval_max,
periodic_advertising_properties=advertising_parameters.periodic_advertising_properties,
),
check_result=True,
)
self.periodic_advertising_parameters = advertising_parameters
async def set_periodic_advertising_data(self, advertising_data: bytes) -> None:
# TODO: send command
await self.device.send_command(
hci.HCI_LE_Set_Periodic_Advertising_Data_Command(
advertising_handle=self.advertising_handle,
operation=hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
advertising_data=advertising_data,
),
check_result=True,
)
self.periodic_advertising_data = advertising_data
async def set_random_address(self, random_address: hci.Address) -> None:
@@ -712,17 +738,6 @@ class AdvertisingSet(EventEmitter):
self.emit('start')
async def start_periodic(self, include_adi: bool = False) -> None:
await self.device.send_command(
hci.HCI_LE_Set_Periodic_Advertising_Enable_Command(
enable=1 | (2 if include_adi else 0),
advertising_handles=self.advertising_handle,
),
check_result=True,
)
self.emit('start_periodic')
async def stop(self) -> None:
await self.device.send_command(
hci.HCI_LE_Set_Extended_Advertising_Enable_Command(
@@ -737,14 +752,31 @@ class AdvertisingSet(EventEmitter):
self.emit('stop')
async def stop_periodic(self) -> None:
async def start_periodic(self, include_adi: bool = False) -> None:
if self.periodic_enabled:
return
await self.device.send_command(
hci.HCI_LE_Set_Periodic_Advertising_Enable_Command(
enable=0,
advertising_handles=self.advertising_handle,
enable=1 | (2 if include_adi else 0),
advertising_handle=self.advertising_handle,
),
check_result=True,
)
self.periodic_enabled = True
self.emit('start_periodic')
async def stop_periodic(self) -> None:
if not self.periodic_enabled:
return
await self.device.send_command(
hci.HCI_LE_Set_Periodic_Advertising_Enable_Command(
enable=0,
advertising_handle=self.advertising_handle,
),
check_result=True,
)
self.periodic_enabled = False
self.emit('stop_periodic')
@@ -962,6 +994,130 @@ class PeriodicAdvertisingSync(EventEmitter):
)
# -----------------------------------------------------------------------------
@dataclass
class BigParameters:
num_bis: int
sdu_interval: int
max_sdu: int
max_transport_latency: int
rtn: int
phy: hci.PhyBit = hci.PhyBit.LE_2M
packing: int = 0
framing: int = 0
broadcast_code: bytes | None = None
# -----------------------------------------------------------------------------
@dataclass
class Big(EventEmitter):
class State(IntEnum):
PENDING = 0
ACTIVE = 1
TERMINATED = 2
class Event(str, Enum):
ESTABLISHMENT = 'establishment'
ESTABLISHMENT_FAILURE = 'establishment_failure'
TERMINATION = 'termination'
big_handle: int
advertising_set: AdvertisingSet
parameters: BigParameters
state: State = State.PENDING
# Attributes provided by BIG Create Complete event
big_sync_delay: int = 0
transport_latency_big: int = 0
phy: int = 0
nse: int = 0
bn: int = 0
pto: int = 0
irc: int = 0
max_pdu: int = 0
iso_interval: int = 0
bis_links: Sequence[BisLink] = ()
def __post_init__(self) -> None:
super().__init__()
self.device = self.advertising_set.device
async def terminate(
self,
reason: int = hci.HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
) -> None:
if self.state != Big.State.ACTIVE:
logger.error('BIG %d is not active.', self.big_handle)
return
with closing(EventWatcher()) as watcher:
terminated = asyncio.Event()
watcher.once(self, Big.Event.TERMINATION, lambda _: terminated.set())
await self.device.send_command(
hci.HCI_LE_Terminate_BIG_Command(
big_handle=self.big_handle, reason=reason
),
check_result=True,
)
await terminated.wait()
# -----------------------------------------------------------------------------
@dataclass
class BigSyncParameters:
big_sync_timeout: int
bis: Sequence[int]
mse: int = 0
broadcast_code: bytes | None = None
# -----------------------------------------------------------------------------
@dataclass
class BigSync(EventEmitter):
class State(IntEnum):
PENDING = 0
ACTIVE = 1
TERMINATED = 2
class Event(str, Enum):
ESTABLISHMENT = 'establishment'
ESTABLISHMENT_FAILURE = 'establishment_failure'
TERMINATION = 'termination'
big_handle: int
pa_sync: PeriodicAdvertisingSync
parameters: BigSyncParameters
state: State = State.PENDING
# Attributes provided by BIG Create Sync Complete event
transport_latency_big: int = 0
nse: int = 0
bn: int = 0
pto: int = 0
irc: int = 0
max_pdu: int = 0
iso_interval: int = 0
bis_links: Sequence[BisLink] = ()
def __post_init__(self) -> None:
super().__init__()
self.device = self.pa_sync.device
async def terminate(self) -> None:
if self.state != BigSync.State.ACTIVE:
logger.error('BIG Sync %d is not active.', self.big_handle)
return
with closing(EventWatcher()) as watcher:
terminated = asyncio.Event()
watcher.once(self, BigSync.Event.TERMINATION, lambda _: terminated.set())
await self.device.send_command(
hci.HCI_LE_BIG_Terminate_Sync_Command(big_handle=self.big_handle),
check_result=True,
)
await terminated.wait()
# -----------------------------------------------------------------------------
class LePhyOptions:
# Coded PHY preference
@@ -989,7 +1145,7 @@ class Peer:
connection.gatt_client = self.gatt_client
@property
def services(self) -> List[gatt_client.ServiceProxy]:
def services(self) -> list[gatt_client.ServiceProxy]:
return self.gatt_client.services
async def request_mtu(self, mtu: int) -> int:
@@ -999,24 +1155,24 @@ class Peer:
async def discover_service(
self, uuid: Union[core.UUID, str]
) -> List[gatt_client.ServiceProxy]:
) -> list[gatt_client.ServiceProxy]:
return await self.gatt_client.discover_service(uuid)
async def discover_services(
self, uuids: Iterable[core.UUID] = ()
) -> List[gatt_client.ServiceProxy]:
) -> list[gatt_client.ServiceProxy]:
return await self.gatt_client.discover_services(uuids)
async def discover_included_services(
self, service: gatt_client.ServiceProxy
) -> List[gatt_client.ServiceProxy]:
) -> list[gatt_client.ServiceProxy]:
return await self.gatt_client.discover_included_services(service)
async def discover_characteristics(
self,
uuids: Iterable[Union[core.UUID, str]] = (),
service: Optional[gatt_client.ServiceProxy] = None,
) -> List[gatt_client.CharacteristicProxy]:
) -> list[gatt_client.CharacteristicProxy]:
return await self.gatt_client.discover_characteristics(
uuids=uuids, service=service
)
@@ -1031,7 +1187,7 @@ class Peer:
characteristic, start_handle, end_handle
)
async def discover_attributes(self) -> List[gatt_client.AttributeProxy]:
async def discover_attributes(self) -> list[gatt_client.AttributeProxy]:
return await self.gatt_client.discover_attributes()
async def discover_all(self):
@@ -1075,17 +1231,17 @@ class Peer:
async def read_characteristics_by_uuid(
self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None
) -> List[bytes]:
) -> list[bytes]:
return await self.gatt_client.read_characteristics_by_uuid(uuid, service)
def get_services_by_uuid(self, uuid: core.UUID) -> List[gatt_client.ServiceProxy]:
def get_services_by_uuid(self, uuid: core.UUID) -> list[gatt_client.ServiceProxy]:
return self.gatt_client.get_services_by_uuid(uuid)
def get_characteristics_by_uuid(
self,
uuid: core.UUID,
service: Optional[Union[gatt_client.ServiceProxy, core.UUID]] = None,
) -> List[gatt_client.CharacteristicProxy]:
) -> list[gatt_client.CharacteristicProxy]:
if isinstance(service, core.UUID):
return list(
itertools.chain(
@@ -1171,9 +1327,82 @@ class ScoLink(CompositeEventEmitter):
await self.device.disconnect(self, reason)
# -----------------------------------------------------------------------------
class _IsoLink:
handle: int
device: Device
sink: Callable[[hci.HCI_IsoDataPacket], Any] | None = None
class Direction(IntEnum):
HOST_TO_CONTROLLER = (
hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.HOST_TO_CONTROLLER
)
CONTROLLER_TO_HOST = (
hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.CONTROLLER_TO_HOST
)
async def setup_data_path(
self,
direction: _IsoLink.Direction,
data_path_id: int = 0,
codec_id: hci.CodingFormat | None = None,
controller_delay: int = 0,
codec_configuration: bytes = b'',
) -> None:
"""Create a data path between controller and given entry.
Args:
direction: Direction of data path.
data_path_id: ID of data path. Default is 0 (HCI).
codec_id: Codec ID. Default is Transparent.
controller_delay: Controller delay in microseconds. Default is 0.
codec_configuration: Codec-specific configuration.
Raises:
HCI_Error: When command complete status is not HCI_SUCCESS.
"""
await self.device.send_command(
hci.HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=self.handle,
data_path_direction=direction,
data_path_id=data_path_id,
codec_id=codec_id or hci.CodingFormat(hci.CodecID.TRANSPARENT),
controller_delay=controller_delay,
codec_configuration=codec_configuration,
),
check_result=True,
)
async def remove_data_path(self, direction: _IsoLink.Direction) -> int:
"""Remove a data path with controller on given direction.
Args:
direction: Direction of data path.
Returns:
Command status.
"""
response = await self.device.send_command(
hci.HCI_LE_Remove_ISO_Data_Path_Command(
connection_handle=self.handle,
data_path_direction=direction,
),
check_result=False,
)
return response.return_parameters.status
def write(self, sdu: bytes) -> None:
"""Write an ISO SDU."""
self.device.host.send_iso_sdu(connection_handle=self.handle, sdu=sdu)
@property
def data_packet_queue(self) -> DataPacketQueue | None:
return self.device.host.get_data_packet_queue(self.handle)
# -----------------------------------------------------------------------------
@dataclass
class CisLink(CompositeEventEmitter):
class CisLink(CompositeEventEmitter, _IsoLink):
class State(IntEnum):
PENDING = 0
ESTABLISHED = 1
@@ -1184,7 +1413,7 @@ class CisLink(CompositeEventEmitter):
cis_id: int # CIS ID assigned by Central device
cig_id: int # CIG ID assigned by Central device
state: State = State.PENDING
sink: Optional[Callable[[hci.HCI_IsoDataPacket], Any]] = None
sink: Callable[[hci.HCI_IsoDataPacket], Any] | None = None
def __post_init__(self) -> None:
super().__init__()
@@ -1195,6 +1424,60 @@ class CisLink(CompositeEventEmitter):
await self.device.disconnect(self, reason)
# -----------------------------------------------------------------------------
@dataclass
class BisLink(_IsoLink):
handle: int
big: Big | BigSync
sink: Callable[[hci.HCI_IsoDataPacket], Any] | None = None
def __post_init__(self) -> None:
self.device = self.big.device
# -----------------------------------------------------------------------------
class IsoPacketStream:
"""Async stream that can write SDUs to a CIS or BIS, with a maximum queue size."""
iso_link: _IsoLink
data_packet_queue: DataPacketQueue
def __init__(self, iso_link: _IsoLink, max_queue_size: int) -> None:
if iso_link.data_packet_queue is None:
raise ValueError('link has no data packet queue')
self.iso_link = iso_link
self.data_packet_queue = iso_link.data_packet_queue
self.data_packet_queue.on('flow', self._on_flow)
self._thresholds: Deque[int] = collections.deque()
self._semaphore = asyncio.Semaphore(max_queue_size)
def _on_flow(self) -> None:
# Release the semaphore once for each completed packet.
while (
self._thresholds and self.data_packet_queue.completed >= self._thresholds[0]
):
self._thresholds.popleft()
self._semaphore.release()
async def write(self, sdu: bytes) -> None:
"""
Write an SDU to the queue.
This method blocks until there are fewer than max_queue_size packets queued
but not yet completed.
"""
# Wait until there's space in the queue.
await self._semaphore.acquire()
# Queue the packet.
self.iso_link.write(sdu)
# Remember the position of the packet so we can know when it is completed.
self._thresholds.append(self.data_packet_queue.queued)
# -----------------------------------------------------------------------------
class Connection(CompositeEventEmitter):
device: Device
@@ -1440,6 +1723,10 @@ class Connection(CompositeEventEmitter):
self.peer_le_features = await self.device.get_remote_le_features(self)
return self.peer_le_features
@property
def data_packet_queue(self) -> DataPacketQueue | None:
return self.device.host.get_data_packet_queue(self.handle)
async def __aenter__(self):
return self
@@ -1507,7 +1794,7 @@ class DeviceConfiguration:
io_capability: int = pairing.PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT
def __post_init__(self) -> None:
self.gatt_services: List[Dict[str, Any]] = []
self.gatt_services: list[Dict[str, Any]] = []
def load_from_dict(self, config: Dict[str, Any]) -> None:
config = copy.deepcopy(config)
@@ -1542,6 +1829,10 @@ class DeviceConfiguration:
)
)
# Load scan response data
if scan_response_data := config.pop('scan_response_data', None):
self.scan_response_data = bytes.fromhex(scan_response_data)
# Load advertising interval (for backward compatibility)
if advertising_interval := config.pop('advertising_interval', None):
self.advertising_interval_min = advertising_interval
@@ -1650,7 +1941,7 @@ def host_event_handler(function):
# List of host event handlers for the Device class.
# (we define this list outside the class, because referencing a class in method
# decorators is not straightforward)
device_host_event_handlers: List[str] = []
device_host_event_handlers: list[str] = []
# -----------------------------------------------------------------------------
@@ -1671,15 +1962,18 @@ class Device(CompositeEventEmitter):
pending_connections: Dict[hci.Address, Connection]
classic_pending_accepts: Dict[
hci.Address,
List[asyncio.Future[Union[Connection, Tuple[hci.Address, int, int]]]],
list[asyncio.Future[Union[Connection, tuple[hci.Address, int, int]]]],
]
advertisement_accumulators: Dict[hci.Address, AdvertisementDataAccumulator]
periodic_advertising_syncs: List[PeriodicAdvertisingSync]
periodic_advertising_syncs: list[PeriodicAdvertisingSync]
config: DeviceConfiguration
legacy_advertiser: Optional[LegacyAdvertiser]
sco_links: Dict[int, ScoLink]
cis_links: Dict[int, CisLink]
_pending_cis: Dict[int, Tuple[int, int]]
bigs = dict[int, Big]()
bis_links = dict[int, BisLink]()
big_syncs = dict[int, BigSync]()
_pending_cis: Dict[int, tuple[int, int]]
@composite_listener
class Listener:
@@ -1952,7 +2246,7 @@ class Device(CompositeEventEmitter):
check_address_type: bool = False,
) -> Optional[Connection]:
for connection in self.connections.values():
if connection.peer_address.to_bytes() == bd_addr.to_bytes():
if bytes(connection.peer_address) == bytes(bd_addr):
if (
check_address_type
and connection.peer_address.address_type != bd_addr.address_type
@@ -1975,6 +2269,17 @@ class Device(CompositeEventEmitter):
None,
)
def next_big_handle(self) -> int | None:
return next(
(
handle
for handle in range(DEVICE_MIN_BIG_HANDLE, DEVICE_MAX_BIG_HANDLE + 1)
if handle
not in itertools.chain(self.bigs.keys(), self.big_syncs.keys())
),
None,
)
@deprecated("Please use create_l2cap_server()")
def register_l2cap_server(self, psm, server) -> int:
return self.l2cap_channel_manager.register_server(psm, server)
@@ -2460,14 +2765,27 @@ class Device(CompositeEventEmitter):
if advertising_parameters is None:
advertising_parameters = AdvertisingParameters()
if periodic_advertising_data and periodic_advertising_parameters is None:
periodic_advertising_parameters = PeriodicAdvertisingParameters()
if (
not advertising_parameters.advertising_event_properties.is_legacy
and advertising_data
and scan_response_data
):
raise InvalidArgumentError(
"Extended advertisements can't have both data and scan \
response data"
"Extended advertisements can't have both data and scan response data"
)
if periodic_advertising_parameters and (
advertising_parameters.advertising_event_properties.is_connectable
or advertising_parameters.advertising_event_properties.is_scannable
or advertising_parameters.advertising_event_properties.is_anonymous
or advertising_parameters.advertising_event_properties.is_legacy
):
raise InvalidArgumentError(
"Periodic advertising set cannot be connectable, scannable, anonymous,"
"or legacy"
)
# Allocate a new handle
@@ -2522,12 +2840,14 @@ class Device(CompositeEventEmitter):
await advertising_set.set_scan_response_data(scan_response_data)
if periodic_advertising_parameters:
# TODO: call LE Set Periodic Advertising Parameters command
raise NotImplementedError('periodic advertising not yet supported')
await advertising_set.set_periodic_advertising_parameters(
periodic_advertising_parameters
)
if periodic_advertising_data:
# TODO: call LE Set Periodic Advertising Data command
raise NotImplementedError('periodic advertising not yet supported')
await advertising_set.set_periodic_advertising_data(
periodic_advertising_data
)
except hci.HCI_Error as error:
# Remove the advertising set so that it doesn't stay dangling in the
@@ -2578,7 +2898,7 @@ class Device(CompositeEventEmitter):
scan_window: int = DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms
own_address_type: int = hci.OwnAddressType.RANDOM,
filter_duplicates: bool = False,
scanning_phys: List[int] = [hci.HCI_LE_1M_PHY, hci.HCI_LE_CODED_PHY],
scanning_phys: Sequence[int] = (hci.HCI_LE_1M_PHY, hci.HCI_LE_CODED_PHY),
) -> None:
# Check that the arguments are legal
if scan_interval < scan_window:
@@ -2625,7 +2945,7 @@ class Device(CompositeEventEmitter):
scanning_filter_policy=scanning_filter_policy,
scanning_phys=scanning_phys_bits,
scan_types=[scan_type] * scanning_phy_count,
scan_intervals=[int(scan_window / 0.625)] * scanning_phy_count,
scan_intervals=[int(scan_interval / 0.625)] * scanning_phy_count,
scan_windows=[int(scan_window / 0.625)] * scanning_phy_count,
),
check_result=True,
@@ -3909,13 +4229,13 @@ class Device(CompositeEventEmitter):
async def setup_cig(
self,
cig_id: int,
cis_id: List[int],
sdu_interval: Tuple[int, int],
cis_id: Sequence[int],
sdu_interval: tuple[int, int],
framing: int,
max_sdu: Tuple[int, int],
max_sdu: tuple[int, int],
retransmission_number: int,
max_transport_latency: Tuple[int, int],
) -> List[int]:
max_transport_latency: tuple[int, int],
) -> list[int]:
"""Sends hci.HCI_LE_Set_CIG_Parameters_Command.
Args:
@@ -3964,7 +4284,9 @@ class Device(CompositeEventEmitter):
# [LE only]
@experimental('Only for testing.')
async def create_cis(self, cis_acl_pairs: List[Tuple[int, int]]) -> List[CisLink]:
async def create_cis(
self, cis_acl_pairs: Sequence[tuple[int, int]]
) -> list[CisLink]:
for cis_handle, acl_handle in cis_acl_pairs:
acl_connection = self.lookup_connection(acl_handle)
assert acl_connection
@@ -4063,6 +4385,106 @@ class Device(CompositeEventEmitter):
check_result=True,
)
# [LE only]
@experimental('Only for testing.')
async def create_big(
self, advertising_set: AdvertisingSet, parameters: BigParameters
) -> Big:
if (big_handle := self.next_big_handle()) is None:
raise core.OutOfResourcesError("All valid BIG handles already in use")
with closing(EventWatcher()) as watcher:
big = Big(
big_handle=big_handle,
parameters=parameters,
advertising_set=advertising_set,
)
self.bigs[big_handle] = big
established = asyncio.get_running_loop().create_future()
watcher.once(
big, big.Event.ESTABLISHMENT, lambda: established.set_result(None)
)
watcher.once(
big,
big.Event.ESTABLISHMENT_FAILURE,
lambda status: established.set_exception(hci.HCI_Error(status)),
)
try:
await self.send_command(
hci.HCI_LE_Create_BIG_Command(
big_handle=big_handle,
advertising_handle=advertising_set.advertising_handle,
num_bis=parameters.num_bis,
sdu_interval=parameters.sdu_interval,
max_sdu=parameters.max_sdu,
max_transport_latency=parameters.max_transport_latency,
rtn=parameters.rtn,
phy=parameters.phy,
packing=parameters.packing,
framing=parameters.framing,
encryption=1 if parameters.broadcast_code else 0,
broadcast_code=parameters.broadcast_code or bytes(16),
),
check_result=True,
)
await established
except hci.HCI_Error:
del self.bigs[big_handle]
raise
return big
# [LE only]
@experimental('Only for testing.')
async def create_big_sync(
self, pa_sync: PeriodicAdvertisingSync, parameters: BigSyncParameters
) -> BigSync:
if (big_handle := self.next_big_handle()) is None:
raise core.OutOfResourcesError("All valid BIG handles already in use")
if (pa_sync_handle := pa_sync.sync_handle) is None:
raise core.InvalidStateError("PA Sync is not established")
with closing(EventWatcher()) as watcher:
big_sync = BigSync(
big_handle=big_handle,
parameters=parameters,
pa_sync=pa_sync,
)
self.big_syncs[big_handle] = big_sync
established = asyncio.get_running_loop().create_future()
watcher.once(
big_sync,
big_sync.Event.ESTABLISHMENT,
lambda: established.set_result(None),
)
watcher.once(
big_sync,
big_sync.Event.ESTABLISHMENT_FAILURE,
lambda status: established.set_exception(hci.HCI_Error(status)),
)
try:
await self.send_command(
hci.HCI_LE_BIG_Create_Sync_Command(
big_handle=big_handle,
sync_handle=pa_sync_handle,
encryption=1 if parameters.broadcast_code else 0,
broadcast_code=parameters.broadcast_code or bytes(16),
mse=parameters.mse,
big_sync_timeout=parameters.big_sync_timeout,
bis=parameters.bis,
),
check_result=True,
)
await established
except hci.HCI_Error:
del self.big_syncs[big_handle]
raise
return big_sync
async def get_remote_le_features(self, connection: Connection) -> hci.LeFeatureMask:
"""[LE Only] Reads remote LE supported features.
@@ -4184,6 +4606,112 @@ class Device(CompositeEventEmitter):
)
self.connecting_extended_advertising_sets[connection_handle] = advertising_set
@host_event_handler
def on_big_establishment(
self,
status: int,
big_handle: int,
bis_handles: list[int],
big_sync_delay: int,
transport_latency_big: int,
phy: int,
nse: int,
bn: int,
pto: int,
irc: int,
max_pdu: int,
iso_interval: int,
) -> None:
if not (big := self.bigs.get(big_handle)):
logger.warning('BIG %d not found', big_handle)
return
if status != hci.HCI_SUCCESS:
del self.bigs[big_handle]
logger.debug('Unable to create BIG %d', big_handle)
big.state = Big.State.TERMINATED
big.emit(Big.Event.ESTABLISHMENT_FAILURE, status)
return
big.bis_links = [BisLink(handle=handle, big=big) for handle in bis_handles]
big.big_sync_delay = big_sync_delay
big.transport_latency_big = transport_latency_big
big.phy = phy
big.nse = nse
big.bn = bn
big.pto = pto
big.irc = irc
big.max_pdu = max_pdu
big.iso_interval = iso_interval
big.state = Big.State.ACTIVE
for bis_link in big.bis_links:
self.bis_links[bis_link.handle] = bis_link
big.emit(Big.Event.ESTABLISHMENT)
@host_event_handler
def on_big_termination(self, reason: int, big_handle: int) -> None:
if not (big := self.bigs.pop(big_handle, None)):
logger.warning('BIG %d not found', big_handle)
return
big.state = Big.State.TERMINATED
for bis_link in big.bis_links:
self.bis_links.pop(bis_link.handle, None)
big.emit(Big.Event.TERMINATION, reason)
@host_event_handler
def on_big_sync_establishment(
self,
status: int,
big_handle: int,
transport_latency_big: int,
nse: int,
bn: int,
pto: int,
irc: int,
max_pdu: int,
iso_interval: int,
bis_handles: list[int],
) -> None:
if not (big_sync := self.big_syncs.get(big_handle)):
logger.warning('BIG Sync %d not found', big_handle)
return
if status != hci.HCI_SUCCESS:
del self.big_syncs[big_handle]
logger.debug('Unable to create BIG Sync %d', big_handle)
big_sync.state = BigSync.State.TERMINATED
big_sync.emit(BigSync.Event.ESTABLISHMENT_FAILURE, status)
return
big_sync.transport_latency_big = transport_latency_big
big_sync.nse = nse
big_sync.bn = bn
big_sync.pto = pto
big_sync.irc = irc
big_sync.max_pdu = max_pdu
big_sync.iso_interval = iso_interval
big_sync.bis_links = [
BisLink(handle=handle, big=big_sync) for handle in bis_handles
]
big_sync.state = BigSync.State.ACTIVE
for bis_link in big_sync.bis_links:
self.bis_links[bis_link.handle] = bis_link
big_sync.emit(BigSync.Event.ESTABLISHMENT)
@host_event_handler
def on_big_sync_lost(self, big_handle: int, reason: int) -> None:
if not (big_sync := self.big_syncs.pop(big_handle, None)):
logger.warning('BIG %d not found', big_handle)
return
for bis_link in big_sync.bis_links:
self.bis_links.pop(bis_link.handle, None)
big_sync.state = BigSync.State.TERMINATED
big_sync.emit(BigSync.Event.TERMINATION, reason)
def _complete_le_extended_advertising_connection(
self, connection: Connection, advertising_set: AdvertisingSet
) -> None:
@@ -4830,6 +5358,8 @@ class Device(CompositeEventEmitter):
def on_iso_packet(self, handle: int, packet: hci.HCI_IsoDataPacket) -> None:
if (cis_link := self.cis_links.get(handle)) and cis_link.sink:
cis_link.sink(packet)
elif (bis_link := self.bis_links.get(handle)) and bis_link.sink:
bis_link.sink(packet)
@host_event_handler
@with_connection_from_handle

View File

@@ -20,6 +20,8 @@ Common types for drivers.
# -----------------------------------------------------------------------------
import abc
from bumble import core
# -----------------------------------------------------------------------------
# Classes

View File

@@ -11,18 +11,33 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Support for Intel USB controllers.
Loosely based on the Fuchsia OS implementation.
"""
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import collections
import dataclasses
import logging
import os
import pathlib
import platform
import struct
from typing import Any, Deque, Optional, TYPE_CHECKING
from bumble import core
from bumble.drivers import common
from bumble.hci import (
hci_vendor_command_op_code, # type: ignore
HCI_Command,
HCI_Reset_Command,
)
from bumble import hci
from bumble import utils
if TYPE_CHECKING:
from bumble.host import Host
# -----------------------------------------------------------------------------
# Logging
@@ -34,39 +49,328 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
INTEL_USB_PRODUCTS = {
# Intel AX210
(0x8087, 0x0032),
# Intel BE200
(0x8087, 0x0036),
(0x8087, 0x0032), # AX210
(0x8087, 0x0036), # BE200
}
INTEL_FW_IMAGE_NAMES = [
"ibt-0040-0041",
"ibt-0040-1020",
"ibt-0040-1050",
"ibt-0040-2120",
"ibt-0040-4150",
"ibt-0041-0041",
"ibt-0180-0041",
"ibt-0180-1050",
"ibt-0180-4150",
"ibt-0291-0291",
"ibt-1040-0041",
"ibt-1040-1020",
"ibt-1040-1050",
"ibt-1040-2120",
"ibt-1040-4150",
]
INTEL_FIRMWARE_DIR_ENV = "BUMBLE_INTEL_FIRMWARE_DIR"
INTEL_LINUX_FIRMWARE_DIR = "/lib/firmware/intel"
_MAX_FRAGMENT_SIZE = 252
_POST_RESET_DELAY = 0.2
# -----------------------------------------------------------------------------
# HCI Commands
# -----------------------------------------------------------------------------
HCI_INTEL_DDC_CONFIG_WRITE_COMMAND = hci_vendor_command_op_code(0xFC8B) # type: ignore
HCI_INTEL_DDC_CONFIG_WRITE_PAYLOAD = [0x03, 0xE4, 0x02, 0x00]
HCI_INTEL_WRITE_DEVICE_CONFIG_COMMAND = hci.hci_vendor_command_op_code(0x008B)
HCI_INTEL_READ_VERSION_COMMAND = hci.hci_vendor_command_op_code(0x0005)
HCI_INTEL_RESET_COMMAND = hci.hci_vendor_command_op_code(0x0001)
HCI_INTEL_SECURE_SEND_COMMAND = hci.hci_vendor_command_op_code(0x0009)
HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND = hci.hci_vendor_command_op_code(0x000E)
HCI_Command.register_commands(globals())
hci.HCI_Command.register_commands(globals())
@HCI_Command.command( # type: ignore
fields=[("params", "*")],
@hci.HCI_Command.command(
fields=[
("param0", 1),
],
return_parameters_fields=[
("params", "*"),
("status", hci.STATUS_SPEC),
("tlv", "*"),
],
)
class Hci_Intel_DDC_Config_Write_Command(HCI_Command):
class HCI_Intel_Read_Version_Command(hci.HCI_Command):
pass
@hci.HCI_Command.command(
fields=[("data_type", 1), ("data", "*")],
return_parameters_fields=[
("status", 1),
],
)
class Hci_Intel_Secure_Send_Command(hci.HCI_Command):
pass
@hci.HCI_Command.command(
fields=[
("reset_type", 1),
("patch_enable", 1),
("ddc_reload", 1),
("boot_option", 1),
("boot_address", 4),
],
return_parameters_fields=[
("data", "*"),
],
)
class HCI_Intel_Reset_Command(hci.HCI_Command):
pass
@hci.HCI_Command.command(
fields=[("data", "*")],
return_parameters_fields=[
("status", hci.STATUS_SPEC),
("params", "*"),
],
)
class Hci_Intel_Write_Device_Config_Command(hci.HCI_Command):
pass
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def intel_firmware_dir() -> pathlib.Path:
"""
Returns:
A path to a subdir of the project data dir for Intel firmware.
The directory is created if it doesn't exist.
"""
from bumble.drivers import project_data_dir
p = project_data_dir() / "firmware" / "intel"
p.mkdir(parents=True, exist_ok=True)
return p
def _find_binary_path(file_name: str) -> pathlib.Path | None:
# First check if an environment variable is set
if INTEL_FIRMWARE_DIR_ENV in os.environ:
if (
path := pathlib.Path(os.environ[INTEL_FIRMWARE_DIR_ENV]) / file_name
).is_file():
logger.debug(f"{file_name} found in env dir")
return path
# When the environment variable is set, don't look elsewhere
return None
# Then, look where the firmware download tool writes by default
if (path := intel_firmware_dir() / file_name).is_file():
logger.debug(f"{file_name} found in project data dir")
return path
# Then, look in the package's driver directory
if (path := pathlib.Path(__file__).parent / "intel_fw" / file_name).is_file():
logger.debug(f"{file_name} found in package dir")
return path
# On Linux, check the system's FW directory
if (
platform.system() == "Linux"
and (path := pathlib.Path(INTEL_LINUX_FIRMWARE_DIR) / file_name).is_file()
):
logger.debug(f"{file_name} found in Linux system FW dir")
return path
# Finally look in the current directory
if (path := pathlib.Path.cwd() / file_name).is_file():
logger.debug(f"{file_name} found in CWD")
return path
return None
def _parse_tlv(data: bytes) -> list[tuple[ValueType, Any]]:
result: list[tuple[ValueType, Any]] = []
while len(data) >= 2:
value_type = ValueType(data[0])
value_length = data[1]
value = data[2 : 2 + value_length]
typed_value: Any
if value_type == ValueType.END:
break
if value_type in (ValueType.CNVI, ValueType.CNVR):
(v,) = struct.unpack("<I", value)
typed_value = (
(((v >> 0) & 0xF) << 12)
| (((v >> 4) & 0xF) << 0)
| (((v >> 8) & 0xF) << 4)
| (((v >> 24) & 0xF) << 8)
)
elif value_type == ValueType.HARDWARE_INFO:
(v,) = struct.unpack("<I", value)
typed_value = HardwareInfo(
HardwarePlatform((v >> 8) & 0xFF), HardwareVariant((v >> 16) & 0x3F)
)
elif value_type in (
ValueType.USB_VENDOR_ID,
ValueType.USB_PRODUCT_ID,
ValueType.DEVICE_REVISION,
):
(typed_value,) = struct.unpack("<H", value)
elif value_type == ValueType.CURRENT_MODE_OF_OPERATION:
typed_value = ModeOfOperation(value[0])
elif value_type in (
ValueType.BUILD_TYPE,
ValueType.BUILD_NUMBER,
ValueType.SECURE_BOOT,
ValueType.OTP_LOCK,
ValueType.API_LOCK,
ValueType.DEBUG_LOCK,
ValueType.SECURE_BOOT_ENGINE_TYPE,
):
typed_value = value[0]
elif value_type == ValueType.TIMESTAMP:
typed_value = Timestamp(value[0], value[1])
elif value_type == ValueType.FIRMWARE_BUILD:
typed_value = FirmwareBuild(value[0], Timestamp(value[1], value[2]))
elif value_type == ValueType.BLUETOOTH_ADDRESS:
typed_value = hci.Address(
value, address_type=hci.Address.PUBLIC_DEVICE_ADDRESS
)
else:
typed_value = value
result.append((value_type, typed_value))
data = data[2 + value_length :]
return result
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class DriverError(core.BaseBumbleError):
def __init__(self, message: str) -> None:
super().__init__(message)
self.message = message
def __str__(self) -> str:
return f"IntelDriverError({self.message})"
class ValueType(utils.OpenIntEnum):
END = 0x00
CNVI = 0x10
CNVR = 0x11
HARDWARE_INFO = 0x12
DEVICE_REVISION = 0x16
CURRENT_MODE_OF_OPERATION = 0x1C
USB_VENDOR_ID = 0x17
USB_PRODUCT_ID = 0x18
TIMESTAMP = 0x1D
BUILD_TYPE = 0x1E
BUILD_NUMBER = 0x1F
SECURE_BOOT = 0x28
OTP_LOCK = 0x2A
API_LOCK = 0x2B
DEBUG_LOCK = 0x2C
FIRMWARE_BUILD = 0x2D
SECURE_BOOT_ENGINE_TYPE = 0x2F
BLUETOOTH_ADDRESS = 0x30
class HardwarePlatform(utils.OpenIntEnum):
INTEL_37 = 0x37
class HardwareVariant(utils.OpenIntEnum):
# This is a just a partial list.
# Add other constants here as new hardware is encountered and tested.
TYPHOON_PEAK = 0x17
GALE_PEAK = 0x1C
@dataclasses.dataclass
class HardwareInfo:
platform: HardwarePlatform
variant: HardwareVariant
@dataclasses.dataclass
class Timestamp:
week: int
year: int
@dataclasses.dataclass
class FirmwareBuild:
build_number: int
timestamp: Timestamp
class ModeOfOperation(utils.OpenIntEnum):
BOOTLOADER = 0x01
INTERMEDIATE = 0x02
OPERATIONAL = 0x03
class SecureBootEngineType(utils.OpenIntEnum):
RSA = 0x00
ECDSA = 0x01
@dataclasses.dataclass
class BootParams:
css_header_offset: int
css_header_size: int
pki_offset: int
pki_size: int
sig_offset: int
sig_size: int
write_offset: int
_BOOT_PARAMS = {
SecureBootEngineType.RSA: BootParams(0, 128, 128, 256, 388, 256, 964),
SecureBootEngineType.ECDSA: BootParams(644, 128, 772, 96, 868, 96, 964),
}
class Driver(common.Driver):
def __init__(self, host):
def __init__(self, host: Host) -> None:
self.host = host
self.max_in_flight_firmware_load_commands = 1
self.pending_firmware_load_commands: Deque[hci.HCI_Command] = (
collections.deque()
)
self.can_send_firmware_load_command = asyncio.Event()
self.can_send_firmware_load_command.set()
self.firmware_load_complete = asyncio.Event()
self.reset_complete = asyncio.Event()
# Parse configuration options from the driver name.
self.ddc_addon: Optional[bytes] = None
self.ddc_override: Optional[bytes] = None
driver = host.hci_metadata.get("driver")
if driver is not None and driver.startswith("intel/"):
for key, value in [
key_eq_value.split(":") for key_eq_value in driver[6:].split("+")
]:
if key == "ddc_addon":
self.ddc_addon = bytes.fromhex(value)
elif key == "ddc_override":
self.ddc_override = bytes.fromhex(value)
@staticmethod
def check(host):
def check(host: Host) -> bool:
driver = host.hci_metadata.get("driver")
if driver == "intel":
if driver == "intel" or driver is not None and driver.startswith("intel/"):
return True
vendor_id = host.hci_metadata.get("vendor_id")
@@ -85,18 +389,283 @@ class Driver(common.Driver):
return True
@classmethod
async def for_host(cls, host, force=False): # type: ignore
async def for_host(cls, host: Host, force: bool = False):
# Only instantiate this driver if explicitly selected
if not force and not cls.check(host):
return None
return cls(host)
async def init_controller(self):
def on_packet(self, packet: bytes) -> None:
"""Handler for event packets that are received from an ACL channel"""
event = hci.HCI_Event.from_bytes(packet)
if not isinstance(event, hci.HCI_Command_Complete_Event):
self.host.on_hci_event_packet(event)
return
if not event.return_parameters == hci.HCI_SUCCESS:
raise DriverError("HCI_Command_Complete_Event error")
if self.max_in_flight_firmware_load_commands != event.num_hci_command_packets:
logger.debug(
"max_in_flight_firmware_load_commands update: "
f"{event.num_hci_command_packets}"
)
self.max_in_flight_firmware_load_commands = event.num_hci_command_packets
logger.debug(f"event: {event}")
self.pending_firmware_load_commands.popleft()
in_flight = len(self.pending_firmware_load_commands)
logger.debug(f"event received, {in_flight} still in flight")
if in_flight < self.max_in_flight_firmware_load_commands:
self.can_send_firmware_load_command.set()
async def send_firmware_load_command(self, command: hci.HCI_Command) -> None:
# Wait until we can send.
await self.can_send_firmware_load_command.wait()
# Send the command and adjust counters.
self.host.send_hci_packet(command)
self.pending_firmware_load_commands.append(command)
in_flight = len(self.pending_firmware_load_commands)
if in_flight >= self.max_in_flight_firmware_load_commands:
logger.debug(f"max commands in flight reached [{in_flight}]")
self.can_send_firmware_load_command.clear()
async def send_firmware_data(self, data_type: int, data: bytes) -> None:
while data:
fragment_size = min(len(data), _MAX_FRAGMENT_SIZE)
fragment = data[:fragment_size]
data = data[fragment_size:]
await self.send_firmware_load_command(
Hci_Intel_Secure_Send_Command(data_type=data_type, data=fragment)
)
async def load_firmware(self) -> None:
self.host.ready = True
await self.host.send_command(HCI_Reset_Command(), check_result=True)
await self.host.send_command(
Hci_Intel_DDC_Config_Write_Command(
params=HCI_INTEL_DDC_CONFIG_WRITE_PAYLOAD
device_info = await self.read_device_info()
logger.debug(
"device info: \n%s",
"\n".join(
[
f" {value_type.name}: {value}"
for value_type, value in device_info.items()
]
),
)
# Check if the firmware is already loaded.
if (
device_info.get(ValueType.CURRENT_MODE_OF_OPERATION)
== ModeOfOperation.OPERATIONAL
):
logger.debug("firmware already loaded")
return
# We only support some platforms and variants.
hardware_info = device_info.get(ValueType.HARDWARE_INFO)
if hardware_info is None:
raise DriverError("hardware info missing")
if hardware_info.platform != HardwarePlatform.INTEL_37:
raise DriverError("hardware platform not supported")
if hardware_info.variant not in (
HardwareVariant.TYPHOON_PEAK,
HardwareVariant.GALE_PEAK,
):
raise DriverError("hardware variant not supported")
# Compute the firmware name.
if ValueType.CNVI not in device_info or ValueType.CNVR not in device_info:
raise DriverError("insufficient device info, missing CNVI or CNVR")
firmware_base_name = (
"ibt-"
f"{device_info[ValueType.CNVI]:04X}-"
f"{device_info[ValueType.CNVR]:04X}"
)
logger.debug(f"FW base name: {firmware_base_name}")
firmware_name = f"{firmware_base_name}.sfi"
firmware_path = _find_binary_path(firmware_name)
if not firmware_path:
logger.warning(f"Firmware file {firmware_name} not found")
logger.warning("See https://google.github.io/bumble/drivers/intel.html")
return None
logger.debug(f"loading firmware from {firmware_path}")
firmware_image = firmware_path.read_bytes()
engine_type = device_info.get(ValueType.SECURE_BOOT_ENGINE_TYPE)
if engine_type is None:
raise DriverError("secure boot engine type missing")
if engine_type not in _BOOT_PARAMS:
raise DriverError("secure boot engine type not supported")
boot_params = _BOOT_PARAMS[engine_type]
if len(firmware_image) < boot_params.write_offset:
raise DriverError("firmware image too small")
# Register to receive vendor events.
def on_vendor_event(event: hci.HCI_Vendor_Event):
logger.debug(f"vendor event: {event}")
event_type = event.parameters[0]
if event_type == 0x02:
# Boot event
logger.debug("boot complete")
self.reset_complete.set()
elif event_type == 0x06:
# Firmware load event
logger.debug("download complete")
self.firmware_load_complete.set()
else:
logger.debug(f"ignoring vendor event type {event_type}")
self.host.on("vendor_event", on_vendor_event)
# We need to temporarily intercept packets from the controller,
# because they are formatted as HCI event packets but are received
# on the ACL channel, so the host parser would get confused.
saved_on_packet = self.host.on_packet
self.host.on_packet = self.on_packet # type: ignore
self.firmware_load_complete.clear()
# Send the CSS header
data = firmware_image[
boot_params.css_header_offset : boot_params.css_header_offset
+ boot_params.css_header_size
]
await self.send_firmware_data(0x00, data)
# Send the PKI header
data = firmware_image[
boot_params.pki_offset : boot_params.pki_offset + boot_params.pki_size
]
await self.send_firmware_data(0x03, data)
# Send the Signature header
data = firmware_image[
boot_params.sig_offset : boot_params.sig_offset + boot_params.sig_size
]
await self.send_firmware_data(0x02, data)
# Send the rest of the image.
# The payload consists of command objects, which are sent when they add up
# to a multiple of 4 bytes.
boot_address = 0
offset = boot_params.write_offset
fragment_size = 0
while offset + 3 < len(firmware_image):
(command_opcode,) = struct.unpack_from(
"<H", firmware_image, offset + fragment_size
)
command_size = firmware_image[offset + fragment_size + 2]
if command_opcode == HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND:
(boot_address,) = struct.unpack_from(
"<I", firmware_image, offset + fragment_size + 3
)
logger.debug(
"found HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND, "
f"boot_address={boot_address}"
)
fragment_size += 3 + command_size
if fragment_size % 4 == 0:
await self.send_firmware_data(
0x01, firmware_image[offset : offset + fragment_size]
)
logger.debug(f"sent {fragment_size} bytes")
offset += fragment_size
fragment_size = 0
# Wait for the firmware loading to be complete.
logger.debug("waiting for firmware to be loaded")
await self.firmware_load_complete.wait()
logger.debug("firmware loaded")
# Restore the original packet handler.
self.host.on_packet = saved_on_packet # type: ignore
# Reset
self.reset_complete.clear()
self.host.send_hci_packet(
HCI_Intel_Reset_Command(
reset_type=0x00,
patch_enable=0x01,
ddc_reload=0x00,
boot_option=0x01,
boot_address=boot_address,
)
)
logger.debug("waiting for reset completion")
await self.reset_complete.wait()
logger.debug("reset complete")
# Load the device config if there is one.
if self.ddc_override:
logger.debug("loading overridden DDC")
await self.load_device_config(self.ddc_override)
else:
ddc_name = f"{firmware_base_name}.ddc"
ddc_path = _find_binary_path(ddc_name)
if ddc_path:
logger.debug(f"loading DDC from {ddc_path}")
ddc_data = ddc_path.read_bytes()
await self.load_device_config(ddc_data)
if self.ddc_addon:
logger.debug("loading DDC addon")
await self.load_device_config(self.ddc_addon)
async def load_device_config(self, ddc_data: bytes) -> None:
while ddc_data:
ddc_len = 1 + ddc_data[0]
ddc_payload = ddc_data[:ddc_len]
await self.host.send_command(
Hci_Intel_Write_Device_Config_Command(data=ddc_payload)
)
ddc_data = ddc_data[ddc_len:]
async def reboot_bootloader(self) -> None:
self.host.send_hci_packet(
HCI_Intel_Reset_Command(
reset_type=0x01,
patch_enable=0x01,
ddc_reload=0x01,
boot_option=0x00,
boot_address=0,
)
)
await asyncio.sleep(_POST_RESET_DELAY)
async def read_device_info(self) -> dict[ValueType, Any]:
self.host.ready = True
response = await self.host.send_command(hci.HCI_Reset_Command())
if not (
isinstance(response, hci.HCI_Command_Complete_Event)
and response.return_parameters
in (hci.HCI_UNKNOWN_HCI_COMMAND_ERROR, hci.HCI_SUCCESS)
):
# When the controller is in operational mode, the response is a
# successful response.
# When the controller is in bootloader mode,
# HCI_UNKNOWN_HCI_COMMAND_ERROR is the expected response. Anything
# else is a failure.
logger.warning(f"unexpected response: {response}")
raise DriverError("unexpected HCI response")
# Read the firmware version.
response = await self.host.send_command(
HCI_Intel_Read_Version_Command(param0=0xFF)
)
if not isinstance(response, hci.HCI_Command_Complete_Event):
raise DriverError("unexpected HCI response")
if response.return_parameters.status != 0: # type: ignore
raise DriverError("HCI_Intel_Read_Version_Command error")
tlvs = _parse_tlv(response.return_parameters.tlv) # type: ignore
# Convert the list to a dict. That's Ok here because we only expect each type
# to appear just once.
return dict(tlvs)
async def init_controller(self):
await self.load_firmware()

View File

@@ -42,7 +42,7 @@ from typing import (
)
from bumble.colors import color
from bumble.core import BaseBumbleError, UUID
from bumble.core import BaseBumbleError, InvalidOperationError, UUID
from bumble.att import Attribute, AttributeValue
from bumble.utils import ByteSerializable
@@ -279,6 +279,13 @@ GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2BCC, 'Sou
GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCD, 'Available Audio Contexts')
GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCE, 'Supported Audio Contexts')
# Gaming Audio Service (GMAS)
GATT_GMAP_ROLE_CHARACTERISTIC = UUID.from_16_bits(0x2C00, 'GMAP Role')
GATT_UGG_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2C01, 'UGG Features')
GATT_UGT_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2C02, 'UGT Features')
GATT_BGS_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2C03, 'BGS Features')
GATT_BGR_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2C04, 'BGR Features')
# Hearing Access Service
GATT_HEARING_AID_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2BDA, 'Hearing Aid Features')
GATT_HEARING_AID_PRESET_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BDB, 'Hearing Aid Preset Control Point')
@@ -308,6 +315,7 @@ GATT_CENTRAL_ADDRESS_RESOLUTION__CHARACTERISTIC = UUID.from_16_bi
GATT_CLIENT_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2B29, 'Client Supported Features')
GATT_DATABASE_HASH_CHARACTERISTIC = UUID.from_16_bits(0x2B2A, 'Database Hash')
GATT_SERVER_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2B3A, 'Server Supported Features')
GATT_LE_GATT_SECURITY_LEVELS_CHARACTERISTIC = UUID.from_16_bits(0x2BF5, 'E GATT Security Levels')
# fmt: on
# pylint: enable=line-too-long
@@ -316,8 +324,6 @@ GATT_SERVER_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bi
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def show_services(services: Iterable[Service]) -> None:
for service in services:
print(color(str(service), 'cyan'))
@@ -414,7 +420,7 @@ class IncludedServiceDeclaration(Attribute):
def __init__(self, service: Service) -> None:
declaration_bytes = struct.pack(
'<HH2s', service.handle, service.end_group_handle, service.uuid.to_bytes()
'<HH2s', service.handle, service.end_group_handle, bytes(service.uuid)
)
super().__init__(
GATT_INCLUDE_ATTRIBUTE_TYPE, Attribute.READABLE, declaration_bytes
@@ -673,10 +679,14 @@ class DelegatedCharacteristicAdapter(CharacteristicAdapter):
self.decode = decode
def encode_value(self, value):
return self.encode(value) if self.encode else value
if self.encode is None:
raise InvalidOperationError('delegated adapter does not have an encoder')
return self.encode(value)
def decode_value(self, value):
return self.decode(value) if self.decode else value
if self.decode is None:
raise InvalidOperationError('delegate adapter does not have a decoder')
return self.decode(value)
# -----------------------------------------------------------------------------

View File

@@ -78,6 +78,7 @@ from .gatt import (
GATT_INCLUDE_ATTRIBUTE_TYPE,
Characteristic,
ClientCharacteristicConfigurationBits,
InvalidServiceError,
TemplateService,
)
@@ -162,12 +163,23 @@ class ServiceProxy(AttributeProxy):
self.uuid = uuid
self.characteristics = []
async def discover_characteristics(self, uuids=()):
async def discover_characteristics(self, uuids=()) -> list[CharacteristicProxy]:
return await self.client.discover_characteristics(uuids, self)
def get_characteristics_by_uuid(self, uuid):
def get_characteristics_by_uuid(self, uuid: UUID) -> list[CharacteristicProxy]:
"""Get all the characteristics with a specified UUID."""
return self.client.get_characteristics_by_uuid(uuid, self)
def get_required_characteristic_by_uuid(self, uuid: UUID) -> CharacteristicProxy:
"""
Get the first characteristic with a specified UUID.
If no characteristic with that UUID is found, an InvalidServiceError is raised.
"""
if not (characteristics := self.get_characteristics_by_uuid(uuid)):
raise InvalidServiceError(f'{uuid} characteristic not found')
return characteristics[0]
def __str__(self) -> str:
return f'Service(handle=0x{self.handle:04X}, uuid={self.uuid})'
@@ -292,7 +304,7 @@ class Client:
logger.debug(
f'GATT Command from client: [0x{self.connection.handle:04X}] {command}'
)
self.send_gatt_pdu(command.to_bytes())
self.send_gatt_pdu(bytes(command))
async def send_request(self, request: ATT_PDU):
logger.debug(
@@ -310,7 +322,7 @@ class Client:
self.pending_request = request
try:
self.send_gatt_pdu(request.to_bytes())
self.send_gatt_pdu(bytes(request))
response = await asyncio.wait_for(
self.pending_response, GATT_REQUEST_TIMEOUT
)
@@ -328,7 +340,7 @@ class Client:
f'GATT Confirmation from client: [0x{self.connection.handle:04X}] '
f'{confirmation}'
)
self.send_gatt_pdu(confirmation.to_bytes())
self.send_gatt_pdu(bytes(confirmation))
async def request_mtu(self, mtu: int) -> int:
# Check the range

View File

@@ -364,7 +364,7 @@ class Server(EventEmitter):
logger.debug(
f'GATT Response from server: [0x{connection.handle:04X}] {response}'
)
self.send_gatt_pdu(connection.handle, response.to_bytes())
self.send_gatt_pdu(connection.handle, bytes(response))
async def notify_subscriber(
self,
@@ -461,7 +461,7 @@ class Server(EventEmitter):
)
try:
self.send_gatt_pdu(connection.handle, indication.to_bytes())
self.send_gatt_pdu(connection.handle, bytes(indication))
await asyncio.wait_for(pending_confirmation, GATT_REQUEST_TIMEOUT)
except asyncio.TimeoutError as error:
logger.warning(color('!!! GATT Indicate timeout', 'red'))

File diff suppressed because it is too large Load Diff

View File

@@ -141,7 +141,7 @@ class HfFeature(enum.IntFlag):
"""
HF supported features (AT+BRSF=) (normative).
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
Hands-Free Profile v1.9, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
"""
EC_NR = 0x001 # Echo Cancel & Noise reduction
@@ -155,14 +155,14 @@ class HfFeature(enum.IntFlag):
HF_INDICATORS = 0x100
ESCO_S4_SETTINGS_SUPPORTED = 0x200
ENHANCED_VOICE_RECOGNITION_STATUS = 0x400
VOICE_RECOGNITION_TEST = 0x800
VOICE_RECOGNITION_TEXT = 0x800
class AgFeature(enum.IntFlag):
"""
AG supported features (+BRSF:) (normative).
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
Hands-Free Profile v1.9, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
"""
THREE_WAY_CALLING = 0x001
@@ -178,7 +178,7 @@ class AgFeature(enum.IntFlag):
HF_INDICATORS = 0x400
ESCO_S4_SETTINGS_SUPPORTED = 0x800
ENHANCED_VOICE_RECOGNITION_STATUS = 0x1000
VOICE_RECOGNITION_TEST = 0x2000
VOICE_RECOGNITION_TEXT = 0x2000
class AudioCodec(enum.IntEnum):
@@ -1390,6 +1390,7 @@ class AgProtocol(pyee.EventEmitter):
def _on_bac(self, *args) -> None:
self.supported_audio_codecs = [AudioCodec(int(value)) for value in args]
self.emit('supported_audio_codecs', self.supported_audio_codecs)
self.send_ok()
def _on_bcs(self, codec: bytes) -> None:
@@ -1618,7 +1619,7 @@ class ProfileVersion(enum.IntEnum):
"""
Profile version (normative).
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements.
Hands-Free Profile v1.8, 6.3 SDP Interoperability Requirements.
"""
V1_5 = 0x0105
@@ -1632,7 +1633,7 @@ class HfSdpFeature(enum.IntFlag):
"""
HF supported features (normative).
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements.
Hands-Free Profile v1.9, 6.3 SDP Interoperability Requirements.
"""
EC_NR = 0x01 # Echo Cancel & Noise reduction
@@ -1640,16 +1641,17 @@ class HfSdpFeature(enum.IntFlag):
CLI_PRESENTATION_CAPABILITY = 0x04
VOICE_RECOGNITION_ACTIVATION = 0x08
REMOTE_VOLUME_CONTROL = 0x10
WIDE_BAND = 0x20 # Wide band speech
WIDE_BAND_SPEECH = 0x20
ENHANCED_VOICE_RECOGNITION_STATUS = 0x40
VOICE_RECOGNITION_TEST = 0x80
VOICE_RECOGNITION_TEXT = 0x80
SUPER_WIDE_BAND = 0x100
class AgSdpFeature(enum.IntFlag):
"""
AG supported features (normative).
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements.
Hands-Free Profile v1.9, 6.3 SDP Interoperability Requirements.
"""
THREE_WAY_CALLING = 0x01
@@ -1657,9 +1659,10 @@ class AgSdpFeature(enum.IntFlag):
VOICE_RECOGNITION_FUNCTION = 0x04
IN_BAND_RING_TONE_CAPABILITY = 0x08
VOICE_TAG = 0x10 # Attach a number to voice tag
WIDE_BAND = 0x20 # Wide band speech
WIDE_BAND_SPEECH = 0x20
ENHANCED_VOICE_RECOGNITION_STATUS = 0x40
VOICE_RECOGNITION_TEST = 0x80
VOICE_RECOGNITION_TEXT = 0x80
SUPER_WIDE_BAND_SPEED_SPEECH = 0x100
def make_hf_sdp_records(
@@ -1692,11 +1695,11 @@ def make_hf_sdp_records(
in configuration.supported_hf_features
):
hf_supported_features |= HfSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS
if HfFeature.VOICE_RECOGNITION_TEST in configuration.supported_hf_features:
hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_TEST
if HfFeature.VOICE_RECOGNITION_TEXT in configuration.supported_hf_features:
hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_TEXT
if AudioCodec.MSBC in configuration.supported_audio_codecs:
hf_supported_features |= HfSdpFeature.WIDE_BAND
hf_supported_features |= HfSdpFeature.WIDE_BAND_SPEECH
return [
sdp.ServiceAttribute(
@@ -1772,14 +1775,14 @@ def make_ag_sdp_records(
in configuration.supported_ag_features
):
ag_supported_features |= AgSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS
if AgFeature.VOICE_RECOGNITION_TEST in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEST
if AgFeature.VOICE_RECOGNITION_TEXT in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEXT
if AgFeature.IN_BAND_RING_TONE_CAPABILITY in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
if AgFeature.VOICE_RECOGNITION_FUNCTION in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_FUNCTION
if AudioCodec.MSBC in configuration.supported_audio_codecs:
ag_supported_features |= AgSdpFeature.WIDE_BAND
ag_supported_features |= AgSdpFeature.WIDE_BAND_SPEECH
return [
sdp.ServiceAttribute(

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -34,6 +34,8 @@ from typing import (
TYPE_CHECKING,
)
import pyee
from bumble.colors import color
from bumble.l2cap import L2CAP_PDU
from bumble.snoop import Snooper
@@ -59,7 +61,19 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
class AclPacketQueue:
class DataPacketQueue(pyee.EventEmitter):
"""
Flow-control queue for host->controller data packets (ACL, ISO).
The queue holds packets associated with a connection handle. The packets
are sent to the controller, up to a maximum total number of packets in flight.
A packet is considered to be "in flight" when it has been sent to the controller
but not completed yet. Packets are no longer "in flight" when the controller
declares them as completed.
The queue emits a 'flow' event whenever one or more packets are completed.
"""
max_packet_size: int
def __init__(
@@ -68,40 +82,105 @@ class AclPacketQueue:
max_in_flight: int,
send: Callable[[hci.HCI_Packet], None],
) -> None:
super().__init__()
self.max_packet_size = max_packet_size
self.max_in_flight = max_in_flight
self.in_flight = 0
self.send = send
self.packets: Deque[hci.HCI_AclDataPacket] = collections.deque()
self._in_flight = 0 # Total number of packets in flight across all connections
self._in_flight_per_connection: dict[int, int] = collections.defaultdict(
int
) # Number of packets in flight per connection
self._send = send
self._packets: Deque[tuple[hci.HCI_Packet, int]] = collections.deque()
self._queued = 0
self._completed = 0
def enqueue(self, packet: hci.HCI_AclDataPacket) -> None:
self.packets.appendleft(packet)
self.check_queue()
@property
def queued(self) -> int:
"""Total number of packets queued since creation."""
return self._queued
if self.packets:
@property
def completed(self) -> int:
"""Total number of packets completed since creation."""
return self._completed
@property
def pending(self) -> int:
"""Number of packets that have been queued but not completed."""
return self._queued - self._completed
def enqueue(self, packet: hci.HCI_Packet, connection_handle: int) -> None:
"""Enqueue a packet associated with a connection"""
self._packets.appendleft((packet, connection_handle))
self._queued += 1
self._check_queue()
if self._packets:
logger.debug(
f'{self.in_flight} ACL packets in flight, '
f'{len(self.packets)} in queue'
f'{self._in_flight} packets in flight, '
f'{len(self._packets)} in queue'
)
def check_queue(self) -> None:
while self.packets and self.in_flight < self.max_in_flight:
packet = self.packets.pop()
self.send(packet)
self.in_flight += 1
def flush(self, connection_handle: int) -> None:
"""
Remove all packets associated with a connection.
def on_packets_completed(self, packet_count: int) -> None:
if packet_count > self.in_flight:
All packets associated with the connection that are in flight are implicitly
marked as completed, but no 'flow' event is emitted.
"""
packets_to_keep = [
(packet, handle)
for (packet, handle) in self._packets
if handle != connection_handle
]
if flushed_count := len(self._packets) - len(packets_to_keep):
self._completed += flushed_count
self._packets = collections.deque(packets_to_keep)
if connection_handle in self._in_flight_per_connection:
in_flight = self._in_flight_per_connection[connection_handle]
self._completed += in_flight
self._in_flight -= in_flight
del self._in_flight_per_connection[connection_handle]
def _check_queue(self) -> None:
while self._packets and self._in_flight < self.max_in_flight:
packet, connection_handle = self._packets.pop()
self._send(packet)
self._in_flight += 1
self._in_flight_per_connection[connection_handle] += 1
def on_packets_completed(self, packet_count: int, connection_handle: int) -> None:
"""Mark one or more packets associated with a connection as completed."""
if connection_handle not in self._in_flight_per_connection:
logger.warning(
color(
'!!! {packet_count} completed but only '
f'{self.in_flight} in flight'
)
f'received completion for unknown connection {connection_handle}'
)
packet_count = self.in_flight
return
self.in_flight -= packet_count
self.check_queue()
in_flight_for_connection = self._in_flight_per_connection[connection_handle]
if packet_count <= in_flight_for_connection:
self._in_flight_per_connection[connection_handle] -= packet_count
else:
logger.warning(
f'{packet_count} completed for {connection_handle} '
f'but only {in_flight_for_connection} in flight'
)
self._in_flight_per_connection[connection_handle] = 0
if packet_count <= self._in_flight:
self._in_flight -= packet_count
self._completed += packet_count
else:
logger.warning(
f'{packet_count} completed but only {self._in_flight} in flight'
)
self._in_flight = 0
self._completed = self._queued
self._check_queue()
self.emit('flow')
# -----------------------------------------------------------------------------
@@ -114,7 +193,7 @@ class Connection:
self.peer_address = peer_address
self.assembler = hci.HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.transport = transport
acl_packet_queue: Optional[AclPacketQueue] = (
acl_packet_queue: Optional[DataPacketQueue] = (
host.le_acl_packet_queue
if transport == BT_LE_TRANSPORT
else host.acl_packet_queue
@@ -129,28 +208,37 @@ class Connection:
l2cap_pdu = L2CAP_PDU.from_bytes(pdu)
self.host.on_l2cap_pdu(self, l2cap_pdu.cid, l2cap_pdu.payload)
def __str__(self) -> str:
return (
f'Connection(transport={self.transport}, peer_address={self.peer_address})'
)
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class ScoLink:
peer_address: hci.Address
handle: int
connection_handle: int
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class CisLink:
peer_address: hci.Address
class IsoLink:
handle: int
packet_queue: DataPacketQueue = dataclasses.field(repr=False)
packet_sequence_number: int = 0
# -----------------------------------------------------------------------------
class Host(AbortableEventEmitter):
connections: Dict[int, Connection]
cis_links: Dict[int, CisLink]
cis_links: Dict[int, IsoLink]
bis_links: Dict[int, IsoLink]
sco_links: Dict[int, ScoLink]
acl_packet_queue: Optional[AclPacketQueue] = None
le_acl_packet_queue: Optional[AclPacketQueue] = None
bigs: dict[int, set[int]] = {} # BIG Handle to BIS Handles
acl_packet_queue: Optional[DataPacketQueue] = None
le_acl_packet_queue: Optional[DataPacketQueue] = None
iso_packet_queue: Optional[DataPacketQueue] = None
hci_sink: Optional[TransportSink] = None
hci_metadata: Dict[str, Any]
long_term_key_provider: Optional[
@@ -169,6 +257,7 @@ class Host(AbortableEventEmitter):
self.ready = False # True when we can accept incoming packets
self.connections = {} # Connections, by connection handle
self.cis_links = {} # CIS links, by connection handle
self.bis_links = {} # BIS links, by connection handle
self.sco_links = {} # SCO links, by connection handle
self.pending_command = None
self.pending_response: Optional[asyncio.Future[Any]] = None
@@ -199,7 +288,7 @@ class Host(AbortableEventEmitter):
check_address_type: bool = False,
) -> Optional[Connection]:
for connection in self.connections.values():
if connection.peer_address.to_bytes() == bd_addr.to_bytes():
if bytes(connection.peer_address) == bytes(bd_addr):
if (
check_address_type
and connection.peer_address.address_type != bd_addr.address_type
@@ -411,39 +500,70 @@ class Host(AbortableEventEmitter):
f'hc_total_num_acl_data_packets={hc_total_num_acl_data_packets}'
)
self.acl_packet_queue = AclPacketQueue(
self.acl_packet_queue = DataPacketQueue(
max_packet_size=hc_acl_data_packet_length,
max_in_flight=hc_total_num_acl_data_packets,
send=self.send_hci_packet,
)
hc_le_acl_data_packet_length = 0
hc_total_num_le_acl_data_packets = 0
if self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_COMMAND):
le_acl_data_packet_length = 0
total_num_le_acl_data_packets = 0
iso_data_packet_length = 0
total_num_iso_data_packets = 0
if self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_V2_COMMAND):
response = await self.send_command(
hci.HCI_LE_Read_Buffer_Size_V2_Command(), check_result=True
)
le_acl_data_packet_length = (
response.return_parameters.le_acl_data_packet_length
)
total_num_le_acl_data_packets = (
response.return_parameters.total_num_le_acl_data_packets
)
iso_data_packet_length = response.return_parameters.iso_data_packet_length
total_num_iso_data_packets = (
response.return_parameters.total_num_iso_data_packets
)
logger.debug(
'HCI LE flow control: '
f'le_acl_data_packet_length={le_acl_data_packet_length},'
f'total_num_le_acl_data_packets={total_num_le_acl_data_packets}'
f'iso_data_packet_length={iso_data_packet_length},'
f'total_num_iso_data_packets={total_num_iso_data_packets}'
)
elif self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(
hci.HCI_LE_Read_Buffer_Size_Command(), check_result=True
)
hc_le_acl_data_packet_length = (
response.return_parameters.hc_le_acl_data_packet_length
le_acl_data_packet_length = (
response.return_parameters.le_acl_data_packet_length
)
hc_total_num_le_acl_data_packets = (
response.return_parameters.hc_total_num_le_acl_data_packets
total_num_le_acl_data_packets = (
response.return_parameters.total_num_le_acl_data_packets
)
logger.debug(
'HCI LE ACL flow control: '
f'hc_le_acl_data_packet_length={hc_le_acl_data_packet_length},'
f'hc_total_num_le_acl_data_packets={hc_total_num_le_acl_data_packets}'
f'le_acl_data_packet_length={le_acl_data_packet_length},'
f'total_num_le_acl_data_packets={total_num_le_acl_data_packets}'
)
if hc_le_acl_data_packet_length == 0 or hc_total_num_le_acl_data_packets == 0:
if le_acl_data_packet_length == 0 or total_num_le_acl_data_packets == 0:
# LE and Classic share the same queue
self.le_acl_packet_queue = self.acl_packet_queue
else:
# Create a separate queue for LE
self.le_acl_packet_queue = AclPacketQueue(
max_packet_size=hc_le_acl_data_packet_length,
max_in_flight=hc_total_num_le_acl_data_packets,
self.le_acl_packet_queue = DataPacketQueue(
max_packet_size=le_acl_data_packet_length,
max_in_flight=total_num_le_acl_data_packets,
send=self.send_hci_packet,
)
if iso_data_packet_length and total_num_iso_data_packets:
self.iso_packet_queue = DataPacketQueue(
max_packet_size=iso_data_packet_length,
max_in_flight=total_num_iso_data_packets,
send=self.send_hci_packet,
)
@@ -552,7 +672,7 @@ class Host(AbortableEventEmitter):
return response
except Exception as error:
logger.warning(
logger.exception(
f'{color("!!! Exception while sending command:", "red")} {error}'
)
raise error
@@ -595,11 +715,78 @@ class Host(AbortableEventEmitter):
data=l2cap_pdu[offset : offset + data_total_length],
)
logger.debug(f'>>> ACL packet enqueue: (CID={cid}) {acl_packet}')
packet_queue.enqueue(acl_packet)
packet_queue.enqueue(acl_packet, connection_handle)
pb_flag = 1
offset += data_total_length
bytes_remaining -= data_total_length
def get_data_packet_queue(self, connection_handle: int) -> DataPacketQueue | None:
if connection := self.connections.get(connection_handle):
return connection.acl_packet_queue
if iso_link := self.cis_links.get(connection_handle) or self.bis_links.get(
connection_handle
):
return iso_link.packet_queue
return None
def send_iso_sdu(self, connection_handle: int, sdu: bytes) -> None:
if not (
iso_link := self.cis_links.get(connection_handle)
or self.bis_links.get(connection_handle)
):
logger.warning(f"no ISO link for connection handle {connection_handle}")
return
if iso_link.packet_queue is None:
logger.warning("ISO link has no data packet queue")
return
bytes_remaining = len(sdu)
offset = 0
while bytes_remaining:
is_first_fragment = offset == 0
header_length = 4 if is_first_fragment else 0
assert iso_link.packet_queue.max_packet_size > header_length
fragment_length = min(
bytes_remaining, iso_link.packet_queue.max_packet_size - header_length
)
is_last_fragment = bytes_remaining == fragment_length
iso_sdu_fragment = sdu[offset : offset + fragment_length]
iso_link.packet_queue.enqueue(
(
hci.HCI_IsoDataPacket(
connection_handle=connection_handle,
data_total_length=header_length + fragment_length,
packet_sequence_number=iso_link.packet_sequence_number,
pb_flag=0b10 if is_last_fragment else 0b00,
packet_status_flag=0,
iso_sdu_length=len(sdu),
iso_sdu_fragment=iso_sdu_fragment,
)
if is_first_fragment
else hci.HCI_IsoDataPacket(
connection_handle=connection_handle,
data_total_length=fragment_length,
pb_flag=0b11 if is_last_fragment else 0b01,
iso_sdu_fragment=iso_sdu_fragment,
)
),
connection_handle,
)
offset += fragment_length
bytes_remaining -= fragment_length
iso_link.packet_sequence_number = (iso_link.packet_sequence_number + 1) & 0xFFFF
def remove_big(self, big_handle: int) -> None:
if big := self.bigs.pop(big_handle, None):
for connection_handle in big:
if bis_link := self.bis_links.pop(connection_handle, None):
bis_link.packet_queue.flush(bis_link.handle)
def supports_command(self, op_code: int) -> bool:
return (
self.local_supported_commands
@@ -727,16 +914,17 @@ class Host(AbortableEventEmitter):
def on_hci_command_status_event(self, event):
return self.on_command_processed(event)
def on_hci_number_of_completed_packets_event(self, event):
def on_hci_number_of_completed_packets_event(
self, event: hci.HCI_Number_Of_Completed_Packets_Event
) -> None:
for connection_handle, num_completed_packets in zip(
event.connection_handles, event.num_completed_packets
):
if connection := self.connections.get(connection_handle):
connection.acl_packet_queue.on_packets_completed(num_completed_packets)
elif not (
self.cis_links.get(connection_handle)
or self.sco_links.get(connection_handle)
):
if queue := self.get_data_packet_queue(connection_handle):
queue.on_packets_completed(num_completed_packets, connection_handle)
continue
if connection_handle not in self.sco_links:
logger.warning(
'received packet completion event for unknown handle '
f'0x{connection_handle:04X}'
@@ -854,11 +1042,7 @@ class Host(AbortableEventEmitter):
return
if event.status == hci.HCI_SUCCESS:
logger.debug(
f'### DISCONNECTION: [0x{handle:04X}] '
f'{connection.peer_address} '
f'reason={event.reason}'
)
logger.debug(f'### DISCONNECTION: {connection}, reason={event.reason}')
# Notify the listeners
self.emit('disconnection', handle, event.reason)
@@ -869,6 +1053,12 @@ class Host(AbortableEventEmitter):
or self.cis_links.pop(handle, 0)
or self.sco_links.pop(handle, 0)
)
# Flush the data queues
self.acl_packet_queue.flush(handle)
self.le_acl_packet_queue.flush(handle)
if self.iso_packet_queue:
self.iso_packet_queue.flush(handle)
else:
logger.debug(f'### DISCONNECTION FAILED: {event.status}')
@@ -953,12 +1143,68 @@ class Host(AbortableEventEmitter):
event.cis_id,
)
def on_hci_le_create_big_complete_event(self, event):
self.bigs[event.big_handle] = set(event.connection_handle)
if self.iso_packet_queue is None:
logger.warning("BIS established but ISO packets not supported")
for connection_handle in event.connection_handle:
self.bis_links[connection_handle] = IsoLink(
connection_handle, self.iso_packet_queue
)
self.emit(
'big_establishment',
event.status,
event.big_handle,
event.connection_handle,
event.big_sync_delay,
event.transport_latency_big,
event.phy,
event.nse,
event.bn,
event.pto,
event.irc,
event.max_pdu,
event.iso_interval,
)
def on_hci_le_big_sync_established_event(self, event):
self.bigs[event.big_handle] = set(event.connection_handle)
for connection_handle in event.connection_handle:
self.bis_links[connection_handle] = IsoLink(
connection_handle, self.iso_packet_queue
)
self.emit(
'big_sync_establishment',
event.status,
event.big_handle,
event.transport_latency_big,
event.nse,
event.bn,
event.pto,
event.irc,
event.max_pdu,
event.iso_interval,
event.connection_handle,
)
def on_hci_le_big_sync_lost_event(self, event):
self.remove_big(event.big_handle)
self.emit('big_sync_lost', event.big_handle, event.reason)
def on_hci_le_terminate_big_complete_event(self, event):
self.remove_big(event.big_handle)
self.emit('big_termination', event.reason, event.big_handle)
def on_hci_le_cis_established_event(self, event):
# The remaining parameters are unused for now.
if event.status == hci.HCI_SUCCESS:
self.cis_links[event.connection_handle] = CisLink(
handle=event.connection_handle,
peer_address=hci.Address.ANY,
if self.iso_packet_queue is None:
logger.warning("CIS established but ISO packets not supported")
self.cis_links[event.connection_handle] = IsoLink(
handle=event.connection_handle, packet_queue=self.iso_packet_queue
)
self.emit('cis_establishment', event.connection_handle)
else:
@@ -1028,7 +1274,7 @@ class Host(AbortableEventEmitter):
self.sco_links[event.connection_handle] = ScoLink(
peer_address=event.bd_addr,
handle=event.connection_handle,
connection_handle=event.connection_handle,
)
# Notify the client
@@ -1248,3 +1494,6 @@ class Host(AbortableEventEmitter):
event.connection_handle,
int.from_bytes(event.le_features, 'little'),
)
def on_hci_vendor_event(self, event):
self.emit('vendor_event', event)

View File

@@ -225,7 +225,7 @@ class L2CAP_PDU:
return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload)
def to_bytes(self) -> bytes:
def __bytes__(self) -> bytes:
header = struct.pack('<HH', len(self.payload), self.cid)
return header + self.payload
@@ -233,9 +233,6 @@ class L2CAP_PDU:
self.cid = cid
self.payload = payload
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str:
return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}'
@@ -333,11 +330,8 @@ class L2CAP_Control_Frame:
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self) -> bytes:
return self.pdu
def __bytes__(self) -> bytes:
return self.to_bytes()
return self.pdu
def __str__(self) -> str:
result = f'{color(self.name, "yellow")} [ID={self.identifier}]'
@@ -779,7 +773,6 @@ class ClassicChannel(EventEmitter):
self.psm = psm
self.source_cid = source_cid
self.destination_cid = 0
self.response = None
self.connection_result = None
self.disconnection_result = None
self.sink = None
@@ -789,27 +782,15 @@ class ClassicChannel(EventEmitter):
self.state = new_state
def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None:
if self.state != self.State.OPEN:
raise InvalidStateError('channel not open')
self.manager.send_pdu(self.connection, self.destination_cid, pdu)
def send_control_frame(self, frame: L2CAP_Control_Frame) -> None:
self.manager.send_control_frame(self.connection, self.signaling_cid, frame)
async def send_request(self, request: SupportsBytes) -> bytes:
# Check that there isn't already a request pending
if self.response:
raise InvalidStateError('request already pending')
if self.state != self.State.OPEN:
raise InvalidStateError('channel not open')
self.response = asyncio.get_running_loop().create_future()
self.send_pdu(request)
return await self.response
def on_pdu(self, pdu: bytes) -> None:
if self.response:
self.response.set_result(pdu)
self.response = None
elif self.sink:
if self.sink:
# pylint: disable=not-callable
self.sink(pdu)
else:

View File

@@ -139,16 +139,19 @@ class PairingDelegate:
io_capability: IoCapability
local_initiator_key_distribution: KeyDistribution
local_responder_key_distribution: KeyDistribution
maximum_encryption_key_size: int
def __init__(
self,
io_capability: IoCapability = NO_OUTPUT_NO_INPUT,
local_initiator_key_distribution: KeyDistribution = DEFAULT_KEY_DISTRIBUTION,
local_responder_key_distribution: KeyDistribution = DEFAULT_KEY_DISTRIBUTION,
maximum_encryption_key_size: int = 16,
) -> None:
self.io_capability = io_capability
self.local_initiator_key_distribution = local_initiator_key_distribution
self.local_responder_key_distribution = local_responder_key_distribution
self.maximum_encryption_key_size = maximum_encryption_key_size
@property
def classic_io_capability(self) -> int:

View File

@@ -97,7 +97,7 @@ class AudioInputStatus(OpenIntEnum):
Cf. 3.4 Audio Input Status
'''
INATIVE = 0x00
INACTIVE = 0x00
ACTIVE = 0x01
@@ -106,7 +106,7 @@ class AudioInputControlPointOpCode(OpenIntEnum):
Cf. 3.5.1 Audio Input Control Point procedure requirements
'''
SET_GAIN_SETTING = 0x00
SET_GAIN_SETTING = 0x01
UNMUTE = 0x02
MUTE = 0x03
SET_MANUAL_GAIN_MODE = 0x04
@@ -235,7 +235,7 @@ class AudioInputControlPoint:
or gain_settings_operand
> self.gain_settings_properties.gain_settings_maximum
):
logger.error("gain_seetings value out of range")
logger.error("gain_settings value out of range")
raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE)
if self.audio_input_state.gain_settings != gain_settings_operand:
@@ -451,54 +451,35 @@ class AICSServiceProxy(ProfileServiceProxy):
def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC
)
):
raise gatt.InvalidServiceError("Audio Input State Characteristic not found")
self.audio_input_state = SerializableCharacteristicAdapter(
characteristics[0], AudioInputState
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC
),
AudioInputState,
)
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC
)
):
raise gatt.InvalidServiceError(
"Gain Settings Attribute Characteristic not found"
)
self.gain_settings_properties = SerializableCharacteristicAdapter(
characteristics[0], GainSettingsProperties
service_proxy.get_required_characteristic_by_uuid(
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC
),
GainSettingsProperties,
)
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
self.audio_input_status = PackedCharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC
)
):
raise gatt.InvalidServiceError(
"Audio Input Status Characteristic not found"
)
self.audio_input_status = PackedCharacteristicAdapter(characteristics[0], 'B')
),
'B',
)
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
self.audio_input_control_point = (
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC
)
):
raise gatt.InvalidServiceError(
"Audio Input Control Point Characteristic not found"
)
self.audio_input_control_point = characteristics[0]
)
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
self.audio_input_description = UTF8CharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC
)
):
raise gatt.InvalidServiceError(
"Audio Input Description Characteristic not found"
)
self.audio_input_description = UTF8CharacteristicAdapter(characteristics[0])
)

View File

@@ -17,6 +17,7 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum
import logging
import struct
@@ -258,8 +259,8 @@ class AseReasonCode(enum.IntEnum):
# -----------------------------------------------------------------------------
class AudioRole(enum.IntEnum):
SINK = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.CONTROLLER_TO_HOST
SOURCE = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.HOST_TO_CONTROLLER
SINK = device.CisLink.Direction.CONTROLLER_TO_HOST
SOURCE = device.CisLink.Direction.HOST_TO_CONTROLLER
# -----------------------------------------------------------------------------
@@ -354,16 +355,7 @@ class AseStateMachine(gatt.Characteristic):
cis_link.on('disconnection', self.on_cis_disconnection)
async def post_cis_established():
await self.service.device.send_command(
hci.HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=cis_link.handle,
data_path_direction=self.role,
data_path_id=0x00, # Fixed HCI
codec_id=hci.CodingFormat(hci.CodecID.TRANSPARENT),
controller_delay=0,
codec_configuration=b'',
)
)
await cis_link.setup_data_path(direction=self.role)
if self.role == AudioRole.SINK:
self.state = self.State.STREAMING
await self.service.device.notify_subscribers(self, self.value)
@@ -511,12 +503,8 @@ class AseStateMachine(gatt.Characteristic):
self.state = self.State.RELEASING
async def remove_cis_async():
await self.service.device.send_command(
hci.HCI_LE_Remove_ISO_Data_Path_Command(
connection_handle=self.cis_link.handle,
data_path_direction=self.role,
)
)
if self.cis_link:
await self.cis_link.remove_data_path(self.role)
self.state = self.State.IDLE
await self.service.device.notify_subscribers(self, self.value)

View File

@@ -288,8 +288,8 @@ class AshaServiceProxy(gatt_client.ProfileServiceProxy):
'psm_characteristic',
),
):
if not (
characteristics := self.service_proxy.get_characteristics_by_uuid(uuid)
):
raise gatt.InvalidServiceError(f"Missing {uuid} Characteristic")
setattr(self, attribute_name, characteristics[0])
setattr(
self,
attribute_name,
self.service_proxy.get_required_characteristic_by_uuid(uuid),
)

View File

@@ -265,7 +265,7 @@ class UnicastServerAdvertisingData:
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
struct.pack(
'<2sBIB',
gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE.to_bytes(),
bytes(gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE),
self.announcement_type,
self.available_audio_contexts,
len(self.metadata),
@@ -398,18 +398,21 @@ class CodecSpecificConfiguration:
OCTETS_PER_FRAME = 0x04
CODEC_FRAMES_PER_SDU = 0x05
sampling_frequency: SamplingFrequency
frame_duration: FrameDuration
audio_channel_allocation: AudioLocation
octets_per_codec_frame: int
codec_frames_per_sdu: int
sampling_frequency: SamplingFrequency | None = None
frame_duration: FrameDuration | None = None
audio_channel_allocation: AudioLocation | None = None
octets_per_codec_frame: int | None = None
codec_frames_per_sdu: int | None = None
@classmethod
def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration:
offset = 0
# Allowed default values.
audio_channel_allocation = AudioLocation.NOT_ALLOWED
codec_frames_per_sdu = 1
sampling_frequency: SamplingFrequency | None = None
frame_duration: FrameDuration | None = None
audio_channel_allocation: AudioLocation | None = None
octets_per_codec_frame: int | None = None
codec_frames_per_sdu: int | None = None
while offset < len(data):
length, type = struct.unpack_from('BB', data, offset)
offset += 2
@@ -427,8 +430,6 @@ class CodecSpecificConfiguration:
elif type == CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU:
codec_frames_per_sdu = value
# It is expected here that if some fields are missing, an error should be raised.
# pylint: disable=possibly-used-before-assignment,used-before-assignment
return CodecSpecificConfiguration(
sampling_frequency=sampling_frequency,
frame_duration=frame_duration,
@@ -438,23 +439,43 @@ class CodecSpecificConfiguration:
)
def __bytes__(self) -> bytes:
return struct.pack(
'<BBBBBBBBIBBHBBB',
2,
CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
self.sampling_frequency,
2,
CodecSpecificConfiguration.Type.FRAME_DURATION,
self.frame_duration,
5,
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
self.audio_channel_allocation,
3,
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
self.octets_per_codec_frame,
2,
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
self.codec_frames_per_sdu,
return b''.join(
[
struct.pack(fmt, length, tag, value)
for fmt, length, tag, value in [
(
'<BBB',
2,
CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
self.sampling_frequency,
),
(
'<BBB',
2,
CodecSpecificConfiguration.Type.FRAME_DURATION,
self.frame_duration,
),
(
'<BBI',
5,
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
self.audio_channel_allocation,
),
(
'<BBH',
3,
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
self.octets_per_codec_frame,
),
(
'<BBB',
2,
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
self.codec_frames_per_sdu,
),
]
if value is not None
]
)
@@ -466,6 +487,24 @@ class BroadcastAudioAnnouncement:
def from_bytes(cls, data: bytes) -> Self:
return cls(int.from_bytes(data[:3], 'little'))
def __bytes__(self) -> bytes:
return self.broadcast_id.to_bytes(3, 'little')
def get_advertising_data(self) -> bytes:
return bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
bytes(gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE)
+ bytes(self)
),
)
]
)
)
@dataclasses.dataclass
class BasicAudioAnnouncement:
@@ -474,26 +513,37 @@ class BasicAudioAnnouncement:
index: int
codec_specific_configuration: CodecSpecificConfiguration
@dataclasses.dataclass
class CodecInfo:
coding_format: hci.CodecID
company_id: int
vendor_specific_codec_id: int
@classmethod
def from_bytes(cls, data: bytes) -> Self:
coding_format = hci.CodecID(data[0])
company_id = int.from_bytes(data[1:3], 'little')
vendor_specific_codec_id = int.from_bytes(data[3:5], 'little')
return cls(coding_format, company_id, vendor_specific_codec_id)
def __bytes__(self) -> bytes:
codec_specific_configuration_bytes = bytes(
self.codec_specific_configuration
)
return (
bytes([self.index, len(codec_specific_configuration_bytes)])
+ codec_specific_configuration_bytes
)
@dataclasses.dataclass
class Subgroup:
codec_id: BasicAudioAnnouncement.CodecInfo
codec_id: hci.CodingFormat
codec_specific_configuration: CodecSpecificConfiguration
metadata: le_audio.Metadata
bis: List[BasicAudioAnnouncement.BIS]
def __bytes__(self) -> bytes:
metadata_bytes = bytes(self.metadata)
codec_specific_configuration_bytes = bytes(
self.codec_specific_configuration
)
return (
bytes([len(self.bis)])
+ bytes(self.codec_id)
+ bytes([len(codec_specific_configuration_bytes)])
+ codec_specific_configuration_bytes
+ bytes([len(metadata_bytes)])
+ metadata_bytes
+ b''.join(map(bytes, self.bis))
)
presentation_delay: int
subgroups: List[BasicAudioAnnouncement.Subgroup]
@@ -505,7 +555,7 @@ class BasicAudioAnnouncement:
for _ in range(data[3]):
num_bis = data[offset]
offset += 1
codec_id = cls.CodecInfo.from_bytes(data[offset : offset + 5])
codec_id = hci.CodingFormat.from_bytes(data[offset : offset + 5])
offset += 5
codec_specific_configuration_length = data[offset]
offset += 1
@@ -549,3 +599,25 @@ class BasicAudioAnnouncement:
)
return cls(presentation_delay, subgroups)
def __bytes__(self) -> bytes:
return (
self.presentation_delay.to_bytes(3, 'little')
+ bytes([len(self.subgroups)])
+ b''.join(map(bytes, self.subgroups))
)
def get_advertising_data(self) -> bytes:
return bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
bytes(gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE)
+ bytes(self)
),
)
]
)
)

View File

@@ -354,34 +354,25 @@ class BroadcastAudioScanServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = BroadcastAudioScanService
broadcast_audio_scan_control_point: gatt_client.CharacteristicProxy
broadcast_receive_states: List[gatt.SerializableCharacteristicAdapter]
broadcast_receive_states: List[gatt.DelegatedCharacteristicAdapter]
def __init__(self, service_proxy: gatt_client.ServiceProxy):
self.service_proxy = service_proxy
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
self.broadcast_audio_scan_control_point = (
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_BROADCAST_AUDIO_SCAN_CONTROL_POINT_CHARACTERISTIC
)
):
raise gatt.InvalidServiceError(
"Broadcast Audio Scan Control Point characteristic not found"
)
self.broadcast_audio_scan_control_point = characteristics[0]
)
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
self.broadcast_receive_states = [
gatt.DelegatedCharacteristicAdapter(
characteristic,
decode=lambda x: BroadcastReceiveState.from_bytes(x) if x else None,
)
for characteristic in service_proxy.get_characteristics_by_uuid(
gatt.GATT_BROADCAST_RECEIVE_STATE_CHARACTERISTIC
)
):
raise gatt.InvalidServiceError(
"Broadcast Receive State characteristic not found"
)
self.broadcast_receive_states = [
gatt.SerializableCharacteristicAdapter(
characteristic, BroadcastReceiveState
)
for characteristic in characteristics
]
async def send_control_point_operation(

193
bumble/profiles/gmap.py Normal file
View File

@@ -0,0 +1,193 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""LE Audio - Gaming Audio Profile"""
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct
from typing import Optional
from bumble.gatt import (
TemplateService,
DelegatedCharacteristicAdapter,
Characteristic,
GATT_GAMING_AUDIO_SERVICE,
GATT_GMAP_ROLE_CHARACTERISTIC,
GATT_UGG_FEATURES_CHARACTERISTIC,
GATT_UGT_FEATURES_CHARACTERISTIC,
GATT_BGS_FEATURES_CHARACTERISTIC,
GATT_BGR_FEATURES_CHARACTERISTIC,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from enum import IntFlag
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class GmapRole(IntFlag):
UNICAST_GAME_GATEWAY = 1 << 0
UNICAST_GAME_TERMINAL = 1 << 1
BROADCAST_GAME_SENDER = 1 << 2
BROADCAST_GAME_RECEIVER = 1 << 3
class UggFeatures(IntFlag):
UGG_MULTIPLEX = 1 << 0
UGG_96_KBPS_SOURCE = 1 << 1
UGG_MULTISINK = 1 << 2
class UgtFeatures(IntFlag):
UGT_SOURCE = 1 << 0
UGT_80_KBPS_SOURCE = 1 << 1
UGT_SINK = 1 << 2
UGT_64_KBPS_SINK = 1 << 3
UGT_MULTIPLEX = 1 << 4
UGT_MULTISINK = 1 << 5
UGT_MULTISOURCE = 1 << 6
class BgsFeatures(IntFlag):
BGS_96_KBPS = 1 << 0
class BgrFeatures(IntFlag):
BGR_MULTISINK = 1 << 0
BGR_MULTIPLEX = 1 << 1
# -----------------------------------------------------------------------------
# Server
# -----------------------------------------------------------------------------
class GamingAudioService(TemplateService):
UUID = GATT_GAMING_AUDIO_SERVICE
gmap_role: Characteristic
ugg_features: Optional[Characteristic] = None
ugt_features: Optional[Characteristic] = None
bgs_features: Optional[Characteristic] = None
bgr_features: Optional[Characteristic] = None
def __init__(
self,
gmap_role: GmapRole,
ugg_features: Optional[UggFeatures] = None,
ugt_features: Optional[UgtFeatures] = None,
bgs_features: Optional[BgsFeatures] = None,
bgr_features: Optional[BgrFeatures] = None,
) -> None:
characteristics = []
ugg_features = UggFeatures(0) if ugg_features is None else ugg_features
ugt_features = UgtFeatures(0) if ugt_features is None else ugt_features
bgs_features = BgsFeatures(0) if bgs_features is None else bgs_features
bgr_features = BgrFeatures(0) if bgr_features is None else bgr_features
self.gmap_role = Characteristic(
uuid=GATT_GMAP_ROLE_CHARACTERISTIC,
properties=Characteristic.Properties.READ,
permissions=Characteristic.Permissions.READABLE,
value=struct.pack('B', gmap_role),
)
characteristics.append(self.gmap_role)
if gmap_role & GmapRole.UNICAST_GAME_GATEWAY:
self.ugg_features = Characteristic(
uuid=GATT_UGG_FEATURES_CHARACTERISTIC,
properties=Characteristic.Properties.READ,
permissions=Characteristic.Permissions.READABLE,
value=struct.pack('B', ugg_features),
)
characteristics.append(self.ugg_features)
if gmap_role & GmapRole.UNICAST_GAME_TERMINAL:
self.ugt_features = Characteristic(
uuid=GATT_UGT_FEATURES_CHARACTERISTIC,
properties=Characteristic.Properties.READ,
permissions=Characteristic.Permissions.READABLE,
value=struct.pack('B', ugt_features),
)
characteristics.append(self.ugt_features)
if gmap_role & GmapRole.BROADCAST_GAME_SENDER:
self.bgs_features = Characteristic(
uuid=GATT_BGS_FEATURES_CHARACTERISTIC,
properties=Characteristic.Properties.READ,
permissions=Characteristic.Permissions.READABLE,
value=struct.pack('B', bgs_features),
)
characteristics.append(self.bgs_features)
if gmap_role & GmapRole.BROADCAST_GAME_RECEIVER:
self.bgr_features = Characteristic(
uuid=GATT_BGR_FEATURES_CHARACTERISTIC,
properties=Characteristic.Properties.READ,
permissions=Characteristic.Permissions.READABLE,
value=struct.pack('B', bgr_features),
)
characteristics.append(self.bgr_features)
super().__init__(characteristics)
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class GamingAudioServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = GamingAudioService
def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy
self.gmap_role = DelegatedCharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_GMAP_ROLE_CHARACTERISTIC
),
decode=lambda value: GmapRole(value[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_UGG_FEATURES_CHARACTERISTIC
):
self.ugg_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
decode=lambda value: UggFeatures(value[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_UGT_FEATURES_CHARACTERISTIC
):
self.ugt_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
decode=lambda value: UgtFeatures(value[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_BGS_FEATURES_CHARACTERISTIC
):
self.bgs_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
decode=lambda value: BgsFeatures(value[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_BGR_FEATURES_CHARACTERISTIC
):
self.bgr_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
decode=lambda value: BgrFeatures(value[0]),
)

View File

@@ -17,23 +17,35 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import dataclasses
import enum
import struct
from typing import List, Type
from typing import Any, List, Type
from typing_extensions import Self
from bumble.profiles import bap
from bumble import utils
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class AudioActiveState(utils.OpenIntEnum):
NO_AUDIO_DATA_TRANSMITTED = 0x00
AUDIO_DATA_TRANSMITTED = 0x01
class AssistedListeningStream(utils.OpenIntEnum):
UNSPECIFIED_AUDIO_ENHANCEMENT = 0x00
@dataclasses.dataclass
class Metadata:
'''Bluetooth Assigned Numbers, Section 6.12.6 - Metadata LTV structures.
As Metadata fields may extend, and Spec doesn't forbid duplication, we don't parse
Metadata into a key-value style dataclass here. Rather, we encourage users to parse
again outside the lib.
As Metadata fields may extend, and the spec may not guarantee the uniqueness of
tags, we don't automatically parse the Metadata data into specific classes.
Users of this class may decode the data by themselves, or use the Entry.decode
method.
'''
class Tag(utils.OpenIntEnum):
@@ -57,6 +69,44 @@ class Metadata:
tag: Metadata.Tag
data: bytes
def decode(self) -> Any:
"""
Decode the data into an object, if possible.
If no specific object class exists to represent the data, the raw data
bytes are returned.
"""
if self.tag in (
Metadata.Tag.PREFERRED_AUDIO_CONTEXTS,
Metadata.Tag.STREAMING_AUDIO_CONTEXTS,
):
return bap.ContextType(struct.unpack("<H", self.data)[0])
if self.tag in (
Metadata.Tag.PROGRAM_INFO,
Metadata.Tag.PROGRAM_INFO_URI,
Metadata.Tag.BROADCAST_NAME,
):
return self.data.decode("utf-8")
if self.tag == Metadata.Tag.LANGUAGE:
return self.data.decode("ascii")
if self.tag == Metadata.Tag.CCID_LIST:
return list(self.data)
if self.tag == Metadata.Tag.PARENTAL_RATING:
return self.data[0]
if self.tag == Metadata.Tag.AUDIO_ACTIVE_STATE:
return AudioActiveState(self.data[0])
if self.tag == Metadata.Tag.ASSISTED_LISTENING_STREAM:
return AssistedListeningStream(self.data[0])
return self.data
@classmethod
def from_bytes(cls: Type[Self], data: bytes) -> Self:
return cls(tag=Metadata.Tag(data[0]), data=data[1:])
@@ -66,6 +116,29 @@ class Metadata:
entries: List[Entry] = dataclasses.field(default_factory=list)
def pretty_print(self, indent: str) -> str:
"""Convenience method to generate a string with one key-value pair per line."""
max_key_length = 0
keys = []
values = []
for entry in self.entries:
key = entry.tag.name
max_key_length = max(max_key_length, len(key))
keys.append(key)
decoded = entry.decode()
if isinstance(decoded, enum.Enum):
values.append(decoded.name)
elif isinstance(decoded, bytes):
values.append(decoded.hex())
else:
values.append(str(decoded))
return '\n'.join(
f'{indent}{key}: {" " * (max_key_length-len(key))}{value}'
for key, value in zip(keys, values)
)
@classmethod
def from_bytes(cls: Type[Self], data: bytes) -> Self:
entries = []
@@ -81,3 +154,13 @@ class Metadata:
def __bytes__(self) -> bytes:
return b''.join([bytes(entry) for entry in self.entries])
def __str__(self) -> str:
entries_str = []
for entry in self.entries:
decoded = entry.decode()
entries_str.append(
f'{entry.tag.name}: '
f'{decoded.hex() if isinstance(decoded, bytes) else decoded!r}'
)
return f'Metadata(entries={", ".join(entry_str for entry_str in entries_str)})'

View File

@@ -72,6 +72,19 @@ class PacRecord:
metadata=metadata,
)
@classmethod
def list_from_bytes(cls, data: bytes) -> list[PacRecord]:
"""Parse a serialized list of records preceded by a one byte list length."""
record_count = data[0]
records = []
offset = 1
for _ in range(record_count):
record = PacRecord.from_bytes(data[offset:])
offset += len(bytes(record))
records.append(record)
return records
def __bytes__(self) -> bytes:
capabilities_bytes = bytes(self.codec_specific_capabilities)
metadata_bytes = bytes(self.metadata)
@@ -172,39 +185,58 @@ class PublishedAudioCapabilitiesService(gatt.TemplateService):
class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = PublishedAudioCapabilitiesService
sink_pac: Optional[gatt_client.CharacteristicProxy] = None
sink_audio_locations: Optional[gatt_client.CharacteristicProxy] = None
source_pac: Optional[gatt_client.CharacteristicProxy] = None
source_audio_locations: Optional[gatt_client.CharacteristicProxy] = None
available_audio_contexts: gatt_client.CharacteristicProxy
supported_audio_contexts: gatt_client.CharacteristicProxy
sink_pac: Optional[gatt.DelegatedCharacteristicAdapter] = None
sink_audio_locations: Optional[gatt.DelegatedCharacteristicAdapter] = None
source_pac: Optional[gatt.DelegatedCharacteristicAdapter] = None
source_audio_locations: Optional[gatt.DelegatedCharacteristicAdapter] = None
available_audio_contexts: gatt.DelegatedCharacteristicAdapter
supported_audio_contexts: gatt.DelegatedCharacteristicAdapter
def __init__(self, service_proxy: gatt_client.ServiceProxy):
self.service_proxy = service_proxy
self.available_audio_contexts = service_proxy.get_characteristics_by_uuid(
gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC
)[0]
self.supported_audio_contexts = service_proxy.get_characteristics_by_uuid(
gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC
)[0]
self.available_audio_contexts = gatt.DelegatedCharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC
),
decode=lambda x: tuple(map(ContextType, struct.unpack('<HH', x))),
)
self.supported_audio_contexts = gatt.DelegatedCharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC
),
decode=lambda x: tuple(map(ContextType, struct.unpack('<HH', x))),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_PAC_CHARACTERISTIC
):
self.sink_pac = characteristics[0]
self.sink_pac = gatt.DelegatedCharacteristicAdapter(
characteristics[0],
decode=PacRecord.list_from_bytes,
)
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_PAC_CHARACTERISTIC
):
self.source_pac = characteristics[0]
self.source_pac = gatt.DelegatedCharacteristicAdapter(
characteristics[0],
decode=PacRecord.list_from_bytes,
)
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC
):
self.sink_audio_locations = characteristics[0]
self.sink_audio_locations = gatt.DelegatedCharacteristicAdapter(
characteristics[0],
decode=lambda x: AudioLocation(struct.unpack('<I', x)[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC
):
self.source_audio_locations = characteristics[0]
self.source_audio_locations = gatt.DelegatedCharacteristicAdapter(
characteristics[0],
decode=lambda x: AudioLocation(struct.unpack('<I', x)[0]),
)

View File

@@ -25,7 +25,6 @@ from bumble.gatt import (
TemplateService,
Characteristic,
DelegatedCharacteristicAdapter,
InvalidServiceError,
GATT_TELEPHONY_AND_MEDIA_AUDIO_SERVICE,
GATT_TMAP_ROLE_CHARACTERISTIC,
)
@@ -74,15 +73,10 @@ class TelephonyAndMediaAudioServiceProxy(ProfileServiceProxy):
def __init__(self, service_proxy: ServiceProxy):
self.service_proxy = service_proxy
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_TMAP_ROLE_CHARACTERISTIC
)
):
raise InvalidServiceError('TMAP Role characteristic not found')
self.role = DelegatedCharacteristicAdapter(
characteristics[0],
service_proxy.get_required_characteristic_by_uuid(
GATT_TMAP_ROLE_CHARACTERISTIC
),
decode=lambda value: Role(
struct.unpack_from('<H', value, 0)[0],
),

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2024 Google LLC
# Copyright 2021-2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,14 +17,16 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import dataclasses
import enum
from typing import Optional, Sequence
from bumble import att
from bumble import device
from bumble import gatt
from bumble import gatt_client
from typing import Optional, Sequence
# -----------------------------------------------------------------------------
# Constants
@@ -67,6 +69,20 @@ class VolumeControlPointOpcode(enum.IntEnum):
MUTE = 0x06
@dataclasses.dataclass
class VolumeState:
volume_setting: int
mute: int
change_counter: int
@classmethod
def from_bytes(cls, data: bytes) -> VolumeState:
return cls(data[0], data[1], data[2])
def __bytes__(self) -> bytes:
return bytes([self.volume_setting, self.mute, self.change_counter])
# -----------------------------------------------------------------------------
# Server
# -----------------------------------------------------------------------------
@@ -126,16 +142,8 @@ class VolumeControlService(gatt.TemplateService):
included_services=list(included_services),
)
@property
def volume_state_bytes(self) -> bytes:
return bytes([self.volume_setting, self.muted, self.change_counter])
@volume_state_bytes.setter
def volume_state_bytes(self, new_value: bytes) -> None:
self.volume_setting, self.muted, self.change_counter = new_value
def _on_read_volume_state(self, _connection: Optional[device.Connection]) -> bytes:
return self.volume_state_bytes
return bytes(VolumeState(self.volume_setting, self.muted, self.change_counter))
def _on_write_volume_control_point(
self, connection: Optional[device.Connection], value: bytes
@@ -153,14 +161,9 @@ class VolumeControlService(gatt.TemplateService):
self.change_counter = (self.change_counter + 1) % 256
connection.abort_on(
'disconnection',
connection.device.notify_subscribers(
attribute=self.volume_state,
value=self.volume_state_bytes,
),
)
self.emit(
'volume_state', self.volume_setting, self.muted, self.change_counter
connection.device.notify_subscribers(attribute=self.volume_state),
)
self.emit('volume_state_change')
def _on_relative_volume_down(self) -> bool:
old_volume = self.volume_setting
@@ -207,24 +210,26 @@ class VolumeControlServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = VolumeControlService
volume_control_point: gatt_client.CharacteristicProxy
volume_state: gatt.SerializableCharacteristicAdapter
volume_flags: gatt.DelegatedCharacteristicAdapter
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
self.volume_state = gatt.PackedCharacteristicAdapter(
service_proxy.get_characteristics_by_uuid(
self.volume_state = gatt.SerializableCharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_VOLUME_STATE_CHARACTERISTIC
)[0],
'BBB',
),
VolumeState,
)
self.volume_control_point = service_proxy.get_characteristics_by_uuid(
self.volume_control_point = service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC
)[0]
self.volume_flags = gatt.PackedCharacteristicAdapter(
service_proxy.get_characteristics_by_uuid(
gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC
)[0],
'B',
)
self.volume_flags = gatt.DelegatedCharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC
),
decode=lambda data: VolumeFlags(data[0]),
)

299
bumble/profiles/vocs.py Normal file
View File

@@ -0,0 +1,299 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct
from dataclasses import dataclass
from typing import Optional
from bumble.device import Connection
from bumble.att import ATT_Error
from bumble.gatt import (
Characteristic,
DelegatedCharacteristicAdapter,
TemplateService,
CharacteristicValue,
SerializableCharacteristicAdapter,
UTF8CharacteristicAdapter,
GATT_VOLUME_OFFSET_CONTROL_SERVICE,
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC,
GATT_AUDIO_LOCATION_CHARACTERISTIC,
GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC,
GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from bumble.utils import OpenIntEnum
from bumble.profiles.bap import AudioLocation
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
MIN_VOLUME_OFFSET = -255
MAX_VOLUME_OFFSET = 255
CHANGE_COUNTER_MAX_VALUE = 0xFF
class SetVolumeOffsetOpCode(OpenIntEnum):
SET_VOLUME_OFFSET = 0x01
class ErrorCode(OpenIntEnum):
"""
See Volume Offset Control Service 1.6. Application error codes.
"""
INVALID_CHANGE_COUNTER = 0x80
OPCODE_NOT_SUPPORTED = 0x81
VALUE_OUT_OF_RANGE = 0x82
# -----------------------------------------------------------------------------
@dataclass
class VolumeOffsetState:
volume_offset: int = 0
change_counter: int = 0
attribute_value: Optional[CharacteristicValue] = None
def __bytes__(self) -> bytes:
return struct.pack('<hB', self.volume_offset, self.change_counter)
@classmethod
def from_bytes(cls, data: bytes):
volume_offset, change_counter = struct.unpack('<hB', data)
return cls(volume_offset, change_counter)
def increment_change_counter(self) -> None:
self.change_counter = (self.change_counter + 1) % (CHANGE_COUNTER_MAX_VALUE + 1)
async def notify_subscribers_via_connection(self, connection: Connection) -> None:
assert self.attribute_value is not None
await connection.device.notify_subscribers(attribute=self.attribute_value)
def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self)
@dataclass
class VocsAudioLocation:
audio_location: AudioLocation = AudioLocation.NOT_ALLOWED
attribute_value: Optional[CharacteristicValue] = None
def __bytes__(self) -> bytes:
return struct.pack('<I', self.audio_location)
@classmethod
def from_bytes(cls, data: bytes):
audio_location = AudioLocation(struct.unpack('<I', data)[0])
return cls(audio_location)
def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self)
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
assert connection
assert self.attribute_value
self.audio_location = AudioLocation(int.from_bytes(value, 'little'))
await connection.device.notify_subscribers(attribute=self.attribute_value)
@dataclass
class VolumeOffsetControlPoint:
volume_offset_state: VolumeOffsetState
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
assert connection
opcode = value[0]
if opcode != SetVolumeOffsetOpCode.SET_VOLUME_OFFSET:
raise ATT_Error(ErrorCode.OPCODE_NOT_SUPPORTED)
change_counter, volume_offset = struct.unpack('<Bh', value[1:])
await self._set_volume_offset(connection, change_counter, volume_offset)
async def _set_volume_offset(
self,
connection: Connection,
change_counter_operand: int,
volume_offset_operand: int,
) -> None:
change_counter = self.volume_offset_state.change_counter
if change_counter != change_counter_operand:
raise ATT_Error(ErrorCode.INVALID_CHANGE_COUNTER)
if not MIN_VOLUME_OFFSET <= volume_offset_operand <= MAX_VOLUME_OFFSET:
raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE)
self.volume_offset_state.volume_offset = volume_offset_operand
self.volume_offset_state.increment_change_counter()
await self.volume_offset_state.notify_subscribers_via_connection(connection)
@dataclass
class AudioOutputDescription:
audio_output_description: str = ''
attribute_value: Optional[CharacteristicValue] = None
@classmethod
def from_bytes(cls, data: bytes):
return cls(audio_output_description=data.decode('utf-8'))
def __bytes__(self) -> bytes:
return self.audio_output_description.encode('utf-8')
def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self)
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
assert connection
assert self.attribute_value
self.audio_output_description = value.decode('utf-8')
await connection.device.notify_subscribers(attribute=self.attribute_value)
# -----------------------------------------------------------------------------
class VolumeOffsetControlService(TemplateService):
UUID = GATT_VOLUME_OFFSET_CONTROL_SERVICE
def __init__(
self,
volume_offset_state: Optional[VolumeOffsetState] = None,
audio_location: Optional[VocsAudioLocation] = None,
audio_output_description: Optional[AudioOutputDescription] = None,
) -> None:
self.volume_offset_state = (
VolumeOffsetState() if volume_offset_state is None else volume_offset_state
)
self.audio_location = (
VocsAudioLocation() if audio_location is None else audio_location
)
self.audio_output_description = (
AudioOutputDescription()
if audio_output_description is None
else audio_output_description
)
self.volume_offset_control_point: VolumeOffsetControlPoint = (
VolumeOffsetControlPoint(self.volume_offset_state)
)
self.volume_offset_state_characteristic = Characteristic(
uuid=GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY
),
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=CharacteristicValue(read=self.volume_offset_state.on_read),
)
self.audio_location_characteristic = Characteristic(
uuid=GATT_AUDIO_LOCATION_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE
),
permissions=(
Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION
),
value=CharacteristicValue(
read=self.audio_location.on_read,
write=self.audio_location.on_write,
),
)
self.audio_location.attribute_value = self.audio_location_characteristic.value
self.volume_offset_control_point_characteristic = Characteristic(
uuid=GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC,
properties=Characteristic.Properties.WRITE,
permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
value=CharacteristicValue(write=self.volume_offset_control_point.on_write),
)
self.audio_output_description_characteristic = Characteristic(
uuid=GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE
),
permissions=(
Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION
),
value=CharacteristicValue(
read=self.audio_output_description.on_read,
write=self.audio_output_description.on_write,
),
)
self.audio_output_description.attribute_value = (
self.audio_output_description_characteristic.value
)
super().__init__(
characteristics=[
self.volume_offset_state_characteristic, # type: ignore
self.audio_location_characteristic, # type: ignore
self.volume_offset_control_point_characteristic, # type: ignore
self.audio_output_description_characteristic, # type: ignore
],
primary=False,
)
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class VolumeOffsetControlServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = VolumeOffsetControlService
def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy
self.volume_offset_state = SerializableCharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC
),
VolumeOffsetState,
)
self.audio_location = DelegatedCharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_LOCATION_CHARACTERISTIC
),
encode=lambda value: bytes([int(value)]),
decode=lambda data: AudioLocation(data[0]),
)
self.volume_offset_control_point = (
service_proxy.get_required_characteristic_by_uuid(
GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC
)
)
self.audio_output_description = UTF8CharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC
)
)

View File

@@ -16,15 +16,21 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import struct
from typing import Dict, List, Type, Optional, Tuple, Union, NewType, TYPE_CHECKING
from typing import Iterable, NewType, Optional, Union, Sequence, Type, TYPE_CHECKING
from typing_extensions import Self
from . import core, l2cap
from .colors import color
from .core import InvalidStateError, InvalidArgumentError, InvalidPacketError
from .hci import HCI_Object, name_or_number, key_with_value
from bumble import core, l2cap
from bumble.colors import color
from bumble.core import (
InvalidStateError,
InvalidArgumentError,
InvalidPacketError,
ProtocolError,
)
from bumble.hci import HCI_Object, name_or_number, key_with_value
if TYPE_CHECKING:
from .device import Device, Connection
@@ -242,11 +248,11 @@ class DataElement:
return DataElement(DataElement.BOOLEAN, value)
@staticmethod
def sequence(value: List[DataElement]) -> DataElement:
def sequence(value: Iterable[DataElement]) -> DataElement:
return DataElement(DataElement.SEQUENCE, value)
@staticmethod
def alternative(value: List[DataElement]) -> DataElement:
def alternative(value: Iterable[DataElement]) -> DataElement:
return DataElement(DataElement.ALTERNATIVE, value)
@staticmethod
@@ -344,9 +350,6 @@ class DataElement:
] # Keep a copy so we can re-serialize to an exact replica
return result
def to_bytes(self):
return bytes(self)
def __bytes__(self):
# Return early if we have a cache
if self.bytes:
@@ -476,7 +479,9 @@ class ServiceAttribute:
self.value = value
@staticmethod
def list_from_data_elements(elements: List[DataElement]) -> List[ServiceAttribute]:
def list_from_data_elements(
elements: Sequence[DataElement],
) -> list[ServiceAttribute]:
attribute_list = []
for i in range(0, len(elements) // 2):
attribute_id, attribute_value = elements[2 * i : 2 * (i + 1)]
@@ -489,7 +494,7 @@ class ServiceAttribute:
@staticmethod
def find_attribute_in_list(
attribute_list: List[ServiceAttribute], attribute_id: int
attribute_list: Iterable[ServiceAttribute], attribute_id: int
) -> Optional[DataElement]:
return next(
(
@@ -537,7 +542,12 @@ class SDP_PDU:
See Bluetooth spec @ Vol 3, Part B - 4.2 PROTOCOL DATA UNIT FORMAT
'''
sdp_pdu_classes: Dict[int, Type[SDP_PDU]] = {}
RESPONSE_PDU_IDS = {
SDP_SERVICE_SEARCH_REQUEST: SDP_SERVICE_SEARCH_RESPONSE,
SDP_SERVICE_ATTRIBUTE_REQUEST: SDP_SERVICE_ATTRIBUTE_RESPONSE,
SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST: SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE,
}
sdp_pdu_classes: dict[int, Type[SDP_PDU]] = {}
name = None
pdu_id = 0
@@ -561,7 +571,7 @@ class SDP_PDU:
@staticmethod
def parse_service_record_handle_list_preceded_by_count(
data: bytes, offset: int
) -> Tuple[int, List[int]]:
) -> tuple[int, list[int]]:
count = struct.unpack_from('>H', data, offset - 2)[0]
handle_list = [
struct.unpack_from('>I', data, offset + x * 4)[0] for x in range(count)
@@ -623,11 +633,8 @@ class SDP_PDU:
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
def __bytes__(self):
return self.to_bytes()
return self.pdu
def __str__(self):
result = f'{color(self.name, "blue")} [TID={self.transaction_id}]'
@@ -645,6 +652,8 @@ class SDP_ErrorResponse(SDP_PDU):
See Bluetooth spec @ Vol 3, Part B - 4.4.1 SDP_ErrorResponse PDU
'''
error_code: int
# -----------------------------------------------------------------------------
@SDP_PDU.subclass(
@@ -681,7 +690,7 @@ class SDP_ServiceSearchResponse(SDP_PDU):
See Bluetooth spec @ Vol 3, Part B - 4.5.2 SDP_ServiceSearchResponse PDU
'''
service_record_handle_list: List[int]
service_record_handle_list: list[int]
total_service_record_count: int
current_service_record_count: int
continuation_state: bytes
@@ -758,31 +767,99 @@ class SDP_ServiceSearchAttributeResponse(SDP_PDU):
See Bluetooth spec @ Vol 3, Part B - 4.7.2 SDP_ServiceSearchAttributeResponse PDU
'''
attribute_list_byte_count: int
attribute_list: bytes
attribute_lists_byte_count: int
attribute_lists: bytes
continuation_state: bytes
# -----------------------------------------------------------------------------
class Client:
channel: Optional[l2cap.ClassicChannel]
def __init__(self, connection: Connection) -> None:
def __init__(self, connection: Connection, mtu: int = 0) -> None:
self.connection = connection
self.pending_request = None
self.channel = None
self.channel: Optional[l2cap.ClassicChannel] = None
self.mtu = mtu
self.request_semaphore = asyncio.Semaphore(1)
self.pending_request: Optional[SDP_PDU] = None
self.pending_response: Optional[asyncio.futures.Future[SDP_PDU]] = None
self.next_transaction_id = 0
async def connect(self) -> None:
self.channel = await self.connection.create_l2cap_channel(
spec=l2cap.ClassicChannelSpec(SDP_PSM)
spec=(
l2cap.ClassicChannelSpec(SDP_PSM, self.mtu)
if self.mtu
else l2cap.ClassicChannelSpec(SDP_PSM)
)
)
self.channel.sink = self.on_pdu
async def disconnect(self) -> None:
if self.channel:
await self.channel.disconnect()
self.channel = None
async def search_services(self, uuids: List[core.UUID]) -> List[int]:
def make_transaction_id(self) -> int:
transaction_id = self.next_transaction_id
self.next_transaction_id = (self.next_transaction_id + 1) & 0xFFFF
return transaction_id
def on_pdu(self, pdu: bytes) -> None:
if not self.pending_request:
logger.warning('received response with no pending request')
return
assert self.pending_response is not None
response = SDP_PDU.from_bytes(pdu)
# Check that the transaction ID is what we expect
if self.pending_request.transaction_id != response.transaction_id:
logger.warning(
f"received response with transaction ID {response.transaction_id} "
f"but expected {self.pending_request.transaction_id}"
)
return
# Check if the response is an error
if isinstance(response, SDP_ErrorResponse):
self.pending_response.set_exception(
ProtocolError(error_code=response.error_code)
)
return
# Check that the type of the response matches the request
if response.pdu_id != SDP_PDU.RESPONSE_PDU_IDS.get(self.pending_request.pdu_id):
logger.warning("response type mismatch")
return
self.pending_response.set_result(response)
async def send_request(self, request: SDP_PDU) -> SDP_PDU:
assert self.channel is not None
async with self.request_semaphore:
assert self.pending_request is None
assert self.pending_response is None
# Create a future value to hold the eventual response
self.pending_response = asyncio.get_running_loop().create_future()
self.pending_request = request
try:
self.channel.send_pdu(bytes(request))
return await self.pending_response
finally:
self.pending_request = None
self.pending_response = None
async def search_services(self, uuids: Iterable[core.UUID]) -> list[int]:
"""
Search for services by UUID.
Args:
uuids: service the UUIDs to search for.
Returns:
A list of matching service record handles.
"""
if self.pending_request is not None:
raise InvalidStateError('request already pending')
if self.channel is None:
@@ -797,16 +874,16 @@ class Client:
continuation_state = bytes([0])
watchdog = SDP_CONTINUATION_WATCHDOG
while watchdog > 0:
response_pdu = await self.channel.send_request(
response = await self.send_request(
SDP_ServiceSearchRequest(
transaction_id=0, # Transaction ID TODO: pick a real value
transaction_id=self.make_transaction_id(),
service_search_pattern=service_search_pattern,
maximum_service_record_count=0xFFFF,
continuation_state=continuation_state,
)
)
response = SDP_PDU.from_bytes(response_pdu)
logger.debug(f'<<< Response: {response}')
assert isinstance(response, SDP_ServiceSearchResponse)
service_record_handle_list += response.service_record_handle_list
continuation_state = response.continuation_state
if len(continuation_state) == 1 and continuation_state[0] == 0:
@@ -817,8 +894,21 @@ class Client:
return service_record_handle_list
async def search_attributes(
self, uuids: List[core.UUID], attribute_ids: List[Union[int, Tuple[int, int]]]
) -> List[List[ServiceAttribute]]:
self,
uuids: Iterable[core.UUID],
attribute_ids: Iterable[Union[int, tuple[int, int]]],
) -> list[list[ServiceAttribute]]:
"""
Search for attributes by UUID and attribute IDs.
Args:
uuids: the service UUIDs to search for.
attribute_ids: list of attribute IDs or (start, end) attribute ID ranges.
(use (0, 0xFFFF) to include all attributes)
Returns:
A list of list of attributes, one list per matching service.
"""
if self.pending_request is not None:
raise InvalidStateError('request already pending')
if self.channel is None:
@@ -830,8 +920,8 @@ class Client:
attribute_id_list = DataElement.sequence(
[
(
DataElement.unsigned_integer(
attribute_id[0], value_size=attribute_id[1]
DataElement.unsigned_integer_32(
attribute_id[0] << 16 | attribute_id[1]
)
if isinstance(attribute_id, tuple)
else DataElement.unsigned_integer_16(attribute_id)
@@ -845,17 +935,17 @@ class Client:
continuation_state = bytes([0])
watchdog = SDP_CONTINUATION_WATCHDOG
while watchdog > 0:
response_pdu = await self.channel.send_request(
response = await self.send_request(
SDP_ServiceSearchAttributeRequest(
transaction_id=0, # Transaction ID TODO: pick a real value
transaction_id=self.make_transaction_id(),
service_search_pattern=service_search_pattern,
maximum_attribute_byte_count=0xFFFF,
attribute_id_list=attribute_id_list,
continuation_state=continuation_state,
)
)
response = SDP_PDU.from_bytes(response_pdu)
logger.debug(f'<<< Response: {response}')
assert isinstance(response, SDP_ServiceSearchAttributeResponse)
accumulator += response.attribute_lists
continuation_state = response.continuation_state
if len(continuation_state) == 1 and continuation_state[0] == 0:
@@ -878,8 +968,18 @@ class Client:
async def get_attributes(
self,
service_record_handle: int,
attribute_ids: List[Union[int, Tuple[int, int]]],
) -> List[ServiceAttribute]:
attribute_ids: Iterable[Union[int, tuple[int, int]]],
) -> list[ServiceAttribute]:
"""
Get attributes for a service.
Args:
service_record_handle: the handle for a service
attribute_ids: list or attribute IDs or (start, end) attribute ID handles.
Returns:
A list of attributes.
"""
if self.pending_request is not None:
raise InvalidStateError('request already pending')
if self.channel is None:
@@ -888,8 +988,8 @@ class Client:
attribute_id_list = DataElement.sequence(
[
(
DataElement.unsigned_integer(
attribute_id[0], value_size=attribute_id[1]
DataElement.unsigned_integer_32(
attribute_id[0] << 16 | attribute_id[1]
)
if isinstance(attribute_id, tuple)
else DataElement.unsigned_integer_16(attribute_id)
@@ -903,17 +1003,17 @@ class Client:
continuation_state = bytes([0])
watchdog = SDP_CONTINUATION_WATCHDOG
while watchdog > 0:
response_pdu = await self.channel.send_request(
response = await self.send_request(
SDP_ServiceAttributeRequest(
transaction_id=0, # Transaction ID TODO: pick a real value
transaction_id=self.make_transaction_id(),
service_record_handle=service_record_handle,
maximum_attribute_byte_count=0xFFFF,
attribute_id_list=attribute_id_list,
continuation_state=continuation_state,
)
)
response = SDP_PDU.from_bytes(response_pdu)
logger.debug(f'<<< Response: {response}')
assert isinstance(response, SDP_ServiceAttributeResponse)
accumulator += response.attribute_list
continuation_state = response.continuation_state
if len(continuation_state) == 1 and continuation_state[0] == 0:
@@ -939,17 +1039,17 @@ class Client:
# -----------------------------------------------------------------------------
class Server:
CONTINUATION_STATE = bytes([0x01, 0x43])
CONTINUATION_STATE = bytes([0x01, 0x00])
channel: Optional[l2cap.ClassicChannel]
Service = NewType('Service', List[ServiceAttribute])
service_records: Dict[int, Service]
current_response: Union[None, bytes, Tuple[int, List[int]]]
Service = NewType('Service', list[ServiceAttribute])
service_records: dict[int, Service]
current_response: Union[None, bytes, tuple[int, list[int]]]
def __init__(self, device: Device) -> None:
self.device = device
self.service_records = {} # Service records maps, by record handle
self.channel = None
self.current_response = None
self.current_response = None # Current response data, used for continuations
def register(self, l2cap_channel_manager: l2cap.ChannelManager) -> None:
l2cap_channel_manager.create_classic_server(
@@ -960,7 +1060,7 @@ class Server:
logger.debug(f'{color(">>> Sending SDP Response", "blue")}: {response}')
self.channel.send_pdu(response)
def match_services(self, search_pattern: DataElement) -> Dict[int, Service]:
def match_services(self, search_pattern: DataElement) -> dict[int, Service]:
# Find the services for which the attributes in the pattern is a subset of the
# service's attribute values (NOTE: the value search recurses into sequences)
matching_services = {}
@@ -1017,6 +1117,31 @@ class Server:
)
)
def check_continuation(
self,
continuation_state: bytes,
transaction_id: int,
) -> Optional[bool]:
# Check if this is a valid continuation
if len(continuation_state) > 1:
if (
self.current_response is None
or continuation_state != self.CONTINUATION_STATE
):
self.send_response(
SDP_ErrorResponse(
transaction_id=transaction_id,
error_code=SDP_INVALID_CONTINUATION_STATE_ERROR,
)
)
return None
return True
# Cleanup any partial response leftover
self.current_response = None
return False
def get_next_response_payload(self, maximum_size):
if len(self.current_response) > maximum_size:
payload = self.current_response[:maximum_size]
@@ -1031,7 +1156,7 @@ class Server:
@staticmethod
def get_service_attributes(
service: Service, attribute_ids: List[DataElement]
service: Service, attribute_ids: Iterable[DataElement]
) -> DataElement:
attributes = []
for attribute_id in attribute_ids:
@@ -1059,30 +1184,24 @@ class Server:
def on_sdp_service_search_request(self, request: SDP_ServiceSearchRequest) -> None:
# Check if this is a continuation
if len(request.continuation_state) > 1:
if self.current_response is None:
self.send_response(
SDP_ErrorResponse(
transaction_id=request.transaction_id,
error_code=SDP_INVALID_CONTINUATION_STATE_ERROR,
)
)
return
else:
# Cleanup any partial response leftover
self.current_response = None
if (
continuation := self.check_continuation(
request.continuation_state, request.transaction_id
)
) is None:
return
if not continuation:
# Find the matching services
matching_services = self.match_services(request.service_search_pattern)
service_record_handles = list(matching_services.keys())
logger.debug(f'Service Record Handles: {service_record_handles}')
# Only return up to the maximum requested
service_record_handles_subset = service_record_handles[
: request.maximum_service_record_count
]
# Serialize to a byte array, and remember the total count
logger.debug(f'Service Record Handles: {service_record_handles}')
self.current_response = (
len(service_record_handles),
service_record_handles_subset,
@@ -1090,15 +1209,21 @@ class Server:
# Respond, keeping any unsent handles for later
assert isinstance(self.current_response, tuple)
service_record_handles = self.current_response[1][
: request.maximum_service_record_count
assert self.channel is not None
total_service_record_count, service_record_handles = self.current_response
maximum_service_record_count = (self.channel.peer_mtu - 11) // 4
service_record_handles_remaining = service_record_handles[
maximum_service_record_count:
]
service_record_handles = service_record_handles[:maximum_service_record_count]
self.current_response = (
self.current_response[0],
self.current_response[1][request.maximum_service_record_count :],
total_service_record_count,
service_record_handles_remaining,
)
continuation_state = (
Server.CONTINUATION_STATE if self.current_response[1] else bytes([0])
Server.CONTINUATION_STATE
if service_record_handles_remaining
else bytes([0])
)
service_record_handle_list = b''.join(
[struct.pack('>I', handle) for handle in service_record_handles]
@@ -1106,7 +1231,7 @@ class Server:
self.send_response(
SDP_ServiceSearchResponse(
transaction_id=request.transaction_id,
total_service_record_count=self.current_response[0],
total_service_record_count=total_service_record_count,
current_service_record_count=len(service_record_handles),
service_record_handle_list=service_record_handle_list,
continuation_state=continuation_state,
@@ -1117,19 +1242,14 @@ class Server:
self, request: SDP_ServiceAttributeRequest
) -> None:
# Check if this is a continuation
if len(request.continuation_state) > 1:
if self.current_response is None:
self.send_response(
SDP_ErrorResponse(
transaction_id=request.transaction_id,
error_code=SDP_INVALID_CONTINUATION_STATE_ERROR,
)
)
return
else:
# Cleanup any partial response leftover
self.current_response = None
if (
continuation := self.check_continuation(
request.continuation_state, request.transaction_id
)
) is None:
return
if not continuation:
# Check that the service exists
service = self.service_records.get(request.service_record_handle)
if service is None:
@@ -1151,14 +1271,18 @@ class Server:
self.current_response = bytes(attribute_list)
# Respond, keeping any pending chunks for later
assert self.channel is not None
maximum_attribute_byte_count = min(
request.maximum_attribute_byte_count, self.channel.peer_mtu - 9
)
attribute_list_response, continuation_state = self.get_next_response_payload(
request.maximum_attribute_byte_count
maximum_attribute_byte_count
)
self.send_response(
SDP_ServiceAttributeResponse(
transaction_id=request.transaction_id,
attribute_list_byte_count=len(attribute_list_response),
attribute_list=attribute_list,
attribute_list=attribute_list_response,
continuation_state=continuation_state,
)
)
@@ -1167,18 +1291,14 @@ class Server:
self, request: SDP_ServiceSearchAttributeRequest
) -> None:
# Check if this is a continuation
if len(request.continuation_state) > 1:
if self.current_response is None:
self.send_response(
SDP_ErrorResponse(
transaction_id=request.transaction_id,
error_code=SDP_INVALID_CONTINUATION_STATE_ERROR,
)
)
else:
# Cleanup any partial response leftover
self.current_response = None
if (
continuation := self.check_continuation(
request.continuation_state, request.transaction_id
)
) is None:
return
if not continuation:
# Find the matching services
matching_services = self.match_services(
request.service_search_pattern
@@ -1198,14 +1318,18 @@ class Server:
self.current_response = bytes(attribute_lists)
# Respond, keeping any pending chunks for later
assert self.channel is not None
maximum_attribute_byte_count = min(
request.maximum_attribute_byte_count, self.channel.peer_mtu - 9
)
attribute_lists_response, continuation_state = self.get_next_response_payload(
request.maximum_attribute_byte_count
maximum_attribute_byte_count
)
self.send_response(
SDP_ServiceSearchAttributeResponse(
transaction_id=request.transaction_id,
attribute_lists_byte_count=len(attribute_lists_response),
attribute_lists=attribute_lists,
attribute_lists=attribute_lists_response,
continuation_state=continuation_state,
)
)

View File

@@ -298,11 +298,8 @@ class SMP_Command:
def init_from_bytes(self, pdu: bytes, offset: int) -> None:
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
def __bytes__(self):
return self.to_bytes()
return self.pdu
def __str__(self):
result = color(self.name, 'yellow')
@@ -698,6 +695,7 @@ class Session:
self.ltk_ediv = 0
self.ltk_rand = bytes(8)
self.link_key: Optional[bytes] = None
self.maximum_encryption_key_size: int = 0
self.initiator_key_distribution: int = 0
self.responder_key_distribution: int = 0
self.peer_random_value: Optional[bytes] = None
@@ -744,6 +742,10 @@ class Session:
else:
self.pairing_result = None
self.maximum_encryption_key_size = (
pairing_config.delegate.maximum_encryption_key_size
)
# Key Distribution (default values before negotiation)
self.initiator_key_distribution = (
pairing_config.delegate.local_initiator_key_distribution
@@ -996,7 +998,7 @@ class Session:
io_capability=self.io_capability,
oob_data_flag=self.oob_data_flag,
auth_req=self.auth_req,
maximum_encryption_key_size=16,
maximum_encryption_key_size=self.maximum_encryption_key_size,
initiator_key_distribution=self.initiator_key_distribution,
responder_key_distribution=self.responder_key_distribution,
)
@@ -1008,7 +1010,7 @@ class Session:
io_capability=self.io_capability,
oob_data_flag=self.oob_data_flag,
auth_req=self.auth_req,
maximum_encryption_key_size=16,
maximum_encryption_key_size=self.maximum_encryption_key_size,
initiator_key_distribution=self.initiator_key_distribution,
responder_key_distribution=self.responder_key_distribution,
)
@@ -1949,7 +1951,7 @@ class Manager(EventEmitter):
f'{connection.peer_address}: {command}'
)
cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID
connection.send_l2cap_pdu(cid, command.to_bytes())
connection.send_l2cap_pdu(cid, bytes(command))
def on_smp_security_request_command(
self, connection: Connection, request: SMP_Security_Request_Command

View File

@@ -149,7 +149,10 @@ async def open_usb_transport(spec: str) -> Transport:
if status != usb1.TRANSFER_COMPLETED:
logger.warning(
color(f'!!! OUT transfer not completed: status={status}', 'red')
color(
f'!!! OUT transfer not completed: status={status}',
'red',
)
)
async def process_queue(self):
@@ -275,7 +278,10 @@ async def open_usb_transport(spec: str) -> Transport:
)
else:
logger.warning(
color(f'!!! IN transfer not completed: status={status}', 'red')
color(
f'!!! IN[{packet_type}] transfer not completed: status={status}',
'red',
)
)
self.loop.call_soon_threadsafe(self.on_transport_lost)

View File

@@ -16,6 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
import struct
from typing import Dict, Optional, Type
from bumble.hci import (
name_or_number,
@@ -24,7 +25,9 @@ from bumble.hci import (
HCI_Constant,
HCI_Object,
HCI_Command,
HCI_Vendor_Event,
HCI_Event,
HCI_Extended_Event,
HCI_VENDOR_EVENT,
STATUS_SPEC,
)
@@ -48,7 +51,6 @@ HCI_DYNAMIC_AUDIO_BUFFER_COMMAND = hci_vendor_command_op_code(0x15F)
HCI_BLUETOOTH_QUALITY_REPORT_EVENT = 0x58
HCI_Command.register_commands(globals())
HCI_Vendor_Event.register_subevents(globals())
# -----------------------------------------------------------------------------
@@ -279,7 +281,29 @@ class HCI_Dynamic_Audio_Buffer_Command(HCI_Command):
# -----------------------------------------------------------------------------
@HCI_Vendor_Event.event(
class HCI_Android_Vendor_Event(HCI_Extended_Event):
event_code: int = HCI_VENDOR_EVENT
subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {}
@classmethod
def subclass_from_parameters(
cls, parameters: bytes
) -> Optional[HCI_Extended_Event]:
subevent_code = parameters[0]
if subevent_code == HCI_BLUETOOTH_QUALITY_REPORT_EVENT:
quality_report_id = parameters[1]
if quality_report_id in (0x01, 0x02, 0x03, 0x04, 0x07, 0x08, 0x09):
return HCI_Bluetooth_Quality_Report_Event.from_parameters(parameters)
return None
HCI_Android_Vendor_Event.register_subevents(globals())
HCI_Event.add_vendor_factory(HCI_Android_Vendor_Event.subclass_from_parameters)
# -----------------------------------------------------------------------------
@HCI_Extended_Event.event(
fields=[
('quality_report_id', 1),
('packet_types', 1),
@@ -308,10 +332,11 @@ class HCI_Dynamic_Audio_Buffer_Command(HCI_Command):
('tx_last_subevent_packets', 4),
('crc_error_packets', 4),
('rx_duplicate_packets', 4),
('rx_unreceived_packets', 4),
('vendor_specific_parameters', '*'),
]
)
class HCI_Bluetooth_Quality_Report_Event(HCI_Vendor_Event):
class HCI_Bluetooth_Quality_Report_Event(HCI_Android_Vendor_Event):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#bluetooth-quality-report-sub-event

View File

@@ -39,12 +39,14 @@ nav:
- Drivers:
- drivers/index.md
- Realtek: drivers/realtek.md
- Intel: drivers/intel.md
- API:
- Guide: api/guide.md
- Examples: api/examples.md
- Reference: api/reference.md
- Apps & Tools:
- apps_and_tools/index.md
- Auracast: apps_and_tools/auracast.md
- Console: apps_and_tools/console.md
- Bench: apps_and_tools/bench.md
- Speaker: apps_and_tools/speaker.md
@@ -108,8 +110,8 @@ markdown_extensions:
- pymdownx.details
- pymdownx.superfences
- pymdownx.emoji:
emoji_index: !!python/name:materialx.emoji.twemoji
emoji_generator: !!python/name:materialx.emoji.to_svg
emoji_index: !!python/name:material.extensions.emoji.twemoji
emoji_generator: !!python/name:material.extensions.emoji.to_svg
- pymdownx.tabbed:
alternate_style: true
- codehilite:

View File

@@ -4,12 +4,13 @@ APPS & TOOLS
Included in the project are a few apps and tools, built on top of the core libraries.
These include:
* [Console](console.md) - an interactive text-based console
* [Bench](bench.md) - Speed and Latency benchmarking between two devices (LE and Classic)
* [Pair](pair.md) - Pair/bond two devices (LE and Classic)
* [Unbond](unbond.md) - Remove a previously established bond
* [HCI Bridge](hci_bridge.md) - a HCI transport bridge to connect two HCI transports and filter/snoop the HCI packets
* [Golden Gate Bridge](gg_bridge.md) - a bridge between GATT and UDP to use with the Golden Gate "stack tool"
* [Show](show.md) - Parse a file with HCI packets and print the details of each packet in a human readable form
* [Auracast](auracast.md) - Commands to broadcast, receive and/or control LE Audio.
* [Console](console.md) - An interactive text-based console.
* [Bench](bench.md) - Speed and Latency benchmarking between two devices (LE and Classic).
* [Pair](pair.md) - Pair/bond two devices (LE and Classic).
* [Unbond](unbond.md) - Remove a previously established bond.
* [HCI Bridge](hci_bridge.md) - An HCI transport bridge to connect two HCI transports and filter/snoop the HCI packets.
* [Golden Gate Bridge](gg_bridge.md) - Bridge between GATT and UDP to use with the Golden Gate "stack tool".
* [Show](show.md) - Parse a file with HCI packets and print the details of each packet in a human readable form.
* [Speaker](speaker.md) - Virtual Bluetooth speaker, with a command line and browser-based UI.
* [Link Relay](link_relay.md) - WebSocket relay for virtual RemoteLink instances to communicate with each other.

View File

@@ -16,4 +16,5 @@ USB vendor ID and product ID.
Drivers included in the module are:
* [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles.
* [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles.
* [Intel](intel.md): Loading of Firmware and Config for Intel USB controllers.

View File

@@ -0,0 +1,73 @@
INTEL DRIVER
==============
This driver supports loading firmware images and optional config data to
Intel USB controllers.
A number of USB dongles are supported, but likely not all.
The initial implementation has been tested on BE200 and AX210 controllers.
When using a USB controller, the USB product ID and vendor ID are used
to find whether a matching set of firmware image and config data
is needed for that specific model. If a match exists, the driver will try
load the firmware image and, if needed, config data.
Alternatively, the metadata property ``driver=intel`` may be specified in a transport
name to force that driver to be used (ex: ``usb:[driver=intel]0`` instead of just
``usb:0`` for the first USB device).
The driver will look for the firmware and config files by name, in order, in:
* The directory specified by the environment variable `BUMBLE_INTEL_FIRMWARE_DIR`
if set.
* The directory `<package-dir>/drivers/intel_fw` where `<package-dir>` is the directory
where the `bumble` package is installed.
* The current directory.
It is also possible to override or extend the config data with parameters passed via the
transport name. The driver name `intel` may be suffixed with `/<param:value>[+<param:value>]...`
The supported params are:
* `ddc_addon`: configuration data to add to the data loaded from the config data file
* `ddc_override`: configuration data to use instead of the data loaded from the config data file.
With both `dcc_addon` and `dcc_override`, the param value is a hex-encoded byte array containing
the config data (same format as the config file).
Example transport name:
`usb:[driver=intel/dcc_addon:03E40200]0`
Obtaining Firmware Images and Config Data
-----------------------------------------
Firmware images and config data may be obtained from a variety of online
sources.
To facilitate finding a downloading the, the utility program `bumble-intel-fw-download`
may be used.
```
Usage: bumble-intel-fw-download [OPTIONS]
Download Intel firmware images and configs.
Options:
--output-dir TEXT Output directory where the files will be saved.
Defaults to the OS-specific app data dir, which the
driver will check when trying to find firmware
--source [linux-kernel] [default: linux-kernel]
--single TEXT Only download a single image set, by its base name
--force Overwrite files if they already exist
--help Show this message and exit.
```
Utility
-------
The `bumble-intel-util` utility may be used to interact with an Intel USB controller.
```
Usage: bumble-intel-util [OPTIONS] COMMAND [ARGS]...
Options:
--help Show this message and exit.
Commands:
bootloader Reboot in bootloader mode.
info Get the firmware info.
load Load a firmware image.
```

View File

@@ -9,9 +9,9 @@ for your platform.
Throughout the documentation, when shell commands are shown, it is assumed that you can
invoke Python as
```
$ python
$ python3
```
If invoking python is different on your platform (it may be `python3` for example, or just `py` or `py.exe`),
If invoking python is different on your platform (it may be `python` for example, or just `py` or `py.exe`),
adjust accordingly.
You may be simply using Bumble as a module for your own application or as a dependency to your own
@@ -30,12 +30,18 @@ manager, or from source.
python environment, or in a virtual environment, such as a `venv`, `pyenv` or `conda` environment.
See the [Python Environments page](development/python_environments.md) page for details.
### Install from PyPI
```
$ python3 -m pip install bumble
```
### Install From Source
Install with `pip`. Run in a command shell in the directory where you downloaded the source
distribution
```
$ python -m pip install -e .
$ python3 -m pip install -e .
```
### Install from GitHub
@@ -44,21 +50,21 @@ You can install directly from GitHub without first downloading the repo.
Install the latest commit from the main branch with `pip`:
```
$ python -m pip install git+https://github.com/google/bumble.git
$ python3 -m pip install git+https://github.com/google/bumble.git
```
You can specify a specific tag.
Install tag `v0.0.1` with `pip`:
```
$ python -m pip install git+https://github.com/google/bumble.git@v0.0.1
$ python3 -m pip install git+https://github.com/google/bumble.git@v0.0.1
```
You can also specify a specific commit.
Install commit `27c0551` with `pip`:
```
$ python -m pip install git+https://github.com/google/bumble.git@27c0551
$ python3 -m pip install git+https://github.com/google/bumble.git@27c0551
```
# Working On The Bumble Code
@@ -78,21 +84,21 @@ directory of the project.
```bash
$ export PYTHONPATH=.
$ python apps/console.py serial:/dev/tty.usbmodem0006839912171
$ python3 apps/console.py serial:/dev/tty.usbmodem0006839912171
```
or running an example, with the working directory set to the `examples` subdirectory
```bash
$ cd examples
$ export PYTHONPATH=..
$ python run_scanner.py usb:0
$ python3 run_scanner.py usb:0
```
Or course, `export PYTHONPATH` only needs to be invoked once, not before each app/script execution.
Setting `PYTHONPATH` locally with each command would look something like:
```
$ PYTHONPATH=. python examples/run_advertiser.py examples/device1.json serial:/dev/tty.usbmodem0006839912171
$ PYTHONPATH=. python3 examples/run_advertiser.py examples/device1.json serial:/dev/tty.usbmodem0006839912171
```
# Where To Go Next

View File

@@ -35,11 +35,11 @@ the command line.
visit [this Android Studio user guide page](https://developer.android.com/studio/run/emulator-commandline)
The `-packet-streamer-endpoint <endpoint>` command line option may be used to enable
Bluetooth emulation and tell the emulator which virtual controller to connect to.
Bluetooth emulation and tell the emulator which virtual controller to connect to.
## Connecting to Netsim
If the emulator doesn't have Bluetooth emulation enabled by default, use the
If the emulator doesn't have Bluetooth emulation enabled by default, use the
`-packet-streamer-endpoint default` option to tell it to connect to Netsim.
If Netsim is not running, the emulator will start it automatically.
@@ -60,17 +60,17 @@ the Bumble `android-netsim` transport in `host` mode (the default).
!!! example "Run the example GATT server connected to the emulator via Netsim"
``` shell
$ python run_gatt_server.py device1.json android-netsim
$ python3 run_gatt_server.py device1.json android-netsim
```
By default, the Bumble `android-netsim` transport will try to automatically discover
the port number on which the netsim process is exposing its gRPC server interface. If
that discovery process fails, or if you want to specify the interface manually, you
that discovery process fails, or if you want to specify the interface manually, you
can pass a `hostname` and `port` as parameters to the transport, as: `android-netsim:<host>:<port>`.
!!! example "Run the example GATT server connected to the emulator via Netsim on a localhost, port 8877"
``` shell
$ python run_gatt_server.py device1.json android-netsim:localhost:8877
$ python3 run_gatt_server.py device1.json android-netsim:localhost:8877
```
### Multiple Instances
@@ -84,7 +84,7 @@ For example: `android-netsim:localhost:8877,name=bumble1`
This is an advanced use case, which may not be officially supported, but should work in recent
versions of the emulator.
The first step is to run the Bumble HCI bridge, specifying netsim as the "host" end of the
The first step is to run the Bumble HCI bridge, specifying netsim as the "host" end of the
bridge, and another controller (typically a USB Bluetooth dongle, but any other supported
transport can work as well) as the "controller" end of the bridge.

View File

@@ -25,7 +25,7 @@ import struct
import sys
from typing import Any, List, Union
from bumble.device import Connection, Device, Peer
from bumble.device import Device, Peer
from bumble import transport
from bumble import gatt
from bumble import hci
@@ -82,19 +82,19 @@ async def client(device: Device, address: hci.Address) -> None:
for index in range(1, 9):
characteristics.append(
service.get_characteristics_by_uuid(
CHARACTERISTIC_UUID_BASE + f"{index:02X}"
core.UUID(CHARACTERISTIC_UUID_BASE + f"{index:02X}")
)[0]
)
# Read all characteristics as raw bytes.
for characteristic in characteristics:
value = await characteristic.read_value()
print(f"### {characteristic} = {value} ({value.hex()})")
print(f"### {characteristic} = {value!r} ({value.hex()})")
# Static characteristic with a bytes value.
c1 = characteristics[0]
c1_value = await c1.read_value()
print(f"@@@ C1 {c1} value = {c1_value} (type={type(c1_value)})")
print(f"@@@ C1 {c1} value = {c1_value!r} (type={type(c1_value)})")
await c1.write_value("happy π day".encode("utf-8"))
# Static characteristic with a string value.
@@ -136,7 +136,7 @@ async def client(device: Device, address: hci.Address) -> None:
# Dynamic characteristic with a bytes value.
c7 = characteristics[6]
c7_value = await c7.read_value()
print(f"@@@ C7 {c7} value = {c7_value} (type={type(c7_value)})")
print(f"@@@ C7 {c7} value = {c7_value!r} (type={type(c7_value)})")
await c7.write_value(bytes.fromhex("01020304"))
# Dynamic characteristic with a string value.

View File

@@ -21,9 +21,9 @@ import sys
import os
import io
import logging
import websockets
from typing import Iterable, Optional
from typing import Optional
import websockets
import bumble.core
from bumble.device import Device, ScoLink
@@ -82,6 +82,10 @@ def on_microphone_volume(level: int):
send_message(type='microphone_volume', level=level)
def on_supported_audio_codecs(codecs: Iterable[hfp.AudioCodec]):
send_message(type='supported_audio_codecs', codecs=[codec.name for codec in codecs])
def on_sco_state_change(codec: int):
if codec == hfp.AudioCodec.CVSD:
sample_rate = 8000
@@ -207,6 +211,7 @@ async def main() -> None:
ag_protocol = hfp.AgProtocol(dlc, configuration)
ag_protocol.on('speaker_volume', on_speaker_volume)
ag_protocol.on('microphone_volume', on_microphone_volume)
ag_protocol.on('supported_audio_codecs', on_supported_audio_codecs)
on_hfp_state_change(True)
dlc.multiplexer.l2cap_channel.on(
'close', lambda: on_hfp_state_change(False)
@@ -241,7 +246,7 @@ async def main() -> None:
# Pick the first one
channel, version, hf_sdp_features = hfp_record
print(f'HF version: {version}')
print(f'HF features: {hf_sdp_features}')
print(f'HF features: {hf_sdp_features.name}')
# Request authentication
print('*** Authenticating...')

View File

@@ -161,7 +161,13 @@ async def main() -> None:
else:
file_output = open(f'{datetime.datetime.now().isoformat()}.lc3', 'wb')
codec_configuration = ase.codec_specific_configuration
assert isinstance(codec_configuration, CodecSpecificConfiguration)
if (
not isinstance(codec_configuration, CodecSpecificConfiguration)
or codec_configuration.sampling_frequency is None
or codec_configuration.audio_channel_allocation is None
or codec_configuration.frame_duration is None
):
return
# Write a LC3 header.
file_output.write(
bytes([0x1C, 0xCC]) # Header.

View File

@@ -42,7 +42,7 @@ from bumble.profiles.bap import (
from bumble.profiles.pacs import PacRecord, PublishedAudioCapabilitiesService
from bumble.profiles.cap import CommonAudioServiceService
from bumble.profiles.csip import CoordinatedSetIdentificationService, SirkType
from bumble.profiles.vcp import VolumeControlService
from bumble.profiles.vcs import VolumeControlService
from bumble.transport import open_transport_or_link
@@ -117,13 +117,17 @@ async def main() -> None:
ws: Optional[websockets.WebSocketServerProtocol] = None
def on_volume_state(volume_setting: int, muted: int, change_counter: int):
def on_volume_state_change():
if ws:
asyncio.create_task(
ws.send(dumps_volume_state(volume_setting, muted, change_counter))
ws.send(
dumps_volume_state(
vcs.volume_setting, vcs.muted, vcs.change_counter
)
)
)
vcs.on('volume_state', on_volume_state)
vcs.on('volume_state_change', on_volume_state_change)
advertising_data = (
bytes(
@@ -170,16 +174,10 @@ async def main() -> None:
ws = websocket
async for message in websocket:
volume_state = json.loads(message)
vcs.volume_state_bytes = bytes(
[
volume_state['volume_setting'],
volume_state['muted'],
volume_state['change_counter'],
]
)
await device.notify_subscribers(
vcs.volume_state, vcs.volume_state_bytes
)
vcs.volume_setting = volume_state['volume_setting']
vcs.muted = volume_state['muted']
vcs.change_counter = volume_state['change_counter']
await device.notify_subscribers(vcs.volume_state)
ws = None
await websockets.serve(serve, 'localhost', 8989)

View File

@@ -1,21 +1,135 @@
[build-system]
requires = ["setuptools>=52", "wheel", "setuptools_scm>=6.2"]
requires = ["setuptools>=61", "wheel", "setuptools_scm>=8"]
build-backend = "setuptools.build_meta"
[project]
name = "bumble"
dynamic = ["version"]
description = "Bluetooth Stack for Apps, Emulation, Test and Experimentation"
readme = "README.md"
authors = [{ name = "Google", email = "bumble-dev@google.com" }]
requires-python = ">=3.8"
dependencies = [
"aiohttp ~= 3.8; platform_system!='Emscripten'",
"appdirs >= 1.4; platform_system!='Emscripten'",
"click >= 8.1.3; platform_system!='Emscripten'",
"cryptography >= 39; platform_system!='Emscripten'",
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the
# versions available on PyPI. Relax the version requirement since it's better than being
# completely unable to import the package in case of version mismatch.
"cryptography >= 39.0; platform_system=='Emscripten'",
"grpcio >= 1.62.1; platform_system!='Emscripten'",
"humanize >= 4.6.0; platform_system!='Emscripten'",
"libusb1 >= 2.0.1; platform_system!='Emscripten'",
"libusb-package == 1.0.26.1; platform_system!='Emscripten'",
"platformdirs >= 3.10.0; platform_system!='Emscripten'",
"prompt_toolkit >= 3.0.16; platform_system!='Emscripten'",
"prettytable >= 3.6.0; platform_system!='Emscripten'",
"protobuf >= 3.12.4; platform_system!='Emscripten'",
"pyee >= 8.2.2",
"pyserial-asyncio >= 0.5; platform_system!='Emscripten'",
"pyserial >= 3.5; platform_system!='Emscripten'",
"pyusb >= 1.2; platform_system!='Emscripten'",
"websockets == 13.1; platform_system!='Emscripten'",
]
[project.optional-dependencies]
build = ["build >= 0.7"]
test = [
"pytest >= 8.2",
"pytest-asyncio >= 0.23.5",
"pytest-html >= 3.2.0",
"coverage >= 6.4",
]
development = [
"black == 24.3",
"bt-test-interfaces >= 0.0.6",
"grpcio-tools >= 1.62.1",
"invoke >= 1.7.3",
"mobly >= 1.12.2",
"mypy == 1.12.0",
"nox >= 2022",
"pylint == 3.3.1",
"pyyaml >= 6.0",
"types-appdirs >= 1.4.3",
"types-invoke >= 1.7.3",
"types-protobuf >= 4.21.0",
]
avatar = [
"pandora-avatar == 0.0.10",
"rootcanal == 1.10.0 ; python_version>='3.10'",
]
pandora = ["bt-test-interfaces >= 0.0.6"]
documentation = [
"mkdocs >= 1.6.0",
"mkdocs-material >= 9.6",
"mkdocstrings[python] >= 0.27.0",
]
auracast = [
"lc3py ; python_version>='3.10' and platform_system=='Linux' and platform_machine=='x86_64'",
"sounddevice >= 0.5.1",
]
[project.scripts]
bumble-auracast = "bumble.apps.auracast:main"
bumble-ble-rpa-tool = "bumble.apps.ble_rpa_tool:main"
bumble-console = "bumble.apps.console:main"
bumble-controller-info = "bumble.apps.controller_info:main"
bumble-controller-loopback = "bumble.apps.controller_loopback:main"
bumble-gatt-dump = "bumble.apps.gatt_dump:main"
bumble-hci-bridge = "bumble.apps.hci_bridge:main"
bumble-l2cap-bridge = "bumble.apps.l2cap_bridge:main"
bumble-rfcomm-bridge = "bumble.apps.rfcomm_bridge:main"
bumble-pair = "bumble.apps.pair:main"
bumble-scan = "bumble.apps.scan:main"
bumble-show = "bumble.apps.show:main"
bumble-unbond = "bumble.apps.unbond:main"
bumble-usb-probe = "bumble.apps.usb_probe:main"
bumble-link-relay = "bumble.apps.link_relay.link_relay:main"
bumble-bench = "bumble.apps.bench:main"
bumble-player = "bumble.apps.player.player:main"
bumble-speaker = "bumble.apps.speaker.speaker:main"
bumble-pandora-server = "bumble.apps.pandora_server:main"
bumble-rtk-util = "bumble.tools.rtk_util:main"
bumble-rtk-fw-download = "bumble.tools.rtk_fw_download:main"
bumble-intel-util = "bumble.tools.intel_util:main"
bumble-intel-fw-download = "bumble.tools.intel_fw_download:main"
[project.urls]
Homepage = "https://github.com/google/bumble"
[tool.setuptools]
packages = [
"bumble",
"bumble.transport",
"bumble.transport.grpc_protobuf",
"bumble.drivers",
"bumble.profiles",
"bumble.apps",
"bumble.apps.link_relay",
"bumble.pandora",
"bumble.tools",
]
[tool.setuptools.package-dir]
"bumble" = "bumble"
"bumble.apps" = "apps"
"bumble.tools" = "tools"
[tool.setuptools_scm]
write_to = "bumble/_version.py"
[tool.setuptools.package-data]
"*" = ["*.pyi", "py.typed"]
[tool.pytest.ini_options]
pythonpath = "."
testpaths = [
"tests"
]
testpaths = ["tests"]
[tool.pylint.master]
init-hook = 'import sys; sys.path.append(".")'
ignore-paths = [
'.*_pb2(_grpc)?.py'
]
ignore-paths = ['.*_pb2(_grpc)?.py']
[tool.pylint.messages_control]
max-line-length = "88"
@@ -25,8 +139,8 @@ disable = [
"fixme",
"logging-fstring-interpolation",
"logging-not-lazy",
"no-member", # Temporary until pylint works better with class/method decorators
"no-value-for-parameter", # Temporary until pylint works better with class/method decorators
"no-member", # Temporary until pylint works better with class/method decorators
"no-value-for-parameter", # Temporary until pylint works better with class/method decorators
"missing-class-docstring",
"missing-function-docstring",
"missing-module-docstring",
@@ -41,11 +155,11 @@ disable = [
]
[tool.pylint.main]
ignore="pandora" # FIXME: pylint does not support stubs yet:
ignore = "pandora" # FIXME: pylint does not support stubs yet:
[tool.pylint.typecheck]
signature-mutators="AsyncRunner.run_in_task"
disable="not-callable"
signature-mutators = "AsyncRunner.run_in_task"
disable = "not-callable"
[tool.black]
skip-string-normalization = true
@@ -78,6 +192,10 @@ ignore_missing_imports = true
module = "serial_asyncio.*"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "sounddevice.*"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "usb.*"
ignore_missing_imports = true
@@ -85,4 +203,3 @@ ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "usb1.*"
ignore_missing_imports = true

111
setup.cfg
View File

@@ -1,111 +0,0 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[metadata]
name = bumble
use_scm_version = True
description = Bluetooth Stack for Apps, Emulation, Test and Experimentation
long_description = file: README.md
long_description_content_type = text/markdown
author = Google
author_email = tbd@tbd.com
url = https://github.com/google/bumble
[options]
python_requires = >=3.8
packages = bumble, bumble.transport, bumble.transport.grpc_protobuf, bumble.drivers, bumble.profiles, bumble.apps, bumble.apps.link_relay, bumble.pandora, bumble.tools
package_dir =
bumble = bumble
bumble.apps = apps
bumble.tools = tools
include_package_data = True
install_requires =
aiohttp ~= 3.8; platform_system!='Emscripten'
appdirs >= 1.4; platform_system!='Emscripten'
click >= 8.1.3; platform_system!='Emscripten'
cryptography == 39; platform_system!='Emscripten'
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the
# versions available on PyPI. Relax the version requirement since it's better than being
# completely unable to import the package in case of version mismatch.
cryptography >= 39.0; platform_system=='Emscripten'
grpcio >= 1.62.1; platform_system!='Emscripten'
humanize >= 4.6.0; platform_system!='Emscripten'
libusb1 >= 2.0.1; platform_system!='Emscripten'
libusb-package == 1.0.26.1; platform_system!='Emscripten'
platformdirs >= 3.10.0; platform_system!='Emscripten'
prompt_toolkit >= 3.0.16; platform_system!='Emscripten'
prettytable >= 3.6.0; platform_system!='Emscripten'
protobuf >= 3.12.4; platform_system!='Emscripten'
pyee >= 8.2.2
pyserial-asyncio >= 0.5; platform_system!='Emscripten'
pyserial >= 3.5; platform_system!='Emscripten'
pyusb >= 1.2; platform_system!='Emscripten'
websockets == 13.1; platform_system!='Emscripten'
[options.entry_points]
console_scripts =
bumble-ble-rpa-tool = bumble.apps.ble_rpa_tool:main
bumble-console = bumble.apps.console:main
bumble-controller-info = bumble.apps.controller_info:main
bumble-controller-loopback = bumble.apps.controller_loopback:main
bumble-gatt-dump = bumble.apps.gatt_dump:main
bumble-hci-bridge = bumble.apps.hci_bridge:main
bumble-l2cap-bridge = bumble.apps.l2cap_bridge:main
bumble-rfcomm-bridge = bumble.apps.rfcomm_bridge:main
bumble-pair = bumble.apps.pair:main
bumble-scan = bumble.apps.scan:main
bumble-show = bumble.apps.show:main
bumble-unbond = bumble.apps.unbond:main
bumble-usb-probe = bumble.apps.usb_probe:main
bumble-link-relay = bumble.apps.link_relay.link_relay:main
bumble-bench = bumble.apps.bench:main
bumble-player = bumble.apps.player.player:main
bumble-speaker = bumble.apps.speaker.speaker:main
bumble-pandora-server = bumble.apps.pandora_server:main
bumble-rtk-util = bumble.tools.rtk_util:main
bumble-rtk-fw-download = bumble.tools.rtk_fw_download:main
[options.package_data]
* = py.typed, *.pyi
[options.extras_require]
build =
build >= 0.7
test =
pytest >= 8.2
pytest-asyncio >= 0.23.5
pytest-html >= 3.2.0
coverage >= 6.4
development =
black == 24.3
grpcio-tools >= 1.62.1
invoke >= 1.7.3
mobly >= 1.12.2
mypy == 1.12.0
nox >= 2022
pylint == 3.3.1
pyyaml >= 6.0
types-appdirs >= 1.4.3
types-invoke >= 1.7.3
types-protobuf >= 4.21.0
wasmtime == 20.0.0
avatar =
pandora-avatar == 0.0.10
rootcanal == 1.10.0 ; python_version>='3.10'
pandora =
bt-test-interfaces >= 0.0.6
documentation =
mkdocs >= 1.4.0
mkdocs-material >= 8.5.6
mkdocstrings[python] >= 0.19.0

View File

@@ -33,7 +33,7 @@ from bumble.profiles.aics import (
AudioInputControlPointOpCode,
ErrorCode,
)
from bumble.profiles.vcp import VolumeControlService, VolumeControlServiceProxy
from bumble.profiles.vcs import VolumeControlService, VolumeControlServiceProxy
from .test_utils import TwoDevices

View File

@@ -39,6 +39,8 @@ from bumble.profiles.ascs import (
)
from bumble.profiles.bap import (
AudioLocation,
BasicAudioAnnouncement,
BroadcastAudioAnnouncement,
SupportedFrameDuration,
SupportedSamplingFrequency,
SamplingFrequency,
@@ -200,6 +202,56 @@ def test_codec_specific_configuration() -> None:
assert CodecSpecificConfiguration.from_bytes(bytes(config)) == config
# -----------------------------------------------------------------------------
def test_broadcast_audio_announcement() -> None:
broadcast_audio_announcement = BroadcastAudioAnnouncement(123456)
assert (
BroadcastAudioAnnouncement.from_bytes(bytes(broadcast_audio_announcement))
== broadcast_audio_announcement
)
# -----------------------------------------------------------------------------
def test_basic_audio_announcement() -> None:
basic_audio_announcement = BasicAudioAnnouncement(
presentation_delay=40000,
subgroups=[
BasicAudioAnnouncement.Subgroup(
codec_id=CodingFormat(codec_id=CodecID.LC3),
codec_specific_configuration=CodecSpecificConfiguration(
sampling_frequency=SamplingFrequency.FREQ_48000,
frame_duration=FrameDuration.DURATION_10000_US,
octets_per_codec_frame=100,
),
metadata=Metadata(
[
Metadata.Entry(tag=Metadata.Tag.LANGUAGE, data=b'eng'),
Metadata.Entry(tag=Metadata.Tag.PROGRAM_INFO, data=b'Disco'),
]
),
bis=[
BasicAudioAnnouncement.BIS(
index=0,
codec_specific_configuration=CodecSpecificConfiguration(
audio_channel_allocation=AudioLocation.FRONT_LEFT
),
),
BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=CodecSpecificConfiguration(
audio_channel_allocation=AudioLocation.FRONT_RIGHT
),
),
],
)
],
)
assert (
BasicAudioAnnouncement.from_bytes(bytes(basic_audio_announcement))
== basic_audio_announcement
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_pacs():

View File

@@ -19,9 +19,7 @@ import asyncio
import functools
import logging
import os
from types import LambdaType
import pytest
from unittest import mock
from bumble.core import (
BT_BR_EDR_TRANSPORT,
@@ -29,8 +27,14 @@ from bumble.core import (
BT_PERIPHERAL_ROLE,
ConnectionParameters,
)
from bumble.device import AdvertisingParameters, Connection, Device
from bumble.host import AclPacketQueue, Host
from bumble.device import (
AdvertisingEventProperties,
AdvertisingParameters,
Connection,
Device,
PeriodicAdvertisingParameters,
)
from bumble.host import DataPacketQueue, Host
from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
HCI_COMMAND_STATUS_PENDING,
@@ -86,9 +90,9 @@ async def test_device_connect_parallel():
def _send(packet):
pass
d0.host.acl_packet_queue = AclPacketQueue(0, 0, _send)
d1.host.acl_packet_queue = AclPacketQueue(0, 0, _send)
d2.host.acl_packet_queue = AclPacketQueue(0, 0, _send)
d0.host.acl_packet_queue = DataPacketQueue(0, 0, _send)
d1.host.acl_packet_queue = DataPacketQueue(0, 0, _send)
d2.host.acl_packet_queue = DataPacketQueue(0, 0, _send)
# enable classic
d0.classic_enabled = True
@@ -265,7 +269,8 @@ async def test_flush():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_legacy_advertising():
device = Device(host=mock.AsyncMock(Host))
device = TwoDevices()[0]
await device.power_on()
# Start advertising
await device.start_advertising()
@@ -283,7 +288,10 @@ async def test_legacy_advertising():
)
@pytest.mark.asyncio
async def test_legacy_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
devices = TwoDevices()
device = devices[0]
devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff')
await device.power_on()
peer_address = Address('F0:F1:F2:F3:F4:F5')
await device.start_advertising(auto_restart=auto_restart)
device.on_connection(
@@ -305,6 +313,11 @@ async def test_legacy_advertising_disconnection(auto_restart):
await async_barrier()
if auto_restart:
assert device.legacy_advertising_set
started = asyncio.Event()
if not device.is_advertising:
device.legacy_advertising_set.once('start', started.set)
await asyncio.wait_for(started.wait(), _TIMEOUT)
assert device.is_advertising
else:
assert not device.is_advertising
@@ -313,7 +326,8 @@ async def test_legacy_advertising_disconnection(auto_restart):
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_extended_advertising():
device = Device(host=mock.AsyncMock(Host))
device = TwoDevices()[0]
await device.power_on()
# Start advertising
advertising_set = await device.create_advertising_set()
@@ -332,7 +346,8 @@ async def test_extended_advertising():
)
@pytest.mark.asyncio
async def test_extended_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host))
device = TwoDevices()[0]
await device.power_on()
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
@@ -368,8 +383,10 @@ async def test_extended_advertising_connection(own_address_type):
)
@pytest.mark.asyncio
async def test_extended_advertising_connection_out_of_order(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
devices = TwoDevices()
device = devices[0]
devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff')
await device.power_on()
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
)
@@ -382,7 +399,7 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
Address('F0:F1:F2:F3:F4:F5'),
None,
None,
BT_PERIPHERAL_ROLE,
@@ -397,6 +414,34 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
await async_barrier()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_periodic_advertising():
device = TwoDevices()[0]
await device.power_on()
# Start advertising
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(
advertising_event_properties=AdvertisingEventProperties(
is_connectable=False
)
),
advertising_data=b'123',
periodic_advertising_parameters=PeriodicAdvertisingParameters(),
periodic_advertising_data=b'abc',
)
assert device.extended_advertising_sets
assert advertising_set.enabled
assert not advertising_set.periodic_enabled
await advertising_set.start_periodic()
assert advertising_set.periodic_enabled
await advertising_set.stop_periodic()
assert not advertising_set.periodic_enabled
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_remote_le_features():

View File

@@ -60,7 +60,7 @@ from .test_utils import async_barrier
# -----------------------------------------------------------------------------
def basic_check(x):
pdu = x.to_bytes()
pdu = bytes(x)
parsed = ATT_PDU.from_bytes(pdu)
x_str = str(x)
parsed_str = str(parsed)
@@ -77,7 +77,7 @@ def test_UUID():
assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
v = UUID(str(u))
assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
w = UUID.from_bytes(v.to_bytes())
w = UUID.from_bytes(bytes(v))
assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
u1 = UUID.from_16_bits(0x1234)

84
tests/gmap_test.py Normal file
View File

@@ -0,0 +1,84 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import pytest
import pytest_asyncio
from bumble import device
from bumble.profiles.gmap import (
GamingAudioService,
GamingAudioServiceProxy,
GmapRole,
UggFeatures,
UgtFeatures,
BgrFeatures,
BgsFeatures,
)
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Tests
# -----------------------------------------------------------------------------
gmas_service = GamingAudioService(
gmap_role=GmapRole.UNICAST_GAME_GATEWAY
| GmapRole.UNICAST_GAME_TERMINAL
| GmapRole.BROADCAST_GAME_RECEIVER
| GmapRole.BROADCAST_GAME_SENDER,
ugg_features=UggFeatures.UGG_MULTISINK,
ugt_features=UgtFeatures.UGT_SOURCE,
bgr_features=BgrFeatures.BGR_MULTISINK,
bgs_features=BgsFeatures.BGS_96_KBPS,
)
@pytest_asyncio.fixture
async def gmap_client():
devices = TwoDevices()
devices[0].add_service(gmas_service)
await devices.setup_connection()
assert devices.connections[0]
assert devices.connections[1]
devices.connections[0].encryption = 1
devices.connections[1].encryption = 1
peer = device.Peer(devices.connections[1])
gmap_client = await peer.discover_service_and_create_proxy(GamingAudioServiceProxy)
assert gmap_client
yield gmap_client
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_init_service(gmap_client: GamingAudioServiceProxy):
assert (
await gmap_client.gmap_role.read_value()
== GmapRole.UNICAST_GAME_GATEWAY
| GmapRole.UNICAST_GAME_TERMINAL
| GmapRole.BROADCAST_GAME_RECEIVER
| GmapRole.BROADCAST_GAME_SENDER
)
assert await gmap_client.ugg_features.read_value() == UggFeatures.UGG_MULTISINK
assert await gmap_client.ugt_features.read_value() == UgtFeatures.UGT_SOURCE
assert await gmap_client.bgr_features.read_value() == BgrFeatures.BGR_MULTISINK
assert await gmap_client.bgs_features.read_value() == BgsFeatures.BGS_96_KBPS

View File

@@ -15,6 +15,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct
from bumble.hci import (
HCI_DISCONNECT_COMMAND,
@@ -22,6 +23,7 @@ from bumble.hci import (
HCI_LE_CODED_PHY_BIT,
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_RESET_COMMAND,
HCI_VENDOR_EVENT,
HCI_SUCCESS,
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
@@ -67,6 +69,7 @@ from bumble.hci import (
HCI_Read_Local_Version_Information_Command,
HCI_Reset_Command,
HCI_Set_Event_Mask_Command,
HCI_Vendor_Event,
)
@@ -75,13 +78,13 @@ from bumble.hci import (
def basic_check(x):
packet = x.to_bytes()
packet = bytes(x)
print(packet.hex())
parsed = HCI_Packet.from_bytes(packet)
x_str = str(x)
parsed_str = str(parsed)
print(x_str)
parsed_bytes = parsed.to_bytes()
parsed_bytes = bytes(parsed)
assert x_str == parsed_str
assert packet == parsed_bytes
@@ -167,8 +170,8 @@ def test_HCI_Command_Complete_Event():
command_opcode=HCI_LE_READ_BUFFER_SIZE_COMMAND,
return_parameters=HCI_LE_Read_Buffer_Size_Command.create_return_parameters(
status=0,
hc_le_acl_data_packet_length=1234,
hc_total_num_le_acl_data_packets=56,
le_acl_data_packet_length=1234,
total_num_le_acl_data_packets=56,
),
)
basic_check(event)
@@ -188,7 +191,7 @@ def test_HCI_Command_Complete_Event():
return_parameters=bytes([7]),
)
basic_check(event)
event = HCI_Packet.from_bytes(event.to_bytes())
event = HCI_Packet.from_bytes(bytes(event))
assert event.return_parameters == 7
# With a simple status as an integer status
@@ -213,6 +216,41 @@ def test_HCI_Number_Of_Completed_Packets_Event():
basic_check(event)
# -----------------------------------------------------------------------------
def test_HCI_Vendor_Event():
data = bytes.fromhex('01020304')
event = HCI_Vendor_Event(data=data)
event_bytes = bytes(event)
parsed = HCI_Packet.from_bytes(event_bytes)
assert isinstance(parsed, HCI_Vendor_Event)
assert parsed.data == data
class HCI_Custom_Event(HCI_Event):
def __init__(self, blabla):
super().__init__(HCI_VENDOR_EVENT, parameters=struct.pack("<I", blabla))
self.name = 'HCI_CUSTOM_EVENT'
self.blabla = blabla
def create_event(payload):
if payload[0] == 1:
return HCI_Custom_Event(blabla=struct.unpack('<I', payload)[0])
return None
HCI_Event.add_vendor_factory(create_event)
parsed = HCI_Packet.from_bytes(event_bytes)
assert isinstance(parsed, HCI_Custom_Event)
assert parsed.blabla == 0x04030201
event_bytes2 = event_bytes[:3] + bytes([7]) + event_bytes[4:]
parsed = HCI_Packet.from_bytes(event_bytes2)
assert not isinstance(parsed, HCI_Custom_Event)
assert isinstance(parsed, HCI_Vendor_Event)
HCI_Event.remove_vendor_factory(create_event)
parsed = HCI_Packet.from_bytes(event_bytes)
assert not isinstance(parsed, HCI_Custom_Event)
assert isinstance(parsed, HCI_Vendor_Event)
# -----------------------------------------------------------------------------
def test_HCI_Command():
command = HCI_Command(0x5566)
@@ -562,7 +600,7 @@ def test_iso_data_packet():
'6281bc77ed6a3206d984bcdabee6be831c699cb50e2'
)
assert packet.to_bytes() == data
assert bytes(packet) == data
# -----------------------------------------------------------------------------
@@ -576,6 +614,7 @@ def run_test_events():
test_HCI_Command_Complete_Event()
test_HCI_Command_Status_Event()
test_HCI_Number_Of_Completed_Packets_Event()
test_HCI_Vendor_Event()
# -----------------------------------------------------------------------------

View File

@@ -61,7 +61,7 @@ def _default_hf_configuration() -> hfp.HfConfiguration:
# -----------------------------------------------------------------------------
def _default_hf_sdp_features() -> hfp.HfSdpFeature:
return (
hfp.HfSdpFeature.WIDE_BAND
hfp.HfSdpFeature.WIDE_BAND_SPEECH
| hfp.HfSdpFeature.THREE_WAY_CALLING
| hfp.HfSdpFeature.CLI_PRESENTATION_CAPABILITY
)
@@ -108,7 +108,7 @@ def _default_ag_configuration() -> hfp.AgConfiguration:
# -----------------------------------------------------------------------------
def _default_ag_sdp_features() -> hfp.AgSdpFeature:
return (
hfp.AgSdpFeature.WIDE_BAND
hfp.AgSdpFeature.WIDE_BAND_SPEECH
| hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
| hfp.AgSdpFeature.THREE_WAY_CALLING
)

View File

@@ -16,11 +16,14 @@
# Imports
# -----------------------------------------------------------------------------
import logging
import unittest.mock
import pytest
import unittest
from bumble.controller import Controller
from bumble.host import Host
from bumble.host import Host, DataPacketQueue
from bumble.transport import AsyncPipeSink
from bumble.hci import HCI_AclDataPacket
# -----------------------------------------------------------------------------
# Logging
@@ -60,3 +63,90 @@ async def test_reset(supported_commands: str, lmp_features: str):
assert host.local_lmp_features == int.from_bytes(
bytes.fromhex(lmp_features), 'little'
)
# -----------------------------------------------------------------------------
def test_data_packet_queue():
controller = unittest.mock.Mock()
queue = DataPacketQueue(10, 2, controller.send)
assert queue.queued == 0
assert queue.completed == 0
packet = HCI_AclDataPacket(
connection_handle=123, pb_flag=0, bc_flag=0, data_total_length=0, data=b''
)
queue.enqueue(packet, packet.connection_handle)
assert queue.queued == 1
assert queue.completed == 0
assert controller.send.call_count == 1
queue.enqueue(packet, packet.connection_handle)
assert queue.queued == 2
assert queue.completed == 0
assert controller.send.call_count == 2
queue.enqueue(packet, packet.connection_handle)
assert queue.queued == 3
assert queue.completed == 0
assert controller.send.call_count == 2
queue.on_packets_completed(1, 8000)
assert queue.queued == 3
assert queue.completed == 0
assert controller.send.call_count == 2
queue.on_packets_completed(1, 123)
assert queue.queued == 3
assert queue.completed == 1
assert controller.send.call_count == 3
queue.enqueue(packet, packet.connection_handle)
assert queue.queued == 4
assert queue.completed == 1
assert controller.send.call_count == 3
queue.on_packets_completed(2, 123)
assert queue.queued == 4
assert queue.completed == 3
assert controller.send.call_count == 4
queue.on_packets_completed(1, 123)
assert queue.queued == 4
assert queue.completed == 4
assert controller.send.call_count == 4
queue.enqueue(packet, 123)
queue.enqueue(packet, 123)
queue.enqueue(packet, 123)
queue.enqueue(packet, 124)
queue.enqueue(packet, 124)
queue.enqueue(packet, 124)
queue.on_packets_completed(1, 123)
assert queue.queued == 10
assert queue.completed == 5
queue.flush(123)
queue.flush(124)
assert queue.queued == 10
assert queue.completed == 10
queue.enqueue(packet, 123)
queue.on_packets_completed(1, 124)
assert queue.queued == 11
assert queue.completed == 10
queue.on_packets_completed(1000, 123)
assert queue.queued == 11
assert queue.completed == 11
drain_listener = unittest.mock.Mock()
queue.on('flow', drain_listener.on_flow)
queue.enqueue(packet, 123)
assert drain_listener.on_flow.call_count == 0
queue.on_packets_completed(1, 123)
assert drain_listener.on_flow.call_count == 1
queue.enqueue(packet, 123)
queue.enqueue(packet, 123)
queue.enqueue(packet, 123)
queue.flush(123)
assert drain_listener.on_flow.call_count == 1
assert queue.queued == 15
assert queue.completed == 15

View File

@@ -53,7 +53,7 @@ def test_import():
le_audio,
pacs,
pbp,
vcp,
vcs,
)
assert att
@@ -87,7 +87,7 @@ def test_import():
assert le_audio
assert pacs
assert pbp
assert vcp
assert vcs
# -----------------------------------------------------------------------------

View File

@@ -20,12 +20,11 @@ import logging
import os
import pytest
from bumble.core import UUID, BT_L2CAP_PROTOCOL_ID, BT_RFCOMM_PROTOCOL_ID
from bumble.core import UUID, BT_L2CAP_PROTOCOL_ID
from bumble.sdp import (
DataElement,
ServiceAttribute,
Client,
Server,
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
SDP_PUBLIC_BROWSE_ROOT,
@@ -174,9 +173,10 @@ def test_data_elements() -> None:
# -----------------------------------------------------------------------------
def sdp_records():
def sdp_records(record_count=1):
return {
0x00010001: [
0x00010001
+ i: [
ServiceAttribute(
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(0x00010001),
@@ -200,6 +200,7 @@ def sdp_records():
),
),
]
for i in range(record_count)
}
@@ -216,19 +217,55 @@ async def test_service_search():
devices.devices[0].sdp_server.service_records.update(sdp_records())
# Search for service
client = Client(devices.connections[1])
await client.connect()
services = await client.search_services(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')]
)
async with Client(devices.connections[1]) as client:
services = await client.search_services(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AF')]
)
assert len(services) == 0
# Then
assert services[0] == 0x00010001
services = await client.search_services(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')]
)
assert len(services) == 1
assert services[0] == 0x00010001
services = await client.search_services(
[BT_L2CAP_PROTOCOL_ID, SDP_PUBLIC_BROWSE_ROOT]
)
assert len(services) == 1
assert services[0] == 0x00010001
services = await client.search_services(
[BT_L2CAP_PROTOCOL_ID, SDP_PUBLIC_BROWSE_ROOT]
)
assert len(services) == 1
assert services[0] == 0x00010001
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_service_attribute():
async def test_service_search_with_continuation():
# Setup connections
devices = TwoDevices()
await devices.setup_connection()
# Register SDP service
records = sdp_records(100)
devices.devices[0].sdp_server.service_records.update(records)
# Search for service
async with Client(devices.connections[1], mtu=48) as client:
services = await client.search_services(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')]
)
assert len(services) == len(records)
for i in range(len(records)):
assert services[i] == 0x00010001 + i
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_service_attributes():
# Setup connections
devices = TwoDevices()
await devices.setup_connection()
@@ -236,15 +273,43 @@ async def test_service_attribute():
# Register SDP service
devices.devices[0].sdp_server.service_records.update(sdp_records())
# Search for service
client = Client(devices.connections[1])
await client.connect()
attributes = await client.get_attributes(
0x00010001, [SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID]
)
# Get attributes
async with Client(devices.connections[1]) as client:
attributes = await client.get_attributes(0x00010001, [1234])
assert len(attributes) == 0
# Then
assert attributes[0].value.value == sdp_records()[0x00010001][0].value.value
attributes = await client.get_attributes(
0x00010001, [SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID]
)
assert len(attributes) == 1
assert attributes[0].value.value == sdp_records()[0x00010001][0].value.value
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_service_attributes_with_continuation():
# Setup connections
devices = TwoDevices()
await devices.setup_connection()
# Register SDP service
records = {
0x00010001: [
ServiceAttribute(
x,
DataElement.unsigned_integer_32(0x00010001),
)
for x in range(100)
]
}
devices.devices[0].sdp_server.service_records.update(records)
# Get attributes
async with Client(devices.connections[1], mtu=48) as client:
attributes = await client.get_attributes(0x00010001, list(range(100)))
assert len(attributes) == 100
for i, attribute in enumerate(attributes):
assert attribute.id == i
# -----------------------------------------------------------------------------
@@ -255,19 +320,81 @@ async def test_service_search_attribute():
await devices.setup_connection()
# Register SDP service
devices.devices[0].sdp_server.service_records.update(sdp_records())
records = {
0x00010001: [
ServiceAttribute(
4,
DataElement.sequence(
[DataElement.uuid(UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'))]
),
),
ServiceAttribute(
3,
DataElement.sequence(
[DataElement.uuid(UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'))]
),
),
ServiceAttribute(
1,
DataElement.sequence(
[DataElement.uuid(UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'))]
),
),
]
}
devices.devices[0].sdp_server.service_records.update(records)
# Search for service
client = Client(devices.connections[1])
await client.connect()
attributes = await client.search_attributes(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')], [(0x0000FFFF, 8)]
)
async with Client(devices.connections[1]) as client:
attributes = await client.search_attributes(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')], [(0, 0xFFFF)]
)
assert len(attributes) == 1
assert len(attributes[0]) == 3
assert attributes[0][0].id == 1
assert attributes[0][1].id == 3
assert attributes[0][2].id == 4
# Then
for expect, actual in zip(attributes, sdp_records().values()):
assert expect.id == actual.id
assert expect.value == actual.value
attributes = await client.search_attributes(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')], [1, 2, 3]
)
assert len(attributes) == 1
assert len(attributes[0]) == 2
assert attributes[0][0].id == 1
assert attributes[0][1].id == 3
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_service_search_attribute_with_continuation():
# Setup connections
devices = TwoDevices()
await devices.setup_connection()
# Register SDP service
records = {
0x00010001: [
ServiceAttribute(
x,
DataElement.sequence(
[DataElement.uuid(UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'))]
),
)
for x in range(100)
]
}
devices.devices[0].sdp_server.service_records.update(records)
# Search for service
async with Client(devices.connections[1], mtu=48) as client:
attributes = await client.search_attributes(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')], [(0, 0xFFFF)]
)
assert len(attributes) == 1
assert len(attributes[0]) == 100
for i in range(100):
assert attributes[0][i].id == i
# -----------------------------------------------------------------------------
@@ -287,9 +414,12 @@ async def test_client_async_context():
# -----------------------------------------------------------------------------
async def run():
test_data_elements()
await test_service_attribute()
await test_service_attributes()
await test_service_attributes_with_continuation()
await test_service_search()
await test_service_search_with_continuation()
await test_service_search_attribute()
await test_service_search_attribute_with_continuation()
# -----------------------------------------------------------------------------

View File

@@ -240,7 +240,7 @@ async def test_self_gatt():
result = await peer.discover_included_services(result[0])
assert len(result) == 2
# Service UUID is only present when the UUID is 16-bit Bluetooth UUID
assert result[1].uuid.to_bytes() == s3.uuid.to_bytes()
assert bytes(result[1].uuid) == bytes(s3.uuid)
# -----------------------------------------------------------------------------

View File

@@ -20,7 +20,7 @@ import pytest_asyncio
import logging
from bumble import device
from bumble.profiles import vcp
from bumble.profiles import vcs
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
@@ -34,7 +34,7 @@ logger = logging.getLogger(__name__)
async def vcp_client():
devices = TwoDevices()
devices[0].add_service(
vcp.VolumeControlService(volume_setting=32, muted=1, volume_flags=1)
vcs.VolumeControlService(volume_setting=32, muted=1, volume_flags=1)
)
await devices.setup_connection()
@@ -48,76 +48,76 @@ async def vcp_client():
peer = device.Peer(devices.connections[1])
vcp_client = await peer.discover_service_and_create_proxy(
vcp.VolumeControlServiceProxy
vcs.VolumeControlServiceProxy
)
yield vcp_client
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_init_service(vcp_client: vcp.VolumeControlServiceProxy):
async def test_init_service(vcp_client: vcs.VolumeControlServiceProxy):
assert (await vcp_client.volume_flags.read_value()) == 1
assert (await vcp_client.volume_state.read_value()) == (32, 1, 0)
assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(32, 1, 0)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_relative_volume_down(vcp_client: vcp.VolumeControlServiceProxy):
async def test_relative_volume_down(vcp_client: vcs.VolumeControlServiceProxy):
await vcp_client.volume_control_point.write_value(
bytes([vcp.VolumeControlPointOpcode.RELATIVE_VOLUME_DOWN, 0])
bytes([vcs.VolumeControlPointOpcode.RELATIVE_VOLUME_DOWN, 0])
)
assert (await vcp_client.volume_state.read_value()) == (16, 1, 1)
assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(16, 1, 1)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_relative_volume_up(vcp_client: vcp.VolumeControlServiceProxy):
async def test_relative_volume_up(vcp_client: vcs.VolumeControlServiceProxy):
await vcp_client.volume_control_point.write_value(
bytes([vcp.VolumeControlPointOpcode.RELATIVE_VOLUME_UP, 0])
bytes([vcs.VolumeControlPointOpcode.RELATIVE_VOLUME_UP, 0])
)
assert (await vcp_client.volume_state.read_value()) == (48, 1, 1)
assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(48, 1, 1)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_unmute_relative_volume_down(vcp_client: vcp.VolumeControlServiceProxy):
async def test_unmute_relative_volume_down(vcp_client: vcs.VolumeControlServiceProxy):
await vcp_client.volume_control_point.write_value(
bytes([vcp.VolumeControlPointOpcode.UNMUTE_RELATIVE_VOLUME_DOWN, 0])
bytes([vcs.VolumeControlPointOpcode.UNMUTE_RELATIVE_VOLUME_DOWN, 0])
)
assert (await vcp_client.volume_state.read_value()) == (16, 0, 1)
assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(16, 0, 1)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_unmute_relative_volume_up(vcp_client: vcp.VolumeControlServiceProxy):
async def test_unmute_relative_volume_up(vcp_client: vcs.VolumeControlServiceProxy):
await vcp_client.volume_control_point.write_value(
bytes([vcp.VolumeControlPointOpcode.UNMUTE_RELATIVE_VOLUME_UP, 0])
bytes([vcs.VolumeControlPointOpcode.UNMUTE_RELATIVE_VOLUME_UP, 0])
)
assert (await vcp_client.volume_state.read_value()) == (48, 0, 1)
assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(48, 0, 1)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_set_absolute_volume(vcp_client: vcp.VolumeControlServiceProxy):
async def test_set_absolute_volume(vcp_client: vcs.VolumeControlServiceProxy):
await vcp_client.volume_control_point.write_value(
bytes([vcp.VolumeControlPointOpcode.SET_ABSOLUTE_VOLUME, 0, 255])
bytes([vcs.VolumeControlPointOpcode.SET_ABSOLUTE_VOLUME, 0, 255])
)
assert (await vcp_client.volume_state.read_value()) == (255, 1, 1)
assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(255, 1, 1)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_mute(vcp_client: vcp.VolumeControlServiceProxy):
async def test_mute(vcp_client: vcs.VolumeControlServiceProxy):
await vcp_client.volume_control_point.write_value(
bytes([vcp.VolumeControlPointOpcode.MUTE, 0])
bytes([vcs.VolumeControlPointOpcode.MUTE, 0])
)
assert (await vcp_client.volume_state.read_value()) == (32, 1, 0)
assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(32, 1, 0)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_unmute(vcp_client: vcp.VolumeControlServiceProxy):
async def test_unmute(vcp_client: vcs.VolumeControlServiceProxy):
await vcp_client.volume_control_point.write_value(
bytes([vcp.VolumeControlPointOpcode.UNMUTE, 0])
bytes([vcs.VolumeControlPointOpcode.UNMUTE, 0])
)
assert (await vcp_client.volume_state.read_value()) == (32, 0, 1)
assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(32, 0, 1)

179
tests/vocs_test.py Normal file
View File

@@ -0,0 +1,179 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import pytest
import pytest_asyncio
import struct
from bumble import device
from bumble.att import ATT_Error
from bumble.profiles.vocs import (
VolumeOffsetControlService,
ErrorCode,
MIN_VOLUME_OFFSET,
MAX_VOLUME_OFFSET,
SetVolumeOffsetOpCode,
VolumeOffsetControlServiceProxy,
VolumeOffsetState,
)
from bumble.profiles.vcs import VolumeControlService, VolumeControlServiceProxy
from bumble.profiles.bap import AudioLocation
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Tests
# -----------------------------------------------------------------------------
vocs_service = VolumeOffsetControlService()
vcp_service = VolumeControlService(included_services=[vocs_service])
@pytest_asyncio.fixture
async def vocs_client():
devices = TwoDevices()
devices[0].add_service(vcp_service)
await devices.setup_connection()
assert devices.connections[0]
assert devices.connections[1]
devices.connections[0].encryption = 1
devices.connections[1].encryption = 1
peer = device.Peer(devices.connections[1])
vcp_client = await peer.discover_service_and_create_proxy(VolumeControlServiceProxy)
assert vcp_client
included_services = await peer.discover_included_services(vcp_client.service_proxy)
assert included_services
vocs_service_discovered = included_services[0]
await peer.discover_characteristics(service=vocs_service_discovered)
vocs_client = VolumeOffsetControlServiceProxy(vocs_service_discovered)
yield vocs_client
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_init_service(vocs_client: VolumeOffsetControlServiceProxy):
assert await vocs_client.volume_offset_state.read_value() == VolumeOffsetState(
volume_offset=0,
change_counter=0,
)
assert await vocs_client.audio_location.read_value() == AudioLocation.NOT_ALLOWED
description = await vocs_client.audio_output_description.read_value()
assert description == ''
@pytest.mark.asyncio
async def test_wrong_opcode_raise_error(vocs_client: VolumeOffsetControlServiceProxy):
with pytest.raises(ATT_Error) as e:
await vocs_client.volume_offset_control_point.write_value(
bytes(
[
0xFF,
]
),
with_response=True,
)
assert e.value.error_code == ErrorCode.OPCODE_NOT_SUPPORTED
@pytest.mark.asyncio
async def test_wrong_change_counter_raise_error(
vocs_client: VolumeOffsetControlServiceProxy,
):
initial_offset = vocs_service.volume_offset_state.volume_offset
initial_counter = vocs_service.volume_offset_state.change_counter
wrong_counter = initial_counter + 1
with pytest.raises(ATT_Error) as e:
await vocs_client.volume_offset_control_point.write_value(
struct.pack(
'<BBh', SetVolumeOffsetOpCode.SET_VOLUME_OFFSET, wrong_counter, 0
),
with_response=True,
)
assert e.value.error_code == ErrorCode.INVALID_CHANGE_COUNTER
counter = await vocs_client.volume_offset_state.read_value()
assert counter == VolumeOffsetState(initial_offset, initial_counter)
@pytest.mark.asyncio
async def test_wrong_volume_offset_raise_error(
vocs_client: VolumeOffsetControlServiceProxy,
):
invalid_offset_low = MIN_VOLUME_OFFSET - 1
invalid_offset_high = MAX_VOLUME_OFFSET + 1
with pytest.raises(ATT_Error) as e_low:
await vocs_client.volume_offset_control_point.write_value(
struct.pack(
'<BBh', SetVolumeOffsetOpCode.SET_VOLUME_OFFSET, 0, invalid_offset_low
),
with_response=True,
)
assert e_low.value.error_code == ErrorCode.VALUE_OUT_OF_RANGE
with pytest.raises(ATT_Error) as e_high:
await vocs_client.volume_offset_control_point.write_value(
struct.pack(
'<BBh', SetVolumeOffsetOpCode.SET_VOLUME_OFFSET, 0, invalid_offset_high
),
with_response=True,
)
assert e_high.value.error_code == ErrorCode.VALUE_OUT_OF_RANGE
@pytest.mark.asyncio
async def test_set_volume_offset(vocs_client: VolumeOffsetControlServiceProxy):
await vocs_client.volume_offset_control_point.write_value(
struct.pack('<BBh', SetVolumeOffsetOpCode.SET_VOLUME_OFFSET, 0, -255),
)
assert await vocs_client.volume_offset_state.read_value() == VolumeOffsetState(
-255, 1
)
@pytest.mark.asyncio
async def test_set_audio_channel_location(vocs_client: VolumeOffsetControlServiceProxy):
new_audio_location = AudioLocation.FRONT_LEFT
await vocs_client.audio_location.write_value(new_audio_location)
location = await vocs_client.audio_location.read_value()
assert location == new_audio_location
@pytest.mark.asyncio
async def test_set_audio_output_description(
vocs_client: VolumeOffsetControlServiceProxy,
):
new_description = 'Left Speaker'
await vocs_client.audio_output_description.write_value(new_description)
description = await vocs_client.audio_output_description.read_value()
assert description == new_description

130
tools/intel_fw_download.py Normal file
View File

@@ -0,0 +1,130 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import pathlib
import urllib.request
import urllib.error
import click
from bumble.colors import color
from bumble.drivers import intel
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
LINUX_KERNEL_GIT_SOURCE = "https://git.kernel.org/pub/scm/linux/kernel/git/firmware/linux-firmware.git/plain/intel"
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def download_file(base_url, name):
url = f"{base_url}/{name}"
with urllib.request.urlopen(url) as file:
data = file.read()
print(f"Downloaded {name}: {len(data)} bytes")
return data
# -----------------------------------------------------------------------------
@click.command
@click.option(
"--output-dir",
default="",
help="Output directory where the files will be saved. Defaults to the OS-specific"
"app data dir, which the driver will check when trying to find firmware",
show_default=True,
)
@click.option(
"--source",
type=click.Choice(["linux-kernel"]),
default="linux-kernel",
show_default=True,
)
@click.option("--single", help="Only download a single image set, by its base name")
@click.option("--force", is_flag=True, help="Overwrite files if they already exist")
def main(output_dir, source, single, force):
"""Download Intel firmware images and configs."""
# Check that the output dir exists
if output_dir == '':
output_dir = intel.intel_firmware_dir()
else:
output_dir = pathlib.Path(output_dir)
if not output_dir.is_dir():
print("Output dir does not exist or is not a directory")
return
base_url = {
"linux-kernel": LINUX_KERNEL_GIT_SOURCE,
}[source]
print("Downloading")
print(color("FROM:", "green"), base_url)
print(color("TO:", "green"), output_dir)
if single:
images = [(f"{single}.sfi", f"{single}.ddc")]
else:
images = [
(f"{base_name}.sfi", f"{base_name}.ddc")
for base_name in intel.INTEL_FW_IMAGE_NAMES
]
for fw_name, config_name in images:
print(color("---", "yellow"))
fw_image_out = output_dir / fw_name
if not force and fw_image_out.exists():
print(color(f"{fw_image_out} already exists, skipping", "red"))
continue
if config_name:
config_image_out = output_dir / config_name
if not force and config_image_out.exists():
print(color("f{config_image_out} already exists, skipping", "red"))
continue
try:
fw_image = download_file(base_url, fw_name)
except urllib.error.HTTPError as error:
print(f"Failed to download {fw_name}: {error}")
continue
config_image = None
if config_name:
try:
config_image = download_file(base_url, config_name)
except urllib.error.HTTPError as error:
print(f"Failed to download {config_name}: {error}")
continue
fw_image_out.write_bytes(fw_image)
if config_image:
config_image_out.write_bytes(config_image)
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()

154
tools/intel_util.py Normal file
View File

@@ -0,0 +1,154 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import asyncio
import os
from typing import Any, Optional
import click
from bumble.colors import color
from bumble import transport
from bumble.drivers import intel
from bumble.host import Host
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def print_device_info(device_info: dict[intel.ValueType, Any]) -> None:
if (mode := device_info.get(intel.ValueType.CURRENT_MODE_OF_OPERATION)) is not None:
print(
color("MODE:", "yellow"),
mode.name,
)
print(color("DETAILS:", "yellow"))
for key, value in device_info.items():
print(f" {color(key.name, 'green')}: {value}")
# -----------------------------------------------------------------------------
async def get_driver(host: Host, force: bool) -> Optional[intel.Driver]:
# Create a driver
driver = await intel.Driver.for_host(host, force)
if driver is None:
print("Device does not appear to be an Intel device")
return None
return driver
# -----------------------------------------------------------------------------
async def do_info(usb_transport, force):
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
host = Host(hci_source, hci_sink)
driver = await get_driver(host, force)
if driver is None:
return
# Get and print the device info
print_device_info(await driver.read_device_info())
# -----------------------------------------------------------------------------
async def do_load(usb_transport: str, force: bool) -> None:
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
host = Host(hci_source, hci_sink)
driver = await get_driver(host, force)
if driver is None:
return
# Reboot in bootloader mode
await driver.load_firmware()
# Get and print the device info
print_device_info(await driver.read_device_info())
# -----------------------------------------------------------------------------
async def do_bootloader(usb_transport: str, force: bool) -> None:
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
host = Host(hci_source, hci_sink)
driver = await get_driver(host, force)
if driver is None:
return
# Reboot in bootloader mode
await driver.reboot_bootloader()
# -----------------------------------------------------------------------------
@click.group()
def main():
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Try to get the device info even if the USB info doesn't match",
)
def info(usb_transport, force):
"""Get the firmware info."""
asyncio.run(do_info(usb_transport, force))
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Load even if the USB info doesn't match",
)
def load(usb_transport, force):
"""Load a firmware image."""
asyncio.run(do_load(usb_transport, force))
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Attempt to reboot event if the USB info doesn't match",
)
def bootloader(usb_transport, force):
"""Reboot in bootloader mode."""
asyncio.run(do_bootloader(usb_transport, force))
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()