Compare commits

...

44 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
d6100755b1 add bond listener 2025-02-04 17:47:55 -05:00
Gilles Boccon-Gibod
f368b5e518 wip 2025-02-03 18:02:14 -05:00
Gilles Boccon-Gibod
5293d32dc6 fix linter config 2025-02-03 18:02:14 -05:00
Gilles Boccon-Gibod
6d9a0bf4e1 fix linter config 2025-02-03 18:02:14 -05:00
Gilles Boccon-Gibod
3c7b5df7c5 add startupDelay and connectionPriority params to BtBench snippets 2025-02-03 18:00:46 -05:00
zxzxwu
dedc0aca54 Merge pull request #639 from zxzxwu/sdp
Correct SDP_ALL_ATTRIBUTES_RANGE value
2025-02-04 00:53:27 +08:00
Gilles Boccon-Gibod
7c019b574f Merge pull request #633 from markusjellitsch/fix/legacy-adv-params
fix advertising parameter usage for legacy advertising
2025-02-03 10:29:52 -05:00
markus
9b485fd943 revert python-avatar.yml 2025-02-03 15:17:22 +01:00
Josh Wu
fdee8269ec Correct SDP_ALL_ATTRIBUTES_RANGE value 2025-02-03 21:40:39 +08:00
zxzxwu
0767f2d4ae Merge pull request #638 from zxzxwu/avatar
Update actions/upload-artifact to v4
2025-02-03 21:31:42 +08:00
Josh Wu
c4a0846727 Update actions/upload-artifact to v4 2025-02-03 16:41:09 +08:00
zxzxwu
83ac70e426 Merge pull request #619 from zxzxwu/cs
Channel Sounding
2025-02-01 03:46:59 +08:00
markus
01cce3525f update avatar to github actions v4 2025-01-30 23:55:15 +01:00
markus
b9d35aea47 revert advertising_interval to type int 2025-01-30 19:47:20 +01:00
zxzxwu
079cf6b896 Merge pull request #624 from zxzxwu/gatt
Support GATT Service
2025-01-28 20:02:43 +08:00
Markus Jellitsch
180655088c run linter 2025-01-27 22:17:31 +01:00
Gilles Boccon-Gibod
a1bade6f20 Merge pull request #632 from markusjellitsch/fix/adapt-param-types
Adapt scanning and connection parameters type
2025-01-27 10:46:08 -05:00
Gilles Boccon-Gibod
5d80e7fd80 Merge pull request #634 from jmdietrich-gcx/fix_missing_await_for_update_rpa
Add missing await for update_rpa()
2025-01-27 10:45:42 -05:00
Jan-Marcel Dietrich
2198692961 Add missing await for update_rpa() 2025-01-27 15:14:52 +01:00
Gilles Boccon-Gibod
afee659ca6 Merge pull request #630 from google/gbg/iso-packet-queue
add support for ACL and ISO HCI packet queues
2025-01-24 15:59:19 -05:00
Gilles Boccon-Gibod
6fe7931d7d rename drain event to flow 2025-01-24 11:05:02 -05:00
Markus Jellitsch
9023407ee4 fix advertising parameters for legacy advertising 2025-01-23 15:14:54 +01:00
Markus Jellitsch
54d961bbe5 adapt scanning and connection parameters type 2025-01-23 14:53:20 +01:00
Gilles Boccon-Gibod
cbd46adbcf add support for ACL and ISO HCI packet queues 2025-01-22 13:42:29 -05:00
Josh Wu
745e107849 Channel Sounding device handlers 2025-01-22 23:38:44 +08:00
Gilles Boccon-Gibod
af466c2970 Merge pull request #629 from google/gbg/sdp-enforce-mtu
SDP: enforce MTU limits
2025-01-21 12:29:18 -05:00
Gilles Boccon-Gibod
931e2de854 address PR comments 2025-01-21 12:18:06 -05:00
Gilles Boccon-Gibod
55eb7eb237 enforce MTU limits 2025-01-21 10:31:10 -05:00
zxzxwu
bade4502f9 Merge pull request #628 from zxzxwu/cs-hci
Channel Sounding HCI packet definitions
2025-01-19 16:14:08 +08:00
Josh Wu
9f952f202f Channel Sounding HCI packet definitions 2025-01-16 14:33:34 +08:00
Josh Wu
1eb9d8d055 Support GATT Service 2025-01-15 02:13:25 +08:00
Gilles Boccon-Gibod
5a477eb391 Merge pull request #626 from markusjellitsch/fix/set-ext-scan-param-cmd
Update device.py - Fix scan_interval param in hci.HCI_LE_Set_Extended_Scan_Parameters_Command
2025-01-14 11:04:15 -05:00
Markus Jellitsch
86cda8771d Update device.py 2025-01-14 10:43:49 +01:00
zxzxwu
c1ea0ddd35 Merge pull request #622 from markusjellitsch/main
Fix: _IsoLink.write() struct.exception
2025-01-13 16:21:41 +08:00
Markus Jellitsch
f567711a6c avoid struct.error exception when packet_sequence_number > 0xFFFF 2025-01-10 01:33:43 +01:00
Gilles Boccon-Gibod
509df4c676 Merge pull request #618 from google/gbg/hci-event-multi-vendor
support multiple event factories
2025-01-07 15:00:20 -05:00
Gilles Boccon-Gibod
b375ed07b4 add test 2025-01-07 14:54:59 -05:00
Gilles Boccon-Gibod
69d62d3dd1 support multiple event factories 2025-01-06 08:42:09 -05:00
zxzxwu
fe3fa3d505 Merge pull request #617 from zxzxwu/iso
Unify ISO methods
2025-01-06 14:31:47 +08:00
Josh Wu
27fcd43224 Unify ISO methods 2025-01-02 14:19:36 +08:00
zxzxwu
c3b2bb19d5 Merge pull request #589 from zxzxwu/auracast
Auracast support
2025-01-02 01:02:13 +08:00
Gilles Boccon-Gibod
34287177b9 Merge pull request #615 from google/gbg/bluetooth-6-constants
add bluetooth 6.0 constants
2024-12-23 08:46:13 -05:00
Josh Wu
d238dd4059 Use dynamic sample rate 2024-12-23 17:01:11 +08:00
Josh Wu
7324d322fe BIG 2024-12-20 13:45:12 +08:00
48 changed files with 4719 additions and 768 deletions

View File

@@ -44,7 +44,7 @@ jobs:
run: cat rootcanal.log
- name: Upload Mobly logs
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: mobly-logs
name: mobly-logs-${{ strategy.job-index }}
path: /tmp/logs/mobly/bumble.bumbles/

View File

@@ -14,9 +14,12 @@
"ASHA",
"asyncio",
"ATRAC",
"auracast",
"avctp",
"avdtp",
"avrcp",
"biginfo",
"bigs",
"bitpool",
"bitstruct",
"BSCP",
@@ -36,6 +39,7 @@
"deregistration",
"dhkey",
"diversifier",
"ediv",
"endianness",
"ESCO",
"Fitbit",
@@ -47,6 +51,7 @@
"libc",
"liblc",
"libusb",
"maxs",
"MITM",
"MSBC",
"NDIS",
@@ -54,8 +59,10 @@
"NONBLOCK",
"NONCONN",
"OXIMETER",
"PDUS",
"popleft",
"PRAND",
"prefs",
"protobuf",
"psms",
"pyee",

View File

@@ -1,4 +1,4 @@
# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,25 +16,35 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import contextlib
import dataclasses
import functools
import logging
import os
import wave
import itertools
from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple
import click
import pyee
try:
import lc3 # type: ignore # pylint: disable=E0401
except ImportError as e:
raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e
from bumble.colors import color
import bumble.company_ids
import bumble.core
from bumble import company_ids
from bumble import core
from bumble import gatt
from bumble import hci
from bumble.profiles import bap
from bumble.profiles import le_audio
from bumble.profiles import pbp
from bumble.profiles import bass
import bumble.device
import bumble.gatt
import bumble.hci
import bumble.profiles.bap
import bumble.profiles.bass
import bumble.profiles.pbp
import bumble.transport
import bumble.utils
@@ -49,7 +59,7 @@ logger = logging.getLogger(__name__)
# Constants
# -----------------------------------------------------------------------------
AURACAST_DEFAULT_DEVICE_NAME = 'Bumble Auracast'
AURACAST_DEFAULT_DEVICE_ADDRESS = bumble.hci.Address('F0:F1:F2:F3:F4:F5')
AURACAST_DEFAULT_DEVICE_ADDRESS = hci.Address('F0:F1:F2:F3:F4:F5')
AURACAST_DEFAULT_SYNC_TIMEOUT = 5.0
AURACAST_DEFAULT_ATT_MTU = 256
@@ -62,17 +72,12 @@ class BroadcastScanner(pyee.EventEmitter):
class Broadcast(pyee.EventEmitter):
name: str | None
sync: bumble.device.PeriodicAdvertisingSync
broadcast_id: int
rssi: int = 0
public_broadcast_announcement: Optional[
bumble.profiles.pbp.PublicBroadcastAnnouncement
] = None
broadcast_audio_announcement: Optional[
bumble.profiles.bap.BroadcastAudioAnnouncement
] = None
basic_audio_announcement: Optional[
bumble.profiles.bap.BasicAudioAnnouncement
] = None
appearance: Optional[bumble.core.Appearance] = None
public_broadcast_announcement: Optional[pbp.PublicBroadcastAnnouncement] = None
broadcast_audio_announcement: Optional[bap.BroadcastAudioAnnouncement] = None
basic_audio_announcement: Optional[bap.BasicAudioAnnouncement] = None
appearance: Optional[core.Appearance] = None
biginfo: Optional[bumble.device.BIGInfoAdvertisement] = None
manufacturer_data: Optional[Tuple[str, bytes]] = None
@@ -86,42 +91,36 @@ class BroadcastScanner(pyee.EventEmitter):
def update(self, advertisement: bumble.device.Advertisement) -> None:
self.rssi = advertisement.rssi
for service_data in advertisement.data.get_all(
bumble.core.AdvertisingData.SERVICE_DATA
core.AdvertisingData.SERVICE_DATA
):
assert isinstance(service_data, tuple)
service_uuid, data = service_data
assert isinstance(data, bytes)
if (
service_uuid
== bumble.gatt.GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE
):
if service_uuid == gatt.GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE:
self.public_broadcast_announcement = (
bumble.profiles.pbp.PublicBroadcastAnnouncement.from_bytes(data)
pbp.PublicBroadcastAnnouncement.from_bytes(data)
)
continue
if (
service_uuid
== bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
):
if service_uuid == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE:
self.broadcast_audio_announcement = (
bumble.profiles.bap.BroadcastAudioAnnouncement.from_bytes(data)
bap.BroadcastAudioAnnouncement.from_bytes(data)
)
continue
self.appearance = advertisement.data.get( # type: ignore[assignment]
bumble.core.AdvertisingData.APPEARANCE
core.AdvertisingData.APPEARANCE
)
if manufacturer_data := advertisement.data.get(
bumble.core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA
core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA
):
assert isinstance(manufacturer_data, tuple)
company_id = cast(int, manufacturer_data[0])
data = cast(bytes, manufacturer_data[1])
self.manufacturer_data = (
bumble.company_ids.COMPANY_IDENTIFIERS.get(
company_ids.COMPANY_IDENTIFIERS.get(
company_id, f'0x{company_id:04X}'
),
data,
@@ -232,15 +231,15 @@ class BroadcastScanner(pyee.EventEmitter):
return
for service_data in advertisement.data.get_all(
bumble.core.AdvertisingData.SERVICE_DATA
core.AdvertisingData.SERVICE_DATA
):
assert isinstance(service_data, tuple)
service_uuid, data = service_data
assert isinstance(data, bytes)
if service_uuid == bumble.gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE:
if service_uuid == gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE:
self.basic_audio_announcement = (
bumble.profiles.bap.BasicAudioAnnouncement.from_bytes(data)
bap.BasicAudioAnnouncement.from_bytes(data)
)
break
@@ -262,7 +261,7 @@ class BroadcastScanner(pyee.EventEmitter):
self.device = device
self.filter_duplicates = filter_duplicates
self.sync_timeout = sync_timeout
self.broadcasts: Dict[bumble.hci.Address, BroadcastScanner.Broadcast] = {}
self.broadcasts = dict[hci.Address, BroadcastScanner.Broadcast]()
device.on('advertisement', self.on_advertisement)
async def start(self) -> None:
@@ -277,33 +276,44 @@ class BroadcastScanner(pyee.EventEmitter):
def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None:
if not (
ads := advertisement.data.get_all(
bumble.core.AdvertisingData.SERVICE_DATA_16_BIT_UUID
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID
)
) or not (
any(
ad
for ad in ads
if isinstance(ad, tuple)
and ad[0] == bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
broadcast_audio_announcement := next(
(
ad
for ad in ads
if isinstance(ad, tuple)
and ad[0] == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
),
None,
)
):
return
broadcast_name = advertisement.data.get(
bumble.core.AdvertisingData.BROADCAST_NAME
)
broadcast_name = advertisement.data.get(core.AdvertisingData.BROADCAST_NAME)
assert isinstance(broadcast_name, str) or broadcast_name is None
assert isinstance(broadcast_audio_announcement[1], bytes)
if broadcast := self.broadcasts.get(advertisement.address):
broadcast.update(advertisement)
return
bumble.utils.AsyncRunner.spawn(
self.on_new_broadcast(broadcast_name, advertisement)
self.on_new_broadcast(
broadcast_name,
advertisement,
bap.BroadcastAudioAnnouncement.from_bytes(
broadcast_audio_announcement[1]
).broadcast_id,
)
)
async def on_new_broadcast(
self, name: str | None, advertisement: bumble.device.Advertisement
self,
name: str | None,
advertisement: bumble.device.Advertisement,
broadcast_id: int,
) -> None:
periodic_advertising_sync = await self.device.create_periodic_advertising_sync(
advertiser_address=advertisement.address,
@@ -311,7 +321,7 @@ class BroadcastScanner(pyee.EventEmitter):
sync_timeout=self.sync_timeout,
filter_duplicates=self.filter_duplicates,
)
broadcast = self.Broadcast(name, periodic_advertising_sync)
broadcast = self.Broadcast(name, periodic_advertising_sync, broadcast_id)
broadcast.update(advertisement)
self.broadcasts[advertisement.address] = broadcast
periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast))
@@ -323,10 +333,11 @@ class BroadcastScanner(pyee.EventEmitter):
self.emit('broadcast_loss', broadcast)
class PrintingBroadcastScanner:
class PrintingBroadcastScanner(pyee.EventEmitter):
def __init__(
self, device: bumble.device.Device, filter_duplicates: bool, sync_timeout: float
) -> None:
super().__init__()
self.scanner = BroadcastScanner(device, filter_duplicates, sync_timeout)
self.scanner.on('new_broadcast', self.on_new_broadcast)
self.scanner.on('broadcast_loss', self.on_broadcast_loss)
@@ -461,24 +472,26 @@ async def run_assist(
await peer.request_mtu(mtu)
# Get the BASS service
bass = await peer.discover_service_and_create_proxy(
bumble.profiles.bass.BroadcastAudioScanServiceProxy
bass_client = await peer.discover_service_and_create_proxy(
bass.BroadcastAudioScanServiceProxy
)
# Check that the service was found
if not bass:
if not bass_client:
print(color('!!! Broadcast Audio Scan Service not found', 'red'))
return
# Subscribe to and read the broadcast receive state characteristics
for i, broadcast_receive_state in enumerate(bass.broadcast_receive_states):
for i, broadcast_receive_state in enumerate(
bass_client.broadcast_receive_states
):
try:
await broadcast_receive_state.subscribe(
lambda value, i=i: print(
f"{color(f'Broadcast Receive State Update [{i}]:', 'green')} {value}"
)
)
except bumble.core.ProtocolError as error:
except core.ProtocolError as error:
print(
color(
f'!!! Failed to subscribe to Broadcast Receive State characteristic:',
@@ -497,7 +510,7 @@ async def run_assist(
if command == 'add-source':
# Find the requested broadcast
await bass.remote_scan_started()
await bass_client.remote_scan_started()
if broadcast_name:
print(color('Scanning for broadcast:', 'cyan'), broadcast_name)
else:
@@ -517,15 +530,15 @@ async def run_assist(
# Add the source
print(color('Adding source:', 'blue'), broadcast.sync.advertiser_address)
await bass.add_source(
await bass_client.add_source(
broadcast.sync.advertiser_address,
broadcast.sync.sid,
broadcast.broadcast_audio_announcement.broadcast_id,
bumble.profiles.bass.PeriodicAdvertisingSyncParams.SYNCHRONIZE_TO_PA_PAST_AVAILABLE,
bass.PeriodicAdvertisingSyncParams.SYNCHRONIZE_TO_PA_PAST_AVAILABLE,
0xFFFF,
[
bumble.profiles.bass.SubgroupInfo(
bumble.profiles.bass.SubgroupInfo.ANY_BIS,
bass.SubgroupInfo(
bass.SubgroupInfo.ANY_BIS,
bytes(broadcast.basic_audio_announcement.subgroups[0].metadata),
)
],
@@ -535,7 +548,7 @@ async def run_assist(
await broadcast.sync.transfer(peer.connection)
# Notify the sink that we're done scanning.
await bass.remote_scan_stopped()
await bass_client.remote_scan_stopped()
await peer.sustain()
return
@@ -546,7 +559,7 @@ async def run_assist(
return
# Find the requested broadcast
await bass.remote_scan_started()
await bass_client.remote_scan_started()
if broadcast_name:
print(color('Scanning for broadcast:', 'cyan'), broadcast_name)
else:
@@ -569,13 +582,13 @@ async def run_assist(
color('Modifying source:', 'blue'),
source_id,
)
await bass.modify_source(
await bass_client.modify_source(
source_id,
bumble.profiles.bass.PeriodicAdvertisingSyncParams.SYNCHRONIZE_TO_PA_PAST_NOT_AVAILABLE,
bass.PeriodicAdvertisingSyncParams.SYNCHRONIZE_TO_PA_PAST_NOT_AVAILABLE,
0xFFFF,
[
bumble.profiles.bass.SubgroupInfo(
bumble.profiles.bass.SubgroupInfo.ANY_BIS,
bass.SubgroupInfo(
bass.SubgroupInfo.ANY_BIS,
bytes(broadcast.basic_audio_announcement.subgroups[0].metadata),
)
],
@@ -590,7 +603,7 @@ async def run_assist(
# Remove the source
print(color('Removing source:', 'blue'), source_id)
await bass.remove_source(source_id)
await bass_client.remove_source(source_id)
await peer.sustain()
return
@@ -610,14 +623,242 @@ async def run_pair(transport: str, address: str) -> None:
print("+++ Paired")
async def run_receive(
transport: str,
broadcast_id: int,
broadcast_code: str | None,
sync_timeout: float,
subgroup_index: int,
) -> None:
async with create_device(transport) as device:
if not device.supports_le_periodic_advertising:
print(color('Periodic advertising not supported', 'red'))
return
scanner = BroadcastScanner(device, False, sync_timeout)
scan_result: asyncio.Future[BroadcastScanner.Broadcast] = (
asyncio.get_running_loop().create_future()
)
def on_new_broadcast(broadcast: BroadcastScanner.Broadcast) -> None:
if scan_result.done():
return
if broadcast.broadcast_id == broadcast_id:
scan_result.set_result(broadcast)
scanner.on('new_broadcast', on_new_broadcast)
await scanner.start()
print('Start scanning...')
broadcast = await scan_result
print('Advertisement found:')
broadcast.print()
basic_audio_announcement_scanned = asyncio.Event()
def on_change() -> None:
if (
broadcast.basic_audio_announcement
and not basic_audio_announcement_scanned.is_set()
):
basic_audio_announcement_scanned.set()
broadcast.on('change', on_change)
if not broadcast.basic_audio_announcement:
print('Wait for Basic Audio Announcement...')
await basic_audio_announcement_scanned.wait()
print('Basic Audio Announcement found')
broadcast.print()
print('Stop scanning')
await scanner.stop()
print('Start sync to BIG')
assert broadcast.basic_audio_announcement
subgroup = broadcast.basic_audio_announcement.subgroups[subgroup_index]
configuration = subgroup.codec_specific_configuration
assert configuration
assert (sampling_frequency := configuration.sampling_frequency)
assert (frame_duration := configuration.frame_duration)
big_sync = await device.create_big_sync(
broadcast.sync,
bumble.device.BigSyncParameters(
big_sync_timeout=0x4000,
bis=[bis.index for bis in subgroup.bis],
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
),
),
)
num_bis = len(big_sync.bis_links)
decoder = lc3.Decoder(
frame_duration_us=frame_duration.us,
sample_rate_hz=sampling_frequency.hz,
num_channels=num_bis,
)
sdus = [b''] * num_bis
subprocess = await asyncio.create_subprocess_shell(
f'stdbuf -i0 ffplay -ar {sampling_frequency.hz} -ac {num_bis} -f f32le pipe:0',
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
for i, bis_link in enumerate(big_sync.bis_links):
print(f'Setup ISO for BIS {bis_link.handle}')
def sink(index: int, packet: hci.HCI_IsoDataPacket):
nonlocal sdus
sdus[index] = packet.iso_sdu_fragment
if all(sdus) and subprocess.stdin:
subprocess.stdin.write(decoder.decode(b''.join(sdus)).tobytes())
sdus = [b''] * num_bis
bis_link.sink = functools.partial(sink, i)
await bis_link.setup_data_path(
direction=bis_link.Direction.CONTROLLER_TO_HOST
)
terminated = asyncio.Event()
big_sync.on(big_sync.Event.TERMINATION, lambda _: terminated.set())
await terminated.wait()
async def run_broadcast(
transport: str, broadcast_id: int, broadcast_code: str | None, wav_file_path: str
) -> None:
async with create_device(transport) as device:
if not device.supports_le_periodic_advertising:
print(color('Periodic advertising not supported', 'red'))
return
with wave.open(wav_file_path, 'rb') as wav:
print('Encoding wav file into lc3...')
encoder = lc3.Encoder(
frame_duration_us=10000,
sample_rate_hz=48000,
num_channels=2,
input_sample_rate_hz=wav.getframerate(),
)
frames = list[bytes]()
while pcm := wav.readframes(encoder.get_frame_samples()):
frames.append(
encoder.encode(pcm, num_bytes=200, bit_depth=wav.getsampwidth() * 8)
)
del encoder
print('Encoding complete.')
basic_audio_announcement = bap.BasicAudioAnnouncement(
presentation_delay=40000,
subgroups=[
bap.BasicAudioAnnouncement.Subgroup(
codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3),
codec_specific_configuration=bap.CodecSpecificConfiguration(
sampling_frequency=bap.SamplingFrequency.FREQ_48000,
frame_duration=bap.FrameDuration.DURATION_10000_US,
octets_per_codec_frame=100,
),
metadata=le_audio.Metadata(
[
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.LANGUAGE, data=b'eng'
),
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=b'Disco'
),
]
),
bis=[
bap.BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=bap.CodecSpecificConfiguration(
audio_channel_allocation=bap.AudioLocation.FRONT_LEFT
),
),
bap.BasicAudioAnnouncement.BIS(
index=2,
codec_specific_configuration=bap.CodecSpecificConfiguration(
audio_channel_allocation=bap.AudioLocation.FRONT_RIGHT
),
),
],
)
],
)
broadcast_audio_announcement = bap.BroadcastAudioAnnouncement(broadcast_id)
print('Start Advertising')
advertising_set = await device.create_advertising_set(
advertising_parameters=bumble.device.AdvertisingParameters(
advertising_event_properties=bumble.device.AdvertisingEventProperties(
is_connectable=False
),
primary_advertising_interval_min=100,
primary_advertising_interval_max=200,
),
advertising_data=(
broadcast_audio_announcement.get_advertising_data()
+ bytes(
core.AdvertisingData(
[(core.AdvertisingData.BROADCAST_NAME, b'Bumble Auracast')]
)
)
),
periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters(
periodic_advertising_interval_min=80,
periodic_advertising_interval_max=160,
),
periodic_advertising_data=basic_audio_announcement.get_advertising_data(),
auto_restart=True,
auto_start=True,
)
print('Start Periodic Advertising')
await advertising_set.start_periodic()
print('Setup BIG')
big = await device.create_big(
advertising_set,
parameters=bumble.device.BigParameters(
num_bis=2,
sdu_interval=10000,
max_sdu=100,
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
),
),
)
print('Setup ISO Data Path')
def on_flow(packet_queue):
print(
f'\rPACKETS: pending={packet_queue.pending}, '
f'queued={packet_queue.queued}, completed={packet_queue.completed}',
end='',
)
packet_queue = None
for bis_link in big.bis_links:
await bis_link.setup_data_path(
direction=bis_link.Direction.HOST_TO_CONTROLLER
)
if packet_queue is None:
packet_queue = bis_link.data_packet_queue
if packet_queue:
packet_queue.on('flow', lambda: on_flow(packet_queue))
for frame in itertools.cycle(frames):
mid = len(frame) // 2
big.bis_links[0].write(frame[:mid])
big.bis_links[1].write(frame[mid:])
await asyncio.sleep(0.009)
def run_async(async_command: Coroutine) -> None:
try:
asyncio.run(async_command)
except bumble.core.ProtocolError as error:
except core.ProtocolError as error:
if error.error_namespace == 'att' and error.error_code in list(
bumble.profiles.bass.ApplicationError
bass.ApplicationError
):
message = bumble.profiles.bass.ApplicationError(error.error_code).name
message = bass.ApplicationError(error.error_code).name
else:
message = str(error)
@@ -631,9 +872,7 @@ def run_async(async_command: Coroutine) -> None:
# -----------------------------------------------------------------------------
@click.group()
@click.pass_context
def auracast(
ctx,
):
def auracast(ctx):
ctx.ensure_object(dict)
@@ -691,6 +930,66 @@ def pair(ctx, transport, address):
run_async(run_pair(transport, address))
@auracast.command('receive')
@click.argument('transport')
@click.argument('broadcast_id', type=int)
@click.option(
'--broadcast-code',
metavar='BROADCAST_CODE',
type=str,
help='Broadcast encryption code in hex format',
)
@click.option(
'--sync-timeout',
metavar='SYNC_TIMEOUT',
type=float,
default=AURACAST_DEFAULT_SYNC_TIMEOUT,
help='Sync timeout (in seconds)',
)
@click.option(
'--subgroup',
metavar='SUBGROUP',
type=int,
default=0,
help='Index of Subgroup',
)
@click.pass_context
def receive(ctx, transport, broadcast_id, broadcast_code, sync_timeout, subgroup):
"""Receive a broadcast source"""
run_async(
run_receive(transport, broadcast_id, broadcast_code, sync_timeout, subgroup)
)
@auracast.command('broadcast')
@click.argument('transport')
@click.argument('wav_file_path', type=str)
@click.option(
'--broadcast-id',
metavar='BROADCAST_ID',
type=int,
default=123456,
help='Broadcast ID',
)
@click.option(
'--broadcast-code',
metavar='BROADCAST_CODE',
type=str,
help='Broadcast encryption code in hex format',
)
@click.pass_context
def broadcast(ctx, transport, broadcast_id, broadcast_code, wav_file_path):
"""Start a broadcast as a source."""
run_async(
run_broadcast(
transport=transport,
broadcast_id=broadcast_id,
broadcast_code=broadcast_code,
wav_file_path=wav_file_path,
)
)
def main():
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
auracast()

View File

@@ -16,6 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import dataclasses
import enum
import logging
import os
@@ -97,34 +98,6 @@ DEFAULT_RFCOMM_MTU = 2048
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def parse_packet(packet):
if len(packet) < 1:
logging.info(
color(f'!!! Packet too short (got {len(packet)} bytes, need >= 1)', 'red')
)
raise ValueError('packet too short')
try:
packet_type = PacketType(packet[0])
except ValueError:
logging.info(color(f'!!! Invalid packet type 0x{packet[0]:02X}', 'red'))
raise
return (packet_type, packet[1:])
def parse_packet_sequence(packet_data):
if len(packet_data) < 5:
logging.info(
color(
f'!!!Packet too short (got {len(packet_data)} bytes, need >= 5)',
'red',
)
)
raise ValueError('packet too short')
return struct.unpack_from('>bI', packet_data, 0)
def le_phy_name(phy_id):
return {HCI_LE_1M_PHY: '1M', HCI_LE_2M_PHY: '2M', HCI_LE_CODED_PHY: 'CODED'}.get(
phy_id, HCI_Constant.le_phy_name(phy_id)
@@ -225,13 +198,135 @@ async def switch_roles(connection, role):
logging.info(f'{color("### Role switch failed:", "red")} {error}')
class PacketType(enum.IntEnum):
RESET = 0
SEQUENCE = 1
ACK = 2
# -----------------------------------------------------------------------------
# Packet
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class Packet:
class PacketType(enum.IntEnum):
RESET = 0
SEQUENCE = 1
ACK = 2
class PacketFlags(enum.IntFlag):
LAST = 1
packet_type: PacketType
flags: PacketFlags = PacketFlags(0)
sequence: int = 0
timestamp: int = 0
payload: bytes = b""
@classmethod
def from_bytes(cls, data: bytes):
if len(data) < 1:
logging.warning(
color(f'!!! Packet too short (got {len(data)} bytes, need >= 1)', 'red')
)
raise ValueError('packet too short')
try:
packet_type = cls.PacketType(data[0])
except ValueError:
logging.warning(color(f'!!! Invalid packet type 0x{data[0]:02X}', 'red'))
raise
if packet_type == cls.PacketType.RESET:
return cls(packet_type)
flags = cls.PacketFlags(data[1])
(sequence,) = struct.unpack_from("<I", data, 2)
if packet_type == cls.PacketType.ACK:
if len(data) < 6:
logging.warning(
color(
f'!!! Packet too short (got {len(data)} bytes, need >= 6)',
'red',
)
)
return cls(packet_type, flags, sequence)
if len(data) < 10:
logging.warning(
color(
f'!!! Packet too short (got {len(data)} bytes, need >= 10)', 'red'
)
)
raise ValueError('packet too short')
(timestamp,) = struct.unpack_from("<I", data, 6)
return cls(packet_type, flags, sequence, timestamp, data[10:])
def __bytes__(self):
if self.packet_type == self.PacketType.RESET:
return bytes([self.packet_type])
if self.packet_type == self.PacketType.ACK:
return struct.pack("<BBI", self.packet_type, self.flags, self.sequence)
return (
struct.pack(
"<BBII", self.packet_type, self.flags, self.sequence, self.timestamp
)
+ self.payload
)
PACKET_FLAG_LAST = 1
# -----------------------------------------------------------------------------
# Jitter Stats
# -----------------------------------------------------------------------------
class JitterStats:
def __init__(self):
self.reset()
def reset(self):
self.packets = []
self.receive_times = []
self.jitter = []
def on_packet_received(self, packet):
now = time.time()
self.packets.append(packet)
self.receive_times.append(now)
if packet.timestamp and len(self.packets) > 1:
expected_time = (
self.receive_times[0]
+ (packet.timestamp - self.packets[0].timestamp) / 1000000
)
jitter = now - expected_time
else:
jitter = 0.0
self.jitter.append(jitter)
return jitter
def show_stats(self):
if len(self.jitter) < 3:
return
average = sum(self.jitter) / len(self.jitter)
adjusted = [jitter - average for jitter in self.jitter]
log_stats('Jitter (signed)', adjusted, 3)
log_stats('Jitter (absolute)', [abs(jitter) for jitter in adjusted], 3)
# Show a histogram
bin_count = 20
bins = [0] * bin_count
interval_min = min(adjusted)
interval_max = max(adjusted)
interval_range = interval_max - interval_min
bin_thresholds = [
interval_min + i * (interval_range / bin_count) for i in range(bin_count)
]
for jitter in adjusted:
for i in reversed(range(bin_count)):
if jitter >= bin_thresholds[i]:
bins[i] += 1
break
for i in range(bin_count):
logging.info(f'@@@ >= {bin_thresholds[i]:.4f}: {bins[i]}')
# -----------------------------------------------------------------------------
@@ -281,19 +376,37 @@ class Sender:
await asyncio.sleep(self.tx_start_delay)
logging.info(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
await self.packet_io.send_packet(
bytes(Packet(packet_type=Packet.PacketType.RESET))
)
self.start_time = time.time()
self.bytes_sent = 0
for tx_i in range(self.tx_packet_count):
packet_flags = (
PACKET_FLAG_LAST if tx_i == self.tx_packet_count - 1 else 0
if self.pace > 0:
# Wait until it is time to send the next packet
target_time = self.start_time + (tx_i * self.pace / 1000)
now = time.time()
if now < target_time:
await asyncio.sleep(target_time - now)
else:
await self.packet_io.drain()
packet = bytes(
Packet(
packet_type=Packet.PacketType.SEQUENCE,
flags=(
Packet.PacketFlags.LAST
if tx_i == self.tx_packet_count - 1
else 0
),
sequence=tx_i,
timestamp=int((time.time() - self.start_time) * 1000000),
payload=bytes(
self.tx_packet_size - 10 - self.packet_io.overhead_size
),
)
)
packet = struct.pack(
'>bbI',
PacketType.SEQUENCE,
packet_flags,
tx_i,
) + bytes(self.tx_packet_size - 6 - self.packet_io.overhead_size)
logging.info(
color(
f'Sending packet {tx_i}: {self.tx_packet_size} bytes', 'yellow'
@@ -302,14 +415,6 @@ class Sender:
self.bytes_sent += len(packet)
await self.packet_io.send_packet(packet)
if self.pace is None:
continue
if self.pace > 0:
await asyncio.sleep(self.pace / 1000)
else:
await self.packet_io.drain()
await self.done.wait()
run_counter = f'[{run + 1} of {self.repeat + 1}]' if self.repeat else ''
@@ -321,13 +426,13 @@ class Sender:
if self.repeat:
logging.info(color('--- End of runs', 'blue'))
def on_packet_received(self, packet):
def on_packet_received(self, data):
try:
packet_type, _ = parse_packet(packet)
packet = Packet.from_bytes(data)
except ValueError:
return
if packet_type == PacketType.ACK:
if packet.packet_type == Packet.PacketType.ACK:
elapsed = time.time() - self.start_time
average_tx_speed = self.bytes_sent / elapsed
self.stats.append(average_tx_speed)
@@ -350,52 +455,53 @@ class Receiver:
last_timestamp: float
def __init__(self, packet_io, linger):
self.reset()
self.jitter_stats = JitterStats()
self.packet_io = packet_io
self.packet_io.packet_listener = self
self.linger = linger
self.done = asyncio.Event()
self.reset()
def reset(self):
self.expected_packet_index = 0
self.measurements = [(time.time(), 0)]
self.total_bytes_received = 0
self.jitter_stats.reset()
def on_packet_received(self, packet):
def on_packet_received(self, data):
try:
packet_type, packet_data = parse_packet(packet)
packet = Packet.from_bytes(data)
except ValueError:
logging.exception("invalid packet")
return
if packet_type == PacketType.RESET:
if packet.packet_type == Packet.PacketType.RESET:
logging.info(color('=== Received RESET', 'magenta'))
self.reset()
return
try:
packet_flags, packet_index = parse_packet_sequence(packet_data)
except ValueError:
return
jitter = self.jitter_stats.on_packet_received(packet)
logging.info(
f'<<< Received packet {packet_index}: '
f'flags=0x{packet_flags:02X}, '
f'{len(packet) + self.packet_io.overhead_size} bytes'
f'<<< Received packet {packet.sequence}: '
f'flags={packet.flags}, '
f'jitter={jitter:.4f}, '
f'{len(data) + self.packet_io.overhead_size} bytes',
)
if packet_index != self.expected_packet_index:
if packet.sequence != self.expected_packet_index:
logging.info(
color(
f'!!! Unexpected packet, expected {self.expected_packet_index} '
f'but received {packet_index}'
f'but received {packet.sequence}'
)
)
now = time.time()
elapsed_since_start = now - self.measurements[0][0]
elapsed_since_last = now - self.measurements[-1][0]
self.measurements.append((now, len(packet)))
self.total_bytes_received += len(packet)
instant_rx_speed = len(packet) / elapsed_since_last
self.measurements.append((now, len(data)))
self.total_bytes_received += len(data)
instant_rx_speed = len(data) / elapsed_since_last
average_rx_speed = self.total_bytes_received / elapsed_since_start
window = self.measurements[-64:]
windowed_rx_speed = sum(measurement[1] for measurement in window[1:]) / (
@@ -411,15 +517,17 @@ class Receiver:
)
)
self.expected_packet_index = packet_index + 1
self.expected_packet_index = packet.sequence + 1
if packet_flags & PACKET_FLAG_LAST:
if packet.flags & Packet.PacketFlags.LAST:
AsyncRunner.spawn(
self.packet_io.send_packet(
struct.pack('>bbI', PacketType.ACK, packet_flags, packet_index)
bytes(Packet(Packet.PacketType.ACK, packet.flags, packet.sequence))
)
)
logging.info(color('@@@ Received last packet', 'green'))
self.jitter_stats.show_stats()
if not self.linger:
self.done.set()
@@ -479,25 +587,32 @@ class Ping:
await asyncio.sleep(self.tx_start_delay)
logging.info(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
await self.packet_io.send_packet(bytes(Packet(Packet.PacketType.RESET)))
packet_interval = self.pace / 1000
start_time = time.time()
self.next_expected_packet_index = 0
for i in range(self.tx_packet_count):
target_time = start_time + (i * packet_interval)
target_time = start_time + (i * self.pace / 1000)
now = time.time()
if now < target_time:
await asyncio.sleep(target_time - now)
now = time.time()
packet = struct.pack(
'>bbI',
PacketType.SEQUENCE,
(PACKET_FLAG_LAST if i == self.tx_packet_count - 1 else 0),
i,
) + bytes(self.tx_packet_size - 6)
packet = bytes(
Packet(
packet_type=Packet.PacketType.SEQUENCE,
flags=(
Packet.PacketFlags.LAST
if i == self.tx_packet_count - 1
else 0
),
sequence=i,
timestamp=int((now - start_time) * 1000000),
payload=bytes(self.tx_packet_size - 10),
)
)
logging.info(color(f'Sending packet {i}', 'yellow'))
self.ping_times.append(time.time())
self.ping_times.append(now)
await self.packet_io.send_packet(packet)
await self.done.wait()
@@ -531,40 +646,35 @@ class Ping:
if self.repeat:
logging.info(color('--- End of runs', 'blue'))
def on_packet_received(self, packet):
def on_packet_received(self, data):
try:
packet_type, packet_data = parse_packet(packet)
packet = Packet.from_bytes(data)
except ValueError:
return
try:
packet_flags, packet_index = parse_packet_sequence(packet_data)
except ValueError:
return
if packet_type == PacketType.ACK:
elapsed = time.time() - self.ping_times[packet_index]
if packet.packet_type == Packet.PacketType.ACK:
elapsed = time.time() - self.ping_times[packet.sequence]
rtt = elapsed * 1000
self.rtts.append(rtt)
logging.info(
color(
f'<<< Received ACK [{packet_index}], RTT={rtt:.2f}ms',
f'<<< Received ACK [{packet.sequence}], RTT={rtt:.2f}ms',
'green',
)
)
if packet_index == self.next_expected_packet_index:
if packet.sequence == self.next_expected_packet_index:
self.next_expected_packet_index += 1
else:
logging.info(
color(
f'!!! Unexpected packet, '
f'expected {self.next_expected_packet_index} '
f'but received {packet_index}'
f'but received {packet.sequence}'
)
)
if packet_flags & PACKET_FLAG_LAST:
if packet.flags & Packet.PacketFlags.LAST:
self.done.set()
return
@@ -576,89 +686,56 @@ class Pong:
expected_packet_index: int
def __init__(self, packet_io, linger):
self.reset()
self.jitter_stats = JitterStats()
self.packet_io = packet_io
self.packet_io.packet_listener = self
self.linger = linger
self.done = asyncio.Event()
self.reset()
def reset(self):
self.expected_packet_index = 0
self.receive_times = []
def on_packet_received(self, packet):
self.receive_times.append(time.time())
self.jitter_stats.reset()
def on_packet_received(self, data):
try:
packet_type, packet_data = parse_packet(packet)
packet = Packet.from_bytes(data)
except ValueError:
return
if packet_type == PacketType.RESET:
if packet.packet_type == Packet.PacketType.RESET:
logging.info(color('=== Received RESET', 'magenta'))
self.reset()
return
try:
packet_flags, packet_index = parse_packet_sequence(packet_data)
except ValueError:
return
interval = (
self.receive_times[-1] - self.receive_times[-2]
if len(self.receive_times) >= 2
else 0
)
jitter = self.jitter_stats.on_packet_received(packet)
logging.info(
color(
f'<<< Received packet {packet_index}: '
f'flags=0x{packet_flags:02X}, {len(packet)} bytes, '
f'interval={interval:.4f}',
f'<<< Received packet {packet.sequence}: '
f'flags={packet.flags}, {len(data)} bytes, '
f'jitter={jitter:.4f}',
'green',
)
)
if packet_index != self.expected_packet_index:
if packet.sequence != self.expected_packet_index:
logging.info(
color(
f'!!! Unexpected packet, expected {self.expected_packet_index} '
f'but received {packet_index}'
f'but received {packet.sequence}'
)
)
self.expected_packet_index = packet_index + 1
self.expected_packet_index = packet.sequence + 1
AsyncRunner.spawn(
self.packet_io.send_packet(
struct.pack('>bbI', PacketType.ACK, packet_flags, packet_index)
bytes(Packet(Packet.PacketType.ACK, packet.flags, packet.sequence))
)
)
if packet_flags & PACKET_FLAG_LAST:
if len(self.receive_times) >= 3:
# Show basic stats
intervals = [
self.receive_times[i + 1] - self.receive_times[i]
for i in range(len(self.receive_times) - 1)
]
log_stats('Packet intervals', intervals, 3)
# Show a histogram
bin_count = 20
bins = [0] * bin_count
interval_min = min(intervals)
interval_max = max(intervals)
interval_range = interval_max - interval_min
bin_thresholds = [
interval_min + i * (interval_range / bin_count)
for i in range(bin_count)
]
for interval in intervals:
for i in reversed(range(bin_count)):
if interval >= bin_thresholds[i]:
bins[i] += 1
break
for i in range(bin_count):
logging.info(f'@@@ >= {bin_thresholds[i]:.4f}: {bins[i]}')
if packet.flags & Packet.PacketFlags.LAST:
self.jitter_stats.show_stats()
if not self.linger:
self.done.set()
@@ -1471,7 +1548,7 @@ def create_mode_factory(ctx, default_mode):
def create_scenario_factory(ctx, default_scenario):
scenario = ctx.obj['scenario']
if scenario is None:
scenarion = default_scenario
scenario = default_scenario
def create_scenario(packet_io):
if scenario == 'send':
@@ -1530,6 +1607,7 @@ def create_scenario_factory(ctx, default_scenario):
'--att-mtu',
metavar='MTU',
type=click.IntRange(23, 517),
default=517,
help='GATT MTU (gatt-client mode)',
)
@click.option(
@@ -1605,7 +1683,7 @@ def create_scenario_factory(ctx, default_scenario):
'--packet-size',
'-s',
metavar='SIZE',
type=click.IntRange(8, 8192),
type=click.IntRange(10, 8192),
default=500,
help='Packet size (send or ping scenario)',
)

View File

@@ -37,6 +37,8 @@ from bumble.hci import (
HCI_Command_Status_Event,
HCI_READ_BUFFER_SIZE_COMMAND,
HCI_Read_Buffer_Size_Command,
HCI_LE_READ_BUFFER_SIZE_V2_COMMAND,
HCI_LE_Read_Buffer_Size_V2_Command,
HCI_READ_BD_ADDR_COMMAND,
HCI_Read_BD_ADDR_Command,
HCI_READ_LOCAL_NAME_COMMAND,
@@ -147,7 +149,7 @@ async def get_le_info(host: Host) -> None:
# -----------------------------------------------------------------------------
async def get_acl_flow_control_info(host: Host) -> None:
async def get_flow_control_info(host: Host) -> None:
print()
if host.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
@@ -160,14 +162,28 @@ async def get_acl_flow_control_info(host: Host) -> None:
f'packets of size {response.return_parameters.hc_acl_data_packet_length}',
)
if host.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
if host.supports_command(HCI_LE_READ_BUFFER_SIZE_V2_COMMAND):
response = await host.send_command(
HCI_LE_Read_Buffer_Size_V2_Command(), check_result=True
)
print(
color('LE ACL Flow Control:', 'yellow'),
f'{response.return_parameters.total_num_le_acl_data_packets} '
f'packets of size {response.return_parameters.le_acl_data_packet_length}',
)
print(
color('LE ISO Flow Control:', 'yellow'),
f'{response.return_parameters.total_num_iso_data_packets} '
f'packets of size {response.return_parameters.iso_data_packet_length}',
)
elif host.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await host.send_command(
HCI_LE_Read_Buffer_Size_Command(), check_result=True
)
print(
color('LE ACL Flow Control:', 'yellow'),
f'{response.return_parameters.hc_total_num_le_acl_data_packets} '
f'packets of size {response.return_parameters.hc_le_acl_data_packet_length}',
f'{response.return_parameters.total_num_le_acl_data_packets} '
f'packets of size {response.return_parameters.le_acl_data_packet_length}',
)
@@ -274,8 +290,8 @@ async def async_main(latency_probes, transport):
# Get the LE info
await get_le_info(host)
# Print the ACL flow control info
await get_acl_flow_control_info(host)
# Print the flow control info
await get_flow_control_info(host)
# Get codec info
await get_codecs_info(host)

View File

@@ -39,7 +39,7 @@ import aiohttp.web
import bumble
from bumble.core import AdvertisingData
from bumble.colors import color
from bumble.device import Device, DeviceConfiguration, AdvertisingParameters
from bumble.device import Device, DeviceConfiguration, AdvertisingParameters, CisLink
from bumble.transport import open_transport
from bumble.profiles import ascs, bap, pacs
from bumble.hci import Address, CodecID, CodingFormat, HCI_IsoDataPacket
@@ -110,7 +110,7 @@ async def lc3_source_task(
sdu_length: int,
frame_duration_us: int,
device: Device,
cis_handle: int,
cis_link: CisLink,
) -> None:
logger.info(
"lc3_source_task filename=%s, sdu_length=%d, frame_duration=%.1f",
@@ -120,7 +120,6 @@ async def lc3_source_task(
)
with wave.open(filename, 'rb') as wav:
bits_per_sample = wav.getsampwidth() * 8
packet_sequence_number = 0
encoder: lc3.Encoder | None = None
@@ -150,18 +149,8 @@ async def lc3_source_task(
num_bytes=sdu_length,
bit_depth=bits_per_sample,
)
cis_link.write(sdu)
iso_packet = HCI_IsoDataPacket(
connection_handle=cis_handle,
data_total_length=sdu_length + 4,
packet_sequence_number=packet_sequence_number,
pb_flag=0b10,
packet_status_flag=0,
iso_sdu_length=sdu_length,
iso_sdu_fragment=sdu,
)
device.host.send_hci_packet(iso_packet)
packet_sequence_number += 1
sleep_time = next_round - datetime.datetime.now()
await asyncio.sleep(sleep_time.total_seconds() * 0.9)
@@ -309,6 +298,7 @@ class Speaker:
advertising_interval_min=25,
advertising_interval_max=25,
address=Address('F1:F2:F3:F4:F5:F6'),
identity_address_type=Address.RANDOM_DEVICE_ADDRESS,
)
device_config.le_enabled = True
@@ -393,7 +383,7 @@ class Speaker:
),
frame_duration_us=codec_config.frame_duration.us,
device=self.device,
cis_handle=ase.cis_link.handle,
cis_link=ase.cis_link,
),
)
else:

View File

@@ -154,15 +154,17 @@ class Controller:
'0000000060000000'
) # BR/EDR Not Supported, LE Supported (Controller)
self.manufacturer_name = 0xFFFF
self.hc_data_packet_length = 27
self.hc_total_num_data_packets = 64
self.hc_le_data_packet_length = 27
self.hc_total_num_le_data_packets = 64
self.acl_data_packet_length = 27
self.total_num_acl_data_packets = 64
self.le_acl_data_packet_length = 27
self.total_num_le_acl_data_packets = 64
self.iso_data_packet_length = 960
self.total_num_iso_data_packets = 64
self.event_mask = 0
self.event_mask_page_2 = 0
self.supported_commands = bytes.fromhex(
'2000800000c000000000e4000000a822000000000000040000f7ffff7f000000'
'30f0f9ff01008004000000000000000000000000000000000000000000000000'
'30f0f9ff01008004002000000000000000000000000000000000000000000000'
)
self.le_event_mask = 0
self.advertising_parameters = None
@@ -1181,9 +1183,9 @@ class Controller:
return struct.pack(
'<BHBHH',
HCI_SUCCESS,
self.hc_data_packet_length,
self.acl_data_packet_length,
0,
self.hc_total_num_data_packets,
self.total_num_acl_data_packets,
0,
)
@@ -1212,8 +1214,21 @@ class Controller:
return struct.pack(
'<BHB',
HCI_SUCCESS,
self.hc_le_data_packet_length,
self.hc_total_num_le_data_packets,
self.le_acl_data_packet_length,
self.total_num_le_acl_data_packets,
)
def on_hci_le_read_buffer_size_v2_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.2 LE Read Buffer Size Command
'''
return struct.pack(
'<BHBHB',
HCI_SUCCESS,
self.le_acl_data_packet_length,
self.total_num_le_acl_data_packets,
self.iso_data_packet_length,
self.total_num_iso_data_packets,
)
def on_hci_le_read_local_supported_features_command(self, _command):

View File

@@ -1501,7 +1501,10 @@ class AdvertisingData:
ad_data_str = f'"{ad_data.decode("utf-8")}"'
elif ad_type == AdvertisingData.COMPLETE_LOCAL_NAME:
ad_type_str = 'Complete Local Name'
ad_data_str = f'"{ad_data.decode("utf-8")}"'
try:
ad_data_str = f'"{ad_data.decode("utf-8")}"'
except UnicodeDecodeError:
ad_data_str = ad_data.hex()
elif ad_type == AdvertisingData.TX_POWER_LEVEL:
ad_type_str = 'TX Power Level'
ad_data_str = str(ad_data[0])

File diff suppressed because it is too large Load Diff

View File

@@ -48,7 +48,6 @@ from bumble.utils import ByteSerializable
if TYPE_CHECKING:
from bumble.gatt_client import AttributeProxy
from bumble.device import Connection
# -----------------------------------------------------------------------------
@@ -802,3 +801,23 @@ class ClientCharacteristicConfigurationBits(enum.IntFlag):
DEFAULT = 0x0000
NOTIFICATION = 0x0001
INDICATION = 0x0002
# -----------------------------------------------------------------------------
class ClientSupportedFeatures(enum.IntFlag):
'''
See Vol 3, Part G - 7.2 - Table 7.6: Client Supported Features bit assignments.
'''
ROBUST_CACHING = 0x01
ENHANCED_ATT_BEARER = 0x02
MULTIPLE_HANDLE_VALUE_NOTIFICATIONS = 0x04
# -----------------------------------------------------------------------------
class ServerSupportedFeatures(enum.IntFlag):
'''
See Vol 3, Part G - 7.4 - Table 7.11: Server Supported Features bit assignments.
'''
EATT_SUPPORTED = 0x01

View File

@@ -275,7 +275,7 @@ HCI_LE_CS_READ_REMOTE_SUPPORTED_CAPABILITIES_COMPLETE_EVENT = 0x2C
HCI_LE_CS_READ_REMOTE_FAE_TABLE_COMPLETE_EVENT = 0x2D
HCI_LE_CS_SECURITY_ENABLE_COMPLETE_EVENT = 0x2E
HCI_LE_CS_CONFIG_COMPLETE_EVENT = 0x2F
HCI_LE_CS_PROCEDURE_ENABLE_EVENT = 0x30
HCI_LE_CS_PROCEDURE_ENABLE_COMPLETE_EVENT = 0x30
HCI_LE_CS_SUBEVENT_RESULT_EVENT = 0x31
HCI_LE_CS_SUBEVENT_RESULT_CONTINUE_EVENT = 0x32
HCI_LE_CS_TEST_END_COMPLETE_EVENT = 0x33
@@ -599,7 +599,7 @@ HCI_LE_READ_ALL_LOCAL_SUPPORTED_FEATURES_COMMAND = hci_c
HCI_LE_READ_ALL_REMOTE_FEATURES_COMMAND = hci_command_op_code(0x08, 0x0088)
HCI_LE_CS_READ_LOCAL_SUPPORTED_CAPABILITIES_COMMAND = hci_command_op_code(0x08, 0x0089)
HCI_LE_CS_READ_REMOTE_SUPPORTED_CAPABILITIES_COMMAND = hci_command_op_code(0x08, 0x008A)
HCI_LE_CS_WRITE_CACHED_REMOTE_SUPPORTED_CAPABILITIES = hci_command_op_code(0x08, 0x008B)
HCI_LE_CS_WRITE_CACHED_REMOTE_SUPPORTED_CAPABILITIES_COMMAND = hci_command_op_code(0x08, 0x008B)
HCI_LE_CS_SECURITY_ENABLE_COMMAND = hci_command_op_code(0x08, 0x008C)
HCI_LE_CS_SET_DEFAULT_SETTINGS_COMMAND = hci_command_op_code(0x08, 0x008D)
HCI_LE_CS_READ_REMOTE_FAE_TABLE_COMMAND = hci_command_op_code(0x08, 0x008E)
@@ -751,6 +751,67 @@ class PhyBit(enum.IntFlag):
LE_CODED = 1 << HCI_LE_CODED_PHY_BIT
class CsRole(OpenIntEnum):
INITIATOR = 0x00
REFLECTOR = 0x01
class CsRoleMask(enum.IntFlag):
INITIATOR = 0x01
REFLECTOR = 0x02
class CsSyncPhy(OpenIntEnum):
LE_1M = 1
LE_2M = 2
LE_2M_2BT = 3
class CsSyncPhySupported(enum.IntFlag):
LE_2M = 0x01
LE_2M_2BT = 0x02
class RttType(OpenIntEnum):
AA_ONLY = 0x00
SOUNDING_SEQUENCE_32_BIT = 0x01
SOUNDING_SEQUENCE_96_BIT = 0x02
RANDOM_SEQUENCE_32_BIT = 0x03
RANDOM_SEQUENCE_64_BIT = 0x04
RANDOM_SEQUENCE_96_BIT = 0x05
RANDOM_SEQUENCE_128_BIT = 0x06
class CsSnr(OpenIntEnum):
SNR_18_DB = 0x00
SNR_21_DB = 0x01
SNR_24_DB = 0x02
SNR_27_DB = 0x03
SNR_30_DB = 0x04
NOT_APPLIED = 0xFF
class CsDoneStatus(OpenIntEnum):
ALL_RESULTS_COMPLETED = 0x00
PARTIAL = 0x01
ABORTED = 0x0F
class CsProcedureAbortReason(OpenIntEnum):
NO_ABORT = 0x00
LOCAL_HOST_OR_REMOTE_REQUEST = 0x01
CHANNEL_MAP_UPDATE_INSTANT_PASSED = 0x02
UNSPECIFIED = 0x0F
class CsSubeventAbortReason(OpenIntEnum):
NO_ABORT = 0x00
LOCAL_HOST_OR_REMOTE_REQUEST = 0x01
NO_CS_SYNC_RECEIVED = 0x02
SCHEDULING_CONFLICT_OR_LIMITED_RESOURCES = 0x03
UNSPECIFIED = 0x0F
# Connection Parameters
HCI_CONNECTION_INTERVAL_MS_PER_UNIT = 1.25
HCI_CONNECTION_LATENCY_MS_PER_UNIT = 1.25
@@ -971,7 +1032,7 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_READ_ENCRYPTION_KEY_SIZE_COMMAND : 1 << (20*8+4),
HCI_LE_CS_READ_LOCAL_SUPPORTED_CAPABILITIES_COMMAND : 1 << (20*8+5),
HCI_LE_CS_READ_REMOTE_SUPPORTED_CAPABILITIES_COMMAND : 1 << (20*8+6),
HCI_LE_CS_WRITE_CACHED_REMOTE_SUPPORTED_CAPABILITIES : 1 << (20*8+7),
HCI_LE_CS_WRITE_CACHED_REMOTE_SUPPORTED_CAPABILITIES_COMMAND : 1 << (20*8+7),
HCI_SET_EVENT_MASK_PAGE_2_COMMAND : 1 << (22*8+2),
HCI_READ_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+0),
HCI_WRITE_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+1),
@@ -1462,6 +1523,12 @@ class LmpFeatureMask(enum.IntFlag):
# -----------------------------------------------------------------------------
# pylint: disable-next=unnecessary-lambda
STATUS_SPEC = {'size': 1, 'mapper': lambda x: HCI_Constant.status_name(x)}
CS_ROLE_SPEC = {'size': 1, 'mapper': lambda x: CsRole(x).name}
CS_ROLE_MASK_SPEC = {'size': 1, 'mapper': lambda x: CsRoleMask(x).name}
CS_SYNC_PHY_SPEC = {'size': 1, 'mapper': lambda x: CsSyncPhy(x).name}
CS_SYNC_PHY_SUPPORTED_SPEC = {'size': 1, 'mapper': lambda x: CsSyncPhySupported(x).name}
RTT_TYPE_SPEC = {'size': 1, 'mapper': lambda x: RttType(x).name}
CS_SNR_SPEC = {'size': 1, 'mapper': lambda x: CsSnr(x).name}
class CodecID(OpenIntEnum):
@@ -3539,8 +3606,8 @@ class HCI_LE_Set_Event_Mask_Command(HCI_Command):
@HCI_Command.command(
return_parameters_fields=[
('status', STATUS_SPEC),
('hc_le_acl_data_packet_length', 2),
('hc_total_num_le_acl_data_packets', 1),
('le_acl_data_packet_length', 2),
('total_num_le_acl_data_packets', 1),
]
)
class HCI_LE_Read_Buffer_Size_Command(HCI_Command):
@@ -3549,6 +3616,22 @@ class HCI_LE_Read_Buffer_Size_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
return_parameters_fields=[
('status', STATUS_SPEC),
('le_acl_data_packet_length', 2),
('total_num_le_acl_data_packets', 1),
('iso_data_packet_length', 2),
('total_num_iso_data_packets', 1),
]
)
class HCI_LE_Read_Buffer_Size_V2_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.2 LE Read Buffer Size V2 Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
return_parameters_fields=[('status', STATUS_SPEC), ('le_features', 8)]
@@ -4936,7 +5019,7 @@ class HCI_LE_Create_BIG_Command(HCI_Command):
packing: int
framing: int
encryption: int
broadcast_code: int
broadcast_code: bytes
# -----------------------------------------------------------------------------
@@ -5059,6 +5142,275 @@ class HCI_LE_Set_Host_Feature_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
return_parameters_fields=[
('status', STATUS_SPEC),
('num_config_supported', 1),
('max_consecutive_procedures_supported', 2),
('num_antennas_supported', 1),
('max_antenna_paths_supported', 1),
('roles_supported', 1),
('modes_supported', 1),
('rtt_capability', 1),
('rtt_aa_only_n', 1),
('rtt_sounding_n', 1),
('rtt_random_payload_n', 1),
('nadm_sounding_capability', 2),
('nadm_random_capability', 2),
('cs_sync_phys_supported', CS_SYNC_PHY_SUPPORTED_SPEC),
('subfeatures_supported', 2),
('t_ip1_times_supported', 2),
('t_ip2_times_supported', 2),
('t_fcs_times_supported', 2),
('t_pm_times_supported', 2),
('t_sw_time_supported', 1),
('tx_snr_capability', CS_SNR_SPEC),
]
)
class HCI_LE_CS_Read_Local_Supported_Capabilities_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.130 LE CS Read Local Supported Capabilities command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([('connection_handle', 2)])
class HCI_LE_CS_Read_Remote_Supported_Capabilities_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.131 LE CS Read Remote Supported Capabilities command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('connection_handle', 2),
('num_config_supported', 1),
('max_consecutive_procedures_supported', 2),
('num_antennas_supported', 1),
('max_antenna_paths_supported', 1),
('roles_supported', 1),
('modes_supported', 1),
('rtt_capability', 1),
('rtt_aa_only_n', 1),
('rtt_sounding_n', 1),
('rtt_random_payload_n', 1),
('nadm_sounding_capability', 2),
('nadm_random_capability', 2),
('cs_sync_phys_supported', CS_SYNC_PHY_SUPPORTED_SPEC),
('subfeatures_supported', 2),
('t_ip1_times_supported', 2),
('t_ip2_times_supported', 2),
('t_fcs_times_supported', 2),
('t_pm_times_supported', 2),
('t_sw_time_supported', 1),
('tx_snr_capability', CS_SNR_SPEC),
],
return_parameters_fields=[
('status', STATUS_SPEC),
('connection_handle', 2),
],
)
class HCI_LE_CS_Write_Cached_Remote_Supported_Capabilities_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.132 LE CS Write Cached Remote Supported Capabilities command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([('connection_handle', 2)])
class HCI_LE_CS_Security_Enable_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.133 LE CS Security Enable command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('connection_handle', 2),
(
'role_enable',
CS_ROLE_MASK_SPEC,
),
('cs_sync_antenna_selection', 1),
('max_tx_power', 1),
],
return_parameters_fields=[('status', STATUS_SPEC), ('connection_handle', 2)],
)
class HCI_LE_CS_Set_Default_Settings_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.134 LE CS Security Enable command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([('connection_handle', 2)])
class HCI_LE_CS_Read_Remote_FAE_Table_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.135 LE CS Read Remote FAE Table command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('connection_handle', 2),
('remote_fae_table', 72),
],
return_parameters_fields=[('status', STATUS_SPEC), ('connection_handle', 2)],
)
class HCI_LE_CS_Write_Cached_Remote_FAE_Table_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.136 LE CS Write Cached Remote FAE Table command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('connection_handle', 2),
('config_id', 1),
('create_context', 1),
('main_mode_type', 1),
('sub_mode_type', 1),
('min_main_mode_steps', 1),
('max_main_mode_steps', 1),
('main_mode_repetition', 1),
('mode_0_steps', 1),
('role', CS_ROLE_SPEC),
('rtt_type', RTT_TYPE_SPEC),
('cs_sync_phy', CS_SYNC_PHY_SPEC),
('channel_map', 10),
('channel_map_repetition', 1),
('channel_selection_type', 1),
('ch3c_shape', 1),
('ch3c_jump', 1),
('reserved', 1),
],
)
class HCI_LE_CS_Create_Config_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.137 LE CS Create Config command
'''
class ChannelSelectionType(OpenIntEnum):
ALGO_3B = 0
ALGO_3C = 1
class Ch3cShape(OpenIntEnum):
HAT = 0x00
X = 0x01
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('connection_handle', 2),
('config_id', 1),
],
)
class HCI_LE_CS_Remove_Config_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.138 LE CS Remove Config command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[('channel_classification', 10)], return_parameters_fields=[('status', STATUS_SPEC)]
)
class HCI_LE_CS_Set_Channel_Classification_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.139 LE CS Set Channel Classification command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('connection_handle', 2),
('config_id', 1),
('max_procedure_len', 2),
('min_procedure_interval', 2),
('max_procedure_interval', 2),
('max_procedure_count', 2),
('min_subevent_len', 3),
('max_subevent_len', 3),
('tone_antenna_config_selection', 1),
('phy', 1),
('tx_power_delta', 1),
('preferred_peer_antenna', 1),
('snr_control_initiator', CS_SNR_SPEC),
('snr_control_reflector', CS_SNR_SPEC),
],
return_parameters_fields=[('status', STATUS_SPEC), ('connection_handle', 2)],
)
class HCI_LE_CS_Set_Procedure_Parameters_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.140 LE CS Set Procedure Parameters command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('connection_handle', 2),
('config_id', 1),
('enable', 1),
],
)
class HCI_LE_CS_Procedure_Enable_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.141 LE CS Procedure Enable command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('main_mode_type', 1),
('sub_mode_type', 1),
('main_mode_repetition', 1),
('mode_0_steps', 1),
('role', CS_ROLE_SPEC),
('rtt_type', RTT_TYPE_SPEC),
('cs_sync_phy', CS_SYNC_PHY_SPEC),
('cs_sync_antenna_selection', 1),
('subevent_len', 3),
('subevent_interval', 2),
('max_num_subevents', 1),
('transmit_power_level', 1),
('t_ip1_time', 1),
('t_ip2_time', 1),
('t_fcs_time', 1),
('t_pm_time', 1),
('t_sw_time', 1),
('tone_antenna_config_selection', 1),
('reserved', 1),
('snr_control_initiator', CS_SNR_SPEC),
('snr_control_reflector', CS_SNR_SPEC),
('drbg_nonce', 2),
('channel_map_repetition', 1),
('override_config', 2),
('override_parameters_data', 'v'),
],
)
class HCI_LE_CS_Test_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.142 LE CS Test command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command()
class HCI_LE_CS_Test_End_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.143 LE CS Test End command
'''
# -----------------------------------------------------------------------------
# HCI Events
# -----------------------------------------------------------------------------
@@ -5070,7 +5422,7 @@ class HCI_Event(HCI_Packet):
hci_packet_type = HCI_EVENT_PACKET
event_names: Dict[int, str] = {}
event_classes: Dict[int, Type[HCI_Event]] = {}
vendor_factory: Optional[Callable[[bytes], Optional[HCI_Event]]] = None
vendor_factories: list[Callable[[bytes], Optional[HCI_Event]]] = []
@staticmethod
def event(fields=()):
@@ -5128,6 +5480,19 @@ class HCI_Event(HCI_Packet):
return event_class
@classmethod
def add_vendor_factory(
cls, factory: Callable[[bytes], Optional[HCI_Event]]
) -> None:
cls.vendor_factories.append(factory)
@classmethod
def remove_vendor_factory(
cls, factory: Callable[[bytes], Optional[HCI_Event]]
) -> None:
if factory in cls.vendor_factories:
cls.vendor_factories.remove(factory)
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_Event:
event_code = packet[1]
@@ -5148,13 +5513,13 @@ class HCI_Event(HCI_Packet):
elif event_code == HCI_VENDOR_EVENT:
# Invoke all the registered factories to see if any of them can handle
# the event
if cls.vendor_factory:
if event := cls.vendor_factory(parameters):
for vendor_factory in cls.vendor_factories:
if event := vendor_factory(parameters):
return event
# No factory, or the factory could not create an instance,
# return a generic vendor event
return HCI_Event(event_code, parameters)
return HCI_Vendor_Event(data=parameters)
else:
subclass = HCI_Event.event_classes.get(event_code)
if subclass is None:
@@ -5825,7 +6190,7 @@ class HCI_LE_Periodic_Advertising_Sync_Lost_Event(HCI_LE_Meta_Event):
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('status', 1),
('status', STATUS_SPEC),
('advertising_handle', 1),
('connection_handle', 2),
('num_completed_extended_advertising_events', 1),
@@ -5908,6 +6273,70 @@ class HCI_LE_CIS_Request_Event(HCI_LE_Meta_Event):
'''
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('status', STATUS_SPEC),
('big_handle', 1),
('big_sync_delay', 3),
('transport_latency_big', 3),
('phy', 1),
('nse', 1),
('bn', 1),
('pto', 1),
('irc', 1),
('max_pdu', 2),
('iso_interval', 2),
[('connection_handle', 2)],
]
)
class HCI_LE_Create_BIG_Complete_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.27 LE Create BIG Complete Event
'''
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event([('big_handle', 1), ('reason', 1)])
class HCI_LE_Terminate_BIG_Complete_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.28 LE Terminate BIG Complete Event
'''
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('status', STATUS_SPEC),
('big_handle', 1),
('transport_latency_big', 3),
('nse', 1),
('bn', 1),
('pto', 1),
('irc', 1),
('max_pdu', 2),
('iso_interval', 2),
[('connection_handle', 2)],
]
)
class HCI_LE_BIG_Sync_Established_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.29 LE BIG Sync Established event
'''
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event([('big_handle', 1), ('reason', 1)])
class HCI_LE_BIG_Sync_Lost_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.30 LE BIG Sync Lost event
'''
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
@@ -5932,6 +6361,291 @@ class HCI_LE_BIGInfo_Advertising_Report_Event(HCI_LE_Meta_Event):
'''
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('status', STATUS_SPEC),
('connection_handle', 2),
('num_config_supported', 1),
('max_consecutive_procedures_supported', 2),
('num_antennas_supported', 1),
('max_antenna_paths_supported', 1),
('roles_supported', 1),
('modes_supported', 1),
('rtt_capability', 1),
('rtt_aa_only_n', 1),
('rtt_sounding_n', 1),
('rtt_random_payload_n', 1),
('nadm_sounding_capability', 2),
('nadm_random_capability', 2),
('cs_sync_phys_supported', CS_SYNC_PHY_SUPPORTED_SPEC),
('subfeatures_supported', 2),
('t_ip1_times_supported', 2),
('t_ip2_times_supported', 2),
('t_fcs_times_supported', 2),
('t_pm_times_supported', 2),
('t_sw_time_supported', 1),
('tx_snr_capability', CS_SNR_SPEC),
]
)
class HCI_LE_CS_Read_Remote_Supported_Capabilities_Complete_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.39 LE CS Read Remote Supported Capabilities Complete event
'''
status: int
connection_handle: int
num_config_supported: int
max_consecutive_procedures_supported: int
num_antennas_supported: int
max_antenna_paths_supported: int
roles_supported: int
modes_supported: int
rtt_capability: int
rtt_aa_only_n: int
rtt_sounding_n: int
rtt_random_payload_n: int
nadm_sounding_capability: int
nadm_random_capability: int
cs_sync_phys_supported: int
subfeatures_supported: int
t_ip1_times_supported: int
t_ip2_times_supported: int
t_fcs_times_supported: int
t_pm_times_supported: int
t_sw_time_supported: int
tx_snr_capability: int
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('status', STATUS_SPEC),
('connection_handle', 2),
('remote_fae_table', 72),
]
)
class HCI_LE_CS_Read_Remote_FAE_Table_Complete_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.40 LE CS Read Remote FAE Table Complete event
'''
status: int
connection_handle: int
remote_fae_table: bytes
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('status', STATUS_SPEC),
('connection_handle', 2),
]
)
class HCI_LE_CS_Security_Enable_Complete_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.41 LE CS Security Enable Complete event
'''
status: int
connection_handle: int
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('status', STATUS_SPEC),
('connection_handle', 2),
('config_id', 1),
(
'action',
{
'size': 1,
'mapper': lambda x: HCI_LE_CS_Config_Complete_Event.Action(x).name,
},
),
('main_mode_type', 1),
('sub_mode_type', 1),
('min_main_mode_steps', 1),
('max_main_mode_steps', 1),
('main_mode_repetition', 1),
('mode_0_steps', 1),
('role', CS_ROLE_SPEC),
('rtt_type', RTT_TYPE_SPEC),
('cs_sync_phy', CS_SYNC_PHY_SPEC),
('channel_map', 10),
('channel_map_repetition', 1),
('channel_selection_type', 1),
('ch3c_shape', 1),
('ch3c_jump', 1),
('reserved', 1),
('t_ip1_time', 1),
('t_ip2_time', 1),
('t_fcs_time', 1),
('t_pm_time', 1),
]
)
class HCI_LE_CS_Config_Complete_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.42 LE CS Config Complete event
'''
class Action(OpenIntEnum):
REMOVED = 0
CREATED = 1
status: int
connection_handle: int
config_id: int
action: int
main_mode_type: int
sub_mode_type: int
min_main_mode_steps: int
max_main_mode_steps: int
main_mode_repetition: int
mode_0_steps: int
role: int
rtt_type: int
cs_sync_phy: int
channel_map: bytes
channel_map_repetition: int
channel_selection_type: int
ch3c_shape: int
ch3c_jump: int
reserved: int
t_ip1_time: int
t_ip2_time: int
t_fcs_time: int
t_pm_time: int
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('status', STATUS_SPEC),
('connection_handle', 2),
('config_id', 1),
('state', 1),
('tone_antenna_config_selection', 1),
('selected_tx_power', -1),
('subevent_len', 3),
('subevents_per_event', 1),
('subevent_interval', 2),
('event_interval', 2),
('procedure_interval', 2),
('procedure_count', 2),
('max_procedure_len', 2),
]
)
class HCI_LE_CS_Procedure_Enable_Complete_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.43 LE CS Procedure Enable Complete event
'''
class State(OpenIntEnum):
DISABLED = 0
ENABLED = 1
status: int
connection_handle: int
config_id: int
state: int
tone_antenna_config_selection: int
selected_tx_power: int
subevent_len: int
subevents_per_event: int
subevent_interval: int
event_interval: int
procedure_interval: int
procedure_count: int
max_procedure_len: int
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('connection_handle', 2),
('config_id', 1),
('start_acl_conn_event_counter', 2),
('procedure_counter', 2),
('frequency_compensation', 2),
('reference_power_level', -1),
('procedure_done_status', 1),
('subevent_done_status', 1),
('abort_reason', 1),
('num_antenna_paths', 1),
[
('step_mode', 1),
('step_channel', 1),
('step_data', 'v'),
],
]
)
class HCI_LE_CS_Subevent_Result_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.44 LE CS Subevent Result event
'''
connection_handle: int
config_id: int
start_acl_conn_event_counter: int
procedure_counter: int
frequency_compensation: int
reference_power_level: int
procedure_done_status: int
subevent_done_status: int
abort_reason: int
num_antenna_paths: int
step_mode: list[int]
step_channel: list[int]
step_data: list[bytes]
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('connection_handle', 2),
('config_id', 1),
('procedure_done_status', 1),
('subevent_done_status', 1),
('abort_reason', 1),
('num_antenna_paths', 1),
[
('step_mode', 1),
('step_channel', 1),
('step_data', 'v'),
],
]
)
class HCI_LE_CS_Subevent_Result_Continue_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.45 LE CS Subevent Result Continue event
'''
connection_handle: int
config_id: int
procedure_done_status: int
subevent_done_status: int
abort_reason: int
num_antenna_paths: int
step_mode: list[int]
step_channel: list[int]
step_data: list[bytes]
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('connection_handle', 2),
('status', STATUS_SPEC),
]
)
class HCI_LE_CS_Test_End_Complete_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.46 LE CS Test End Complete event
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([('status', STATUS_SPEC)])
class HCI_Inquiry_Complete_Event(HCI_Event):
@@ -6878,7 +7592,7 @@ class HCI_IsoDataPacket(HCI_Packet):
if should_include_sdu_info:
packet_sequence_number, sdu_info = struct.unpack_from('<HH', packet, pos)
iso_sdu_length = sdu_info & 0xFFF
packet_status_flag = sdu_info >> 14
packet_status_flag = (sdu_info >> 15) & 1
pos += 4
iso_sdu_fragment = packet[pos:]
@@ -6912,7 +7626,7 @@ class HCI_IsoDataPacket(HCI_Packet):
fmt += 'HH'
args += [
self.packet_sequence_number,
self.iso_sdu_length | self.packet_status_flag << 14,
self.iso_sdu_length | self.packet_status_flag << 15,
]
return struct.pack(fmt, *args) + self.iso_sdu_fragment
@@ -6920,9 +7634,10 @@ class HCI_IsoDataPacket(HCI_Packet):
return (
f'{color("ISO", "blue")}: '
f'handle=0x{self.connection_handle:04x}, '
f'pb={self.pb_flag}, '
f'ps={self.packet_status_flag}, '
f'data_total_length={self.data_total_length}, '
f'sdu={self.iso_sdu_fragment.hex()}'
f'sdu_fragment={self.iso_sdu_fragment.hex()}'
)

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -34,6 +34,8 @@ from typing import (
TYPE_CHECKING,
)
import pyee
from bumble.colors import color
from bumble.l2cap import L2CAP_PDU
from bumble.snoop import Snooper
@@ -59,7 +61,19 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
class AclPacketQueue:
class DataPacketQueue(pyee.EventEmitter):
"""
Flow-control queue for host->controller data packets (ACL, ISO).
The queue holds packets associated with a connection handle. The packets
are sent to the controller, up to a maximum total number of packets in flight.
A packet is considered to be "in flight" when it has been sent to the controller
but not completed yet. Packets are no longer "in flight" when the controller
declares them as completed.
The queue emits a 'flow' event whenever one or more packets are completed.
"""
max_packet_size: int
def __init__(
@@ -68,40 +82,105 @@ class AclPacketQueue:
max_in_flight: int,
send: Callable[[hci.HCI_Packet], None],
) -> None:
super().__init__()
self.max_packet_size = max_packet_size
self.max_in_flight = max_in_flight
self.in_flight = 0
self.send = send
self.packets: Deque[hci.HCI_AclDataPacket] = collections.deque()
self._in_flight = 0 # Total number of packets in flight across all connections
self._in_flight_per_connection: dict[int, int] = collections.defaultdict(
int
) # Number of packets in flight per connection
self._send = send
self._packets: Deque[tuple[hci.HCI_Packet, int]] = collections.deque()
self._queued = 0
self._completed = 0
def enqueue(self, packet: hci.HCI_AclDataPacket) -> None:
self.packets.appendleft(packet)
self.check_queue()
@property
def queued(self) -> int:
"""Total number of packets queued since creation."""
return self._queued
if self.packets:
@property
def completed(self) -> int:
"""Total number of packets completed since creation."""
return self._completed
@property
def pending(self) -> int:
"""Number of packets that have been queued but not completed."""
return self._queued - self._completed
def enqueue(self, packet: hci.HCI_Packet, connection_handle: int) -> None:
"""Enqueue a packet associated with a connection"""
self._packets.appendleft((packet, connection_handle))
self._queued += 1
self._check_queue()
if self._packets:
logger.debug(
f'{self.in_flight} ACL packets in flight, '
f'{len(self.packets)} in queue'
f'{self._in_flight} packets in flight, '
f'{len(self._packets)} in queue'
)
def check_queue(self) -> None:
while self.packets and self.in_flight < self.max_in_flight:
packet = self.packets.pop()
self.send(packet)
self.in_flight += 1
def flush(self, connection_handle: int) -> None:
"""
Remove all packets associated with a connection.
def on_packets_completed(self, packet_count: int) -> None:
if packet_count > self.in_flight:
All packets associated with the connection that are in flight are implicitly
marked as completed, but no 'flow' event is emitted.
"""
packets_to_keep = [
(packet, handle)
for (packet, handle) in self._packets
if handle != connection_handle
]
if flushed_count := len(self._packets) - len(packets_to_keep):
self._completed += flushed_count
self._packets = collections.deque(packets_to_keep)
if connection_handle in self._in_flight_per_connection:
in_flight = self._in_flight_per_connection[connection_handle]
self._completed += in_flight
self._in_flight -= in_flight
del self._in_flight_per_connection[connection_handle]
def _check_queue(self) -> None:
while self._packets and self._in_flight < self.max_in_flight:
packet, connection_handle = self._packets.pop()
self._send(packet)
self._in_flight += 1
self._in_flight_per_connection[connection_handle] += 1
def on_packets_completed(self, packet_count: int, connection_handle: int) -> None:
"""Mark one or more packets associated with a connection as completed."""
if connection_handle not in self._in_flight_per_connection:
logger.warning(
color(
'!!! {packet_count} completed but only '
f'{self.in_flight} in flight'
)
f'received completion for unknown connection {connection_handle}'
)
packet_count = self.in_flight
return
self.in_flight -= packet_count
self.check_queue()
in_flight_for_connection = self._in_flight_per_connection[connection_handle]
if packet_count <= in_flight_for_connection:
self._in_flight_per_connection[connection_handle] -= packet_count
else:
logger.warning(
f'{packet_count} completed for {connection_handle} '
f'but only {in_flight_for_connection} in flight'
)
self._in_flight_per_connection[connection_handle] = 0
if packet_count <= self._in_flight:
self._in_flight -= packet_count
self._completed += packet_count
else:
logger.warning(
f'{packet_count} completed but only {self._in_flight} in flight'
)
self._in_flight = 0
self._completed = self._queued
self._check_queue()
self.emit('flow')
# -----------------------------------------------------------------------------
@@ -114,7 +193,7 @@ class Connection:
self.peer_address = peer_address
self.assembler = hci.HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.transport = transport
acl_packet_queue: Optional[AclPacketQueue] = (
acl_packet_queue: Optional[DataPacketQueue] = (
host.le_acl_packet_queue
if transport == BT_LE_TRANSPORT
else host.acl_packet_queue
@@ -129,28 +208,37 @@ class Connection:
l2cap_pdu = L2CAP_PDU.from_bytes(pdu)
self.host.on_l2cap_pdu(self, l2cap_pdu.cid, l2cap_pdu.payload)
def __str__(self) -> str:
return (
f'Connection(transport={self.transport}, peer_address={self.peer_address})'
)
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class ScoLink:
peer_address: hci.Address
handle: int
connection_handle: int
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class CisLink:
peer_address: hci.Address
class IsoLink:
handle: int
packet_queue: DataPacketQueue = dataclasses.field(repr=False)
packet_sequence_number: int = 0
# -----------------------------------------------------------------------------
class Host(AbortableEventEmitter):
connections: Dict[int, Connection]
cis_links: Dict[int, CisLink]
cis_links: Dict[int, IsoLink]
bis_links: Dict[int, IsoLink]
sco_links: Dict[int, ScoLink]
acl_packet_queue: Optional[AclPacketQueue] = None
le_acl_packet_queue: Optional[AclPacketQueue] = None
bigs: dict[int, set[int]] = {} # BIG Handle to BIS Handles
acl_packet_queue: Optional[DataPacketQueue] = None
le_acl_packet_queue: Optional[DataPacketQueue] = None
iso_packet_queue: Optional[DataPacketQueue] = None
hci_sink: Optional[TransportSink] = None
hci_metadata: Dict[str, Any]
long_term_key_provider: Optional[
@@ -169,6 +257,7 @@ class Host(AbortableEventEmitter):
self.ready = False # True when we can accept incoming packets
self.connections = {} # Connections, by connection handle
self.cis_links = {} # CIS links, by connection handle
self.bis_links = {} # BIS links, by connection handle
self.sco_links = {} # SCO links, by connection handle
self.pending_command = None
self.pending_response: Optional[asyncio.Future[Any]] = None
@@ -387,6 +476,12 @@ class Host(AbortableEventEmitter):
hci.HCI_LE_TRANSMIT_POWER_REPORTING_EVENT,
hci.HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_SUBRATE_CHANGE_EVENT,
hci.HCI_LE_CS_READ_REMOTE_SUPPORTED_CAPABILITIES_COMPLETE_EVENT,
hci.HCI_LE_CS_PROCEDURE_ENABLE_COMPLETE_EVENT,
hci.HCI_LE_CS_SECURITY_ENABLE_COMPLETE_EVENT,
hci.HCI_LE_CS_CONFIG_COMPLETE_EVENT,
hci.HCI_LE_CS_SUBEVENT_RESULT_EVENT,
hci.HCI_LE_CS_SUBEVENT_RESULT_CONTINUE_EVENT,
]
)
@@ -411,39 +506,70 @@ class Host(AbortableEventEmitter):
f'hc_total_num_acl_data_packets={hc_total_num_acl_data_packets}'
)
self.acl_packet_queue = AclPacketQueue(
self.acl_packet_queue = DataPacketQueue(
max_packet_size=hc_acl_data_packet_length,
max_in_flight=hc_total_num_acl_data_packets,
send=self.send_hci_packet,
)
hc_le_acl_data_packet_length = 0
hc_total_num_le_acl_data_packets = 0
if self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_COMMAND):
le_acl_data_packet_length = 0
total_num_le_acl_data_packets = 0
iso_data_packet_length = 0
total_num_iso_data_packets = 0
if self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_V2_COMMAND):
response = await self.send_command(
hci.HCI_LE_Read_Buffer_Size_V2_Command(), check_result=True
)
le_acl_data_packet_length = (
response.return_parameters.le_acl_data_packet_length
)
total_num_le_acl_data_packets = (
response.return_parameters.total_num_le_acl_data_packets
)
iso_data_packet_length = response.return_parameters.iso_data_packet_length
total_num_iso_data_packets = (
response.return_parameters.total_num_iso_data_packets
)
logger.debug(
'HCI LE flow control: '
f'le_acl_data_packet_length={le_acl_data_packet_length},'
f'total_num_le_acl_data_packets={total_num_le_acl_data_packets}'
f'iso_data_packet_length={iso_data_packet_length},'
f'total_num_iso_data_packets={total_num_iso_data_packets}'
)
elif self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(
hci.HCI_LE_Read_Buffer_Size_Command(), check_result=True
)
hc_le_acl_data_packet_length = (
response.return_parameters.hc_le_acl_data_packet_length
le_acl_data_packet_length = (
response.return_parameters.le_acl_data_packet_length
)
hc_total_num_le_acl_data_packets = (
response.return_parameters.hc_total_num_le_acl_data_packets
total_num_le_acl_data_packets = (
response.return_parameters.total_num_le_acl_data_packets
)
logger.debug(
'HCI LE ACL flow control: '
f'hc_le_acl_data_packet_length={hc_le_acl_data_packet_length},'
f'hc_total_num_le_acl_data_packets={hc_total_num_le_acl_data_packets}'
f'le_acl_data_packet_length={le_acl_data_packet_length},'
f'total_num_le_acl_data_packets={total_num_le_acl_data_packets}'
)
if hc_le_acl_data_packet_length == 0 or hc_total_num_le_acl_data_packets == 0:
if le_acl_data_packet_length == 0 or total_num_le_acl_data_packets == 0:
# LE and Classic share the same queue
self.le_acl_packet_queue = self.acl_packet_queue
else:
# Create a separate queue for LE
self.le_acl_packet_queue = AclPacketQueue(
max_packet_size=hc_le_acl_data_packet_length,
max_in_flight=hc_total_num_le_acl_data_packets,
self.le_acl_packet_queue = DataPacketQueue(
max_packet_size=le_acl_data_packet_length,
max_in_flight=total_num_le_acl_data_packets,
send=self.send_hci_packet,
)
if iso_data_packet_length and total_num_iso_data_packets:
self.iso_packet_queue = DataPacketQueue(
max_packet_size=iso_data_packet_length,
max_in_flight=total_num_iso_data_packets,
send=self.send_hci_packet,
)
@@ -595,11 +721,78 @@ class Host(AbortableEventEmitter):
data=l2cap_pdu[offset : offset + data_total_length],
)
logger.debug(f'>>> ACL packet enqueue: (CID={cid}) {acl_packet}')
packet_queue.enqueue(acl_packet)
packet_queue.enqueue(acl_packet, connection_handle)
pb_flag = 1
offset += data_total_length
bytes_remaining -= data_total_length
def get_data_packet_queue(self, connection_handle: int) -> DataPacketQueue | None:
if connection := self.connections.get(connection_handle):
return connection.acl_packet_queue
if iso_link := self.cis_links.get(connection_handle) or self.bis_links.get(
connection_handle
):
return iso_link.packet_queue
return None
def send_iso_sdu(self, connection_handle: int, sdu: bytes) -> None:
if not (
iso_link := self.cis_links.get(connection_handle)
or self.bis_links.get(connection_handle)
):
logger.warning(f"no ISO link for connection handle {connection_handle}")
return
if iso_link.packet_queue is None:
logger.warning("ISO link has no data packet queue")
return
bytes_remaining = len(sdu)
offset = 0
while bytes_remaining:
is_first_fragment = offset == 0
header_length = 4 if is_first_fragment else 0
assert iso_link.packet_queue.max_packet_size > header_length
fragment_length = min(
bytes_remaining, iso_link.packet_queue.max_packet_size - header_length
)
is_last_fragment = bytes_remaining == fragment_length
iso_sdu_fragment = sdu[offset : offset + fragment_length]
iso_link.packet_queue.enqueue(
(
hci.HCI_IsoDataPacket(
connection_handle=connection_handle,
data_total_length=header_length + fragment_length,
packet_sequence_number=iso_link.packet_sequence_number,
pb_flag=0b10 if is_last_fragment else 0b00,
packet_status_flag=0,
iso_sdu_length=len(sdu),
iso_sdu_fragment=iso_sdu_fragment,
)
if is_first_fragment
else hci.HCI_IsoDataPacket(
connection_handle=connection_handle,
data_total_length=fragment_length,
pb_flag=0b11 if is_last_fragment else 0b01,
iso_sdu_fragment=iso_sdu_fragment,
)
),
connection_handle,
)
offset += fragment_length
bytes_remaining -= fragment_length
iso_link.packet_sequence_number = (iso_link.packet_sequence_number + 1) & 0xFFFF
def remove_big(self, big_handle: int) -> None:
if big := self.bigs.pop(big_handle, None):
for connection_handle in big:
if bis_link := self.bis_links.pop(connection_handle, None):
bis_link.packet_queue.flush(bis_link.handle)
def supports_command(self, op_code: int) -> bool:
return (
self.local_supported_commands
@@ -727,16 +920,17 @@ class Host(AbortableEventEmitter):
def on_hci_command_status_event(self, event):
return self.on_command_processed(event)
def on_hci_number_of_completed_packets_event(self, event):
def on_hci_number_of_completed_packets_event(
self, event: hci.HCI_Number_Of_Completed_Packets_Event
) -> None:
for connection_handle, num_completed_packets in zip(
event.connection_handles, event.num_completed_packets
):
if connection := self.connections.get(connection_handle):
connection.acl_packet_queue.on_packets_completed(num_completed_packets)
elif not (
self.cis_links.get(connection_handle)
or self.sco_links.get(connection_handle)
):
if queue := self.get_data_packet_queue(connection_handle):
queue.on_packets_completed(num_completed_packets, connection_handle)
continue
if connection_handle not in self.sco_links:
logger.warning(
'received packet completion event for unknown handle '
f'0x{connection_handle:04X}'
@@ -854,11 +1048,7 @@ class Host(AbortableEventEmitter):
return
if event.status == hci.HCI_SUCCESS:
logger.debug(
f'### DISCONNECTION: [0x{handle:04X}] '
f'{connection.peer_address} '
f'reason={event.reason}'
)
logger.debug(f'### DISCONNECTION: {connection}, reason={event.reason}')
# Notify the listeners
self.emit('disconnection', handle, event.reason)
@@ -869,6 +1059,12 @@ class Host(AbortableEventEmitter):
or self.cis_links.pop(handle, 0)
or self.sco_links.pop(handle, 0)
)
# Flush the data queues
self.acl_packet_queue.flush(handle)
self.le_acl_packet_queue.flush(handle)
if self.iso_packet_queue:
self.iso_packet_queue.flush(handle)
else:
logger.debug(f'### DISCONNECTION FAILED: {event.status}')
@@ -953,12 +1149,68 @@ class Host(AbortableEventEmitter):
event.cis_id,
)
def on_hci_le_create_big_complete_event(self, event):
self.bigs[event.big_handle] = set(event.connection_handle)
if self.iso_packet_queue is None:
logger.warning("BIS established but ISO packets not supported")
for connection_handle in event.connection_handle:
self.bis_links[connection_handle] = IsoLink(
connection_handle, self.iso_packet_queue
)
self.emit(
'big_establishment',
event.status,
event.big_handle,
event.connection_handle,
event.big_sync_delay,
event.transport_latency_big,
event.phy,
event.nse,
event.bn,
event.pto,
event.irc,
event.max_pdu,
event.iso_interval,
)
def on_hci_le_big_sync_established_event(self, event):
self.bigs[event.big_handle] = set(event.connection_handle)
for connection_handle in event.connection_handle:
self.bis_links[connection_handle] = IsoLink(
connection_handle, self.iso_packet_queue
)
self.emit(
'big_sync_establishment',
event.status,
event.big_handle,
event.transport_latency_big,
event.nse,
event.bn,
event.pto,
event.irc,
event.max_pdu,
event.iso_interval,
event.connection_handle,
)
def on_hci_le_big_sync_lost_event(self, event):
self.remove_big(event.big_handle)
self.emit('big_sync_lost', event.big_handle, event.reason)
def on_hci_le_terminate_big_complete_event(self, event):
self.remove_big(event.big_handle)
self.emit('big_termination', event.reason, event.big_handle)
def on_hci_le_cis_established_event(self, event):
# The remaining parameters are unused for now.
if event.status == hci.HCI_SUCCESS:
self.cis_links[event.connection_handle] = CisLink(
handle=event.connection_handle,
peer_address=hci.Address.ANY,
if self.iso_packet_queue is None:
logger.warning("CIS established but ISO packets not supported")
self.cis_links[event.connection_handle] = IsoLink(
handle=event.connection_handle, packet_queue=self.iso_packet_queue
)
self.emit('cis_establishment', event.connection_handle)
else:
@@ -1028,7 +1280,7 @@ class Host(AbortableEventEmitter):
self.sco_links[event.connection_handle] = ScoLink(
peer_address=event.bd_addr,
handle=event.connection_handle,
connection_handle=event.connection_handle,
)
# Notify the client
@@ -1249,5 +1501,23 @@ class Host(AbortableEventEmitter):
int.from_bytes(event.le_features, 'little'),
)
def on_hci_le_cs_read_remote_supported_capabilities_complete_event(self, event):
self.emit('cs_remote_supported_capabilities', event)
def on_hci_le_cs_security_enable_complete_event(self, event):
self.emit('cs_security', event)
def on_hci_le_cs_config_complete_event(self, event):
self.emit('cs_config', event)
def on_hci_le_cs_procedure_enable_complete_event(self, event):
self.emit('cs_procedure', event)
def on_hci_le_cs_subevent_result_event(self, event):
self.emit('cs_subevent_result', event)
def on_hci_le_cs_subevent_result_continue_event(self, event):
self.emit('cs_subevent_result_continue', event)
def on_hci_vendor_event(self, event):
self.emit('vendor_event', event)

View File

@@ -773,7 +773,6 @@ class ClassicChannel(EventEmitter):
self.psm = psm
self.source_cid = source_cid
self.destination_cid = 0
self.response = None
self.connection_result = None
self.disconnection_result = None
self.sink = None
@@ -783,27 +782,15 @@ class ClassicChannel(EventEmitter):
self.state = new_state
def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None:
if self.state != self.State.OPEN:
raise InvalidStateError('channel not open')
self.manager.send_pdu(self.connection, self.destination_cid, pdu)
def send_control_frame(self, frame: L2CAP_Control_Frame) -> None:
self.manager.send_control_frame(self.connection, self.signaling_cid, frame)
async def send_request(self, request: SupportsBytes) -> bytes:
# Check that there isn't already a request pending
if self.response:
raise InvalidStateError('request already pending')
if self.state != self.State.OPEN:
raise InvalidStateError('channel not open')
self.response = asyncio.get_running_loop().create_future()
self.send_pdu(request)
return await self.response
def on_pdu(self, pdu: bytes) -> None:
if self.response:
self.response.set_result(pdu)
self.response = None
elif self.sink:
if self.sink:
# pylint: disable=not-callable
self.sink(pdu)
else:

View File

@@ -17,6 +17,7 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum
import logging
import struct
@@ -258,8 +259,8 @@ class AseReasonCode(enum.IntEnum):
# -----------------------------------------------------------------------------
class AudioRole(enum.IntEnum):
SINK = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.CONTROLLER_TO_HOST
SOURCE = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.HOST_TO_CONTROLLER
SINK = device.CisLink.Direction.CONTROLLER_TO_HOST
SOURCE = device.CisLink.Direction.HOST_TO_CONTROLLER
# -----------------------------------------------------------------------------
@@ -354,16 +355,7 @@ class AseStateMachine(gatt.Characteristic):
cis_link.on('disconnection', self.on_cis_disconnection)
async def post_cis_established():
await self.service.device.send_command(
hci.HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=cis_link.handle,
data_path_direction=self.role,
data_path_id=0x00, # Fixed HCI
codec_id=hci.CodingFormat(hci.CodecID.TRANSPARENT),
controller_delay=0,
codec_configuration=b'',
)
)
await cis_link.setup_data_path(direction=self.role)
if self.role == AudioRole.SINK:
self.state = self.State.STREAMING
await self.service.device.notify_subscribers(self, self.value)
@@ -511,12 +503,8 @@ class AseStateMachine(gatt.Characteristic):
self.state = self.State.RELEASING
async def remove_cis_async():
await self.service.device.send_command(
hci.HCI_LE_Remove_ISO_Data_Path_Command(
connection_handle=self.cis_link.handle,
data_path_direction=self.role,
)
)
if self.cis_link:
await self.cis_link.remove_data_path(self.role)
self.state = self.State.IDLE
await self.service.device.notify_subscribers(self, self.value)

View File

@@ -0,0 +1,166 @@
# Copyright 2021-2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import struct
from typing import TYPE_CHECKING
from bumble import att
from bumble import gatt
from bumble import gatt_client
from bumble import crypto
if TYPE_CHECKING:
from bumble import device
# -----------------------------------------------------------------------------
class GenericAttributeProfileService(gatt.TemplateService):
'''See Vol 3, Part G - 7 - DEFINED GENERIC ATTRIBUTE PROFILE SERVICE.'''
UUID = gatt.GATT_GENERIC_ATTRIBUTE_SERVICE
client_supported_features_characteristic: gatt.Characteristic | None = None
server_supported_features_characteristic: gatt.Characteristic | None = None
database_hash_characteristic: gatt.Characteristic | None = None
service_changed_characteristic: gatt.Characteristic | None = None
def __init__(
self,
server_supported_features: gatt.ServerSupportedFeatures | None = None,
database_hash_enabled: bool = True,
service_change_enabled: bool = True,
) -> None:
if server_supported_features is not None:
self.server_supported_features_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SERVER_SUPPORTED_FEATURES_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=bytes([server_supported_features]),
)
if database_hash_enabled:
self.database_hash_characteristic = gatt.Characteristic(
uuid=gatt.GATT_DATABASE_HASH_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=gatt.CharacteristicValue(read=self.get_database_hash),
)
if service_change_enabled:
self.service_changed_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SERVICE_CHANGED_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.INDICATE,
permissions=gatt.Characteristic.Permissions(0),
value=b'',
)
if (database_hash_enabled and service_change_enabled) or (
server_supported_features
and (
server_supported_features & gatt.ServerSupportedFeatures.EATT_SUPPORTED
)
): # TODO: Support Multiple Handle Value Notifications
self.client_supported_features_characteristic = gatt.Characteristic(
uuid=gatt.GATT_CLIENT_SUPPORTED_FEATURES_CHARACTERISTIC,
properties=(
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
),
permissions=(
gatt.Characteristic.Permissions.READABLE
| gatt.Characteristic.Permissions.WRITEABLE
),
value=bytes(1),
)
super().__init__(
characteristics=[
c
for c in (
self.service_changed_characteristic,
self.client_supported_features_characteristic,
self.database_hash_characteristic,
self.server_supported_features_characteristic,
)
if c is not None
],
primary=True,
)
@classmethod
def get_attribute_data(cls, attribute: att.Attribute) -> bytes:
if attribute.type in (
gatt.GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
gatt.GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
gatt.GATT_INCLUDE_ATTRIBUTE_TYPE,
gatt.GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
gatt.GATT_CHARACTERISTIC_EXTENDED_PROPERTIES_DESCRIPTOR,
):
return (
struct.pack("<H", attribute.handle)
+ attribute.type.to_bytes()
+ attribute.value
)
elif attribute.type in (
gatt.GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
gatt.GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
gatt.GATT_SERVER_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
gatt.GATT_CHARACTERISTIC_PRESENTATION_FORMAT_DESCRIPTOR,
gatt.GATT_CHARACTERISTIC_AGGREGATE_FORMAT_DESCRIPTOR,
):
return struct.pack("<H", attribute.handle) + attribute.type.to_bytes()
return b''
def get_database_hash(self, connection: device.Connection | None) -> bytes:
assert connection
m = b''.join(
[
self.get_attribute_data(attribute)
for attribute in connection.device.gatt_server.attributes
]
)
return crypto.aes_cmac(m=m, k=bytes(16))
class GenericAttributeProfileServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = GenericAttributeProfileService
client_supported_features_characteristic: gatt_client.CharacteristicProxy | None = (
None
)
server_supported_features_characteristic: gatt_client.CharacteristicProxy | None = (
None
)
database_hash_characteristic: gatt_client.CharacteristicProxy | None = None
service_changed_characteristic: gatt_client.CharacteristicProxy | None = None
_CHARACTERISTICS = {
gatt.GATT_CLIENT_SUPPORTED_FEATURES_CHARACTERISTIC: 'client_supported_features_characteristic',
gatt.GATT_SERVER_SUPPORTED_FEATURES_CHARACTERISTIC: 'server_supported_features_characteristic',
gatt.GATT_DATABASE_HASH_CHARACTERISTIC: 'database_hash_characteristic',
gatt.GATT_SERVICE_CHANGED_CHARACTERISTIC: 'service_changed_characteristic',
}
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
for uuid, attribute_name in self._CHARACTERISTICS.items():
if characteristics := self.service_proxy.get_characteristics_by_uuid(uuid):
setattr(self, attribute_name, characteristics[0])

View File

@@ -16,15 +16,21 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import struct
from typing import Dict, List, Type, Optional, Tuple, Union, NewType, TYPE_CHECKING
from typing import Iterable, NewType, Optional, Union, Sequence, Type, TYPE_CHECKING
from typing_extensions import Self
from . import core, l2cap
from .colors import color
from .core import InvalidStateError, InvalidArgumentError, InvalidPacketError
from .hci import HCI_Object, name_or_number, key_with_value
from bumble import core, l2cap
from bumble.colors import color
from bumble.core import (
InvalidStateError,
InvalidArgumentError,
InvalidPacketError,
ProtocolError,
)
from bumble.hci import HCI_Object, name_or_number, key_with_value
if TYPE_CHECKING:
from .device import Device, Connection
@@ -124,7 +130,7 @@ SDP_ATTRIBUTE_ID_NAMES = {
SDP_PUBLIC_BROWSE_ROOT = core.UUID.from_16_bits(0x1002, 'PublicBrowseRoot')
# To be used in searches where an attribute ID list allows a range to be specified
SDP_ALL_ATTRIBUTES_RANGE = (0x0000FFFF, 4) # Express this as tuple so we can convey the desired encoding size
SDP_ALL_ATTRIBUTES_RANGE = (0x0000, 0xFFFF)
# fmt: on
# pylint: enable=line-too-long
@@ -242,11 +248,11 @@ class DataElement:
return DataElement(DataElement.BOOLEAN, value)
@staticmethod
def sequence(value: List[DataElement]) -> DataElement:
def sequence(value: Iterable[DataElement]) -> DataElement:
return DataElement(DataElement.SEQUENCE, value)
@staticmethod
def alternative(value: List[DataElement]) -> DataElement:
def alternative(value: Iterable[DataElement]) -> DataElement:
return DataElement(DataElement.ALTERNATIVE, value)
@staticmethod
@@ -473,7 +479,9 @@ class ServiceAttribute:
self.value = value
@staticmethod
def list_from_data_elements(elements: List[DataElement]) -> List[ServiceAttribute]:
def list_from_data_elements(
elements: Sequence[DataElement],
) -> list[ServiceAttribute]:
attribute_list = []
for i in range(0, len(elements) // 2):
attribute_id, attribute_value = elements[2 * i : 2 * (i + 1)]
@@ -486,7 +494,7 @@ class ServiceAttribute:
@staticmethod
def find_attribute_in_list(
attribute_list: List[ServiceAttribute], attribute_id: int
attribute_list: Iterable[ServiceAttribute], attribute_id: int
) -> Optional[DataElement]:
return next(
(
@@ -534,7 +542,12 @@ class SDP_PDU:
See Bluetooth spec @ Vol 3, Part B - 4.2 PROTOCOL DATA UNIT FORMAT
'''
sdp_pdu_classes: Dict[int, Type[SDP_PDU]] = {}
RESPONSE_PDU_IDS = {
SDP_SERVICE_SEARCH_REQUEST: SDP_SERVICE_SEARCH_RESPONSE,
SDP_SERVICE_ATTRIBUTE_REQUEST: SDP_SERVICE_ATTRIBUTE_RESPONSE,
SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST: SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE,
}
sdp_pdu_classes: dict[int, Type[SDP_PDU]] = {}
name = None
pdu_id = 0
@@ -558,7 +571,7 @@ class SDP_PDU:
@staticmethod
def parse_service_record_handle_list_preceded_by_count(
data: bytes, offset: int
) -> Tuple[int, List[int]]:
) -> tuple[int, list[int]]:
count = struct.unpack_from('>H', data, offset - 2)[0]
handle_list = [
struct.unpack_from('>I', data, offset + x * 4)[0] for x in range(count)
@@ -639,6 +652,8 @@ class SDP_ErrorResponse(SDP_PDU):
See Bluetooth spec @ Vol 3, Part B - 4.4.1 SDP_ErrorResponse PDU
'''
error_code: int
# -----------------------------------------------------------------------------
@SDP_PDU.subclass(
@@ -675,7 +690,7 @@ class SDP_ServiceSearchResponse(SDP_PDU):
See Bluetooth spec @ Vol 3, Part B - 4.5.2 SDP_ServiceSearchResponse PDU
'''
service_record_handle_list: List[int]
service_record_handle_list: list[int]
total_service_record_count: int
current_service_record_count: int
continuation_state: bytes
@@ -752,31 +767,99 @@ class SDP_ServiceSearchAttributeResponse(SDP_PDU):
See Bluetooth spec @ Vol 3, Part B - 4.7.2 SDP_ServiceSearchAttributeResponse PDU
'''
attribute_list_byte_count: int
attribute_list: bytes
attribute_lists_byte_count: int
attribute_lists: bytes
continuation_state: bytes
# -----------------------------------------------------------------------------
class Client:
channel: Optional[l2cap.ClassicChannel]
def __init__(self, connection: Connection) -> None:
def __init__(self, connection: Connection, mtu: int = 0) -> None:
self.connection = connection
self.pending_request = None
self.channel = None
self.channel: Optional[l2cap.ClassicChannel] = None
self.mtu = mtu
self.request_semaphore = asyncio.Semaphore(1)
self.pending_request: Optional[SDP_PDU] = None
self.pending_response: Optional[asyncio.futures.Future[SDP_PDU]] = None
self.next_transaction_id = 0
async def connect(self) -> None:
self.channel = await self.connection.create_l2cap_channel(
spec=l2cap.ClassicChannelSpec(SDP_PSM)
spec=(
l2cap.ClassicChannelSpec(SDP_PSM, self.mtu)
if self.mtu
else l2cap.ClassicChannelSpec(SDP_PSM)
)
)
self.channel.sink = self.on_pdu
async def disconnect(self) -> None:
if self.channel:
await self.channel.disconnect()
self.channel = None
async def search_services(self, uuids: List[core.UUID]) -> List[int]:
def make_transaction_id(self) -> int:
transaction_id = self.next_transaction_id
self.next_transaction_id = (self.next_transaction_id + 1) & 0xFFFF
return transaction_id
def on_pdu(self, pdu: bytes) -> None:
if not self.pending_request:
logger.warning('received response with no pending request')
return
assert self.pending_response is not None
response = SDP_PDU.from_bytes(pdu)
# Check that the transaction ID is what we expect
if self.pending_request.transaction_id != response.transaction_id:
logger.warning(
f"received response with transaction ID {response.transaction_id} "
f"but expected {self.pending_request.transaction_id}"
)
return
# Check if the response is an error
if isinstance(response, SDP_ErrorResponse):
self.pending_response.set_exception(
ProtocolError(error_code=response.error_code)
)
return
# Check that the type of the response matches the request
if response.pdu_id != SDP_PDU.RESPONSE_PDU_IDS.get(self.pending_request.pdu_id):
logger.warning("response type mismatch")
return
self.pending_response.set_result(response)
async def send_request(self, request: SDP_PDU) -> SDP_PDU:
assert self.channel is not None
async with self.request_semaphore:
assert self.pending_request is None
assert self.pending_response is None
# Create a future value to hold the eventual response
self.pending_response = asyncio.get_running_loop().create_future()
self.pending_request = request
try:
self.channel.send_pdu(bytes(request))
return await self.pending_response
finally:
self.pending_request = None
self.pending_response = None
async def search_services(self, uuids: Iterable[core.UUID]) -> list[int]:
"""
Search for services by UUID.
Args:
uuids: service the UUIDs to search for.
Returns:
A list of matching service record handles.
"""
if self.pending_request is not None:
raise InvalidStateError('request already pending')
if self.channel is None:
@@ -791,16 +874,16 @@ class Client:
continuation_state = bytes([0])
watchdog = SDP_CONTINUATION_WATCHDOG
while watchdog > 0:
response_pdu = await self.channel.send_request(
response = await self.send_request(
SDP_ServiceSearchRequest(
transaction_id=0, # Transaction ID TODO: pick a real value
transaction_id=self.make_transaction_id(),
service_search_pattern=service_search_pattern,
maximum_service_record_count=0xFFFF,
continuation_state=continuation_state,
)
)
response = SDP_PDU.from_bytes(response_pdu)
logger.debug(f'<<< Response: {response}')
assert isinstance(response, SDP_ServiceSearchResponse)
service_record_handle_list += response.service_record_handle_list
continuation_state = response.continuation_state
if len(continuation_state) == 1 and continuation_state[0] == 0:
@@ -811,8 +894,21 @@ class Client:
return service_record_handle_list
async def search_attributes(
self, uuids: List[core.UUID], attribute_ids: List[Union[int, Tuple[int, int]]]
) -> List[List[ServiceAttribute]]:
self,
uuids: Iterable[core.UUID],
attribute_ids: Iterable[Union[int, tuple[int, int]]],
) -> list[list[ServiceAttribute]]:
"""
Search for attributes by UUID and attribute IDs.
Args:
uuids: the service UUIDs to search for.
attribute_ids: list of attribute IDs or (start, end) attribute ID ranges.
(use (0, 0xFFFF) to include all attributes)
Returns:
A list of list of attributes, one list per matching service.
"""
if self.pending_request is not None:
raise InvalidStateError('request already pending')
if self.channel is None:
@@ -824,8 +920,8 @@ class Client:
attribute_id_list = DataElement.sequence(
[
(
DataElement.unsigned_integer(
attribute_id[0], value_size=attribute_id[1]
DataElement.unsigned_integer_32(
attribute_id[0] << 16 | attribute_id[1]
)
if isinstance(attribute_id, tuple)
else DataElement.unsigned_integer_16(attribute_id)
@@ -839,17 +935,17 @@ class Client:
continuation_state = bytes([0])
watchdog = SDP_CONTINUATION_WATCHDOG
while watchdog > 0:
response_pdu = await self.channel.send_request(
response = await self.send_request(
SDP_ServiceSearchAttributeRequest(
transaction_id=0, # Transaction ID TODO: pick a real value
transaction_id=self.make_transaction_id(),
service_search_pattern=service_search_pattern,
maximum_attribute_byte_count=0xFFFF,
attribute_id_list=attribute_id_list,
continuation_state=continuation_state,
)
)
response = SDP_PDU.from_bytes(response_pdu)
logger.debug(f'<<< Response: {response}')
assert isinstance(response, SDP_ServiceSearchAttributeResponse)
accumulator += response.attribute_lists
continuation_state = response.continuation_state
if len(continuation_state) == 1 and continuation_state[0] == 0:
@@ -872,8 +968,18 @@ class Client:
async def get_attributes(
self,
service_record_handle: int,
attribute_ids: List[Union[int, Tuple[int, int]]],
) -> List[ServiceAttribute]:
attribute_ids: Iterable[Union[int, tuple[int, int]]],
) -> list[ServiceAttribute]:
"""
Get attributes for a service.
Args:
service_record_handle: the handle for a service
attribute_ids: list or attribute IDs or (start, end) attribute ID handles.
Returns:
A list of attributes.
"""
if self.pending_request is not None:
raise InvalidStateError('request already pending')
if self.channel is None:
@@ -882,8 +988,8 @@ class Client:
attribute_id_list = DataElement.sequence(
[
(
DataElement.unsigned_integer(
attribute_id[0], value_size=attribute_id[1]
DataElement.unsigned_integer_32(
attribute_id[0] << 16 | attribute_id[1]
)
if isinstance(attribute_id, tuple)
else DataElement.unsigned_integer_16(attribute_id)
@@ -897,17 +1003,17 @@ class Client:
continuation_state = bytes([0])
watchdog = SDP_CONTINUATION_WATCHDOG
while watchdog > 0:
response_pdu = await self.channel.send_request(
response = await self.send_request(
SDP_ServiceAttributeRequest(
transaction_id=0, # Transaction ID TODO: pick a real value
transaction_id=self.make_transaction_id(),
service_record_handle=service_record_handle,
maximum_attribute_byte_count=0xFFFF,
attribute_id_list=attribute_id_list,
continuation_state=continuation_state,
)
)
response = SDP_PDU.from_bytes(response_pdu)
logger.debug(f'<<< Response: {response}')
assert isinstance(response, SDP_ServiceAttributeResponse)
accumulator += response.attribute_list
continuation_state = response.continuation_state
if len(continuation_state) == 1 and continuation_state[0] == 0:
@@ -933,17 +1039,17 @@ class Client:
# -----------------------------------------------------------------------------
class Server:
CONTINUATION_STATE = bytes([0x01, 0x43])
CONTINUATION_STATE = bytes([0x01, 0x00])
channel: Optional[l2cap.ClassicChannel]
Service = NewType('Service', List[ServiceAttribute])
service_records: Dict[int, Service]
current_response: Union[None, bytes, Tuple[int, List[int]]]
Service = NewType('Service', list[ServiceAttribute])
service_records: dict[int, Service]
current_response: Union[None, bytes, tuple[int, list[int]]]
def __init__(self, device: Device) -> None:
self.device = device
self.service_records = {} # Service records maps, by record handle
self.channel = None
self.current_response = None
self.current_response = None # Current response data, used for continuations
def register(self, l2cap_channel_manager: l2cap.ChannelManager) -> None:
l2cap_channel_manager.create_classic_server(
@@ -954,7 +1060,7 @@ class Server:
logger.debug(f'{color(">>> Sending SDP Response", "blue")}: {response}')
self.channel.send_pdu(response)
def match_services(self, search_pattern: DataElement) -> Dict[int, Service]:
def match_services(self, search_pattern: DataElement) -> dict[int, Service]:
# Find the services for which the attributes in the pattern is a subset of the
# service's attribute values (NOTE: the value search recurses into sequences)
matching_services = {}
@@ -1011,6 +1117,31 @@ class Server:
)
)
def check_continuation(
self,
continuation_state: bytes,
transaction_id: int,
) -> Optional[bool]:
# Check if this is a valid continuation
if len(continuation_state) > 1:
if (
self.current_response is None
or continuation_state != self.CONTINUATION_STATE
):
self.send_response(
SDP_ErrorResponse(
transaction_id=transaction_id,
error_code=SDP_INVALID_CONTINUATION_STATE_ERROR,
)
)
return None
return True
# Cleanup any partial response leftover
self.current_response = None
return False
def get_next_response_payload(self, maximum_size):
if len(self.current_response) > maximum_size:
payload = self.current_response[:maximum_size]
@@ -1025,7 +1156,7 @@ class Server:
@staticmethod
def get_service_attributes(
service: Service, attribute_ids: List[DataElement]
service: Service, attribute_ids: Iterable[DataElement]
) -> DataElement:
attributes = []
for attribute_id in attribute_ids:
@@ -1053,30 +1184,24 @@ class Server:
def on_sdp_service_search_request(self, request: SDP_ServiceSearchRequest) -> None:
# Check if this is a continuation
if len(request.continuation_state) > 1:
if self.current_response is None:
self.send_response(
SDP_ErrorResponse(
transaction_id=request.transaction_id,
error_code=SDP_INVALID_CONTINUATION_STATE_ERROR,
)
)
return
else:
# Cleanup any partial response leftover
self.current_response = None
if (
continuation := self.check_continuation(
request.continuation_state, request.transaction_id
)
) is None:
return
if not continuation:
# Find the matching services
matching_services = self.match_services(request.service_search_pattern)
service_record_handles = list(matching_services.keys())
logger.debug(f'Service Record Handles: {service_record_handles}')
# Only return up to the maximum requested
service_record_handles_subset = service_record_handles[
: request.maximum_service_record_count
]
# Serialize to a byte array, and remember the total count
logger.debug(f'Service Record Handles: {service_record_handles}')
self.current_response = (
len(service_record_handles),
service_record_handles_subset,
@@ -1084,15 +1209,21 @@ class Server:
# Respond, keeping any unsent handles for later
assert isinstance(self.current_response, tuple)
service_record_handles = self.current_response[1][
: request.maximum_service_record_count
assert self.channel is not None
total_service_record_count, service_record_handles = self.current_response
maximum_service_record_count = (self.channel.peer_mtu - 11) // 4
service_record_handles_remaining = service_record_handles[
maximum_service_record_count:
]
service_record_handles = service_record_handles[:maximum_service_record_count]
self.current_response = (
self.current_response[0],
self.current_response[1][request.maximum_service_record_count :],
total_service_record_count,
service_record_handles_remaining,
)
continuation_state = (
Server.CONTINUATION_STATE if self.current_response[1] else bytes([0])
Server.CONTINUATION_STATE
if service_record_handles_remaining
else bytes([0])
)
service_record_handle_list = b''.join(
[struct.pack('>I', handle) for handle in service_record_handles]
@@ -1100,7 +1231,7 @@ class Server:
self.send_response(
SDP_ServiceSearchResponse(
transaction_id=request.transaction_id,
total_service_record_count=self.current_response[0],
total_service_record_count=total_service_record_count,
current_service_record_count=len(service_record_handles),
service_record_handle_list=service_record_handle_list,
continuation_state=continuation_state,
@@ -1111,19 +1242,14 @@ class Server:
self, request: SDP_ServiceAttributeRequest
) -> None:
# Check if this is a continuation
if len(request.continuation_state) > 1:
if self.current_response is None:
self.send_response(
SDP_ErrorResponse(
transaction_id=request.transaction_id,
error_code=SDP_INVALID_CONTINUATION_STATE_ERROR,
)
)
return
else:
# Cleanup any partial response leftover
self.current_response = None
if (
continuation := self.check_continuation(
request.continuation_state, request.transaction_id
)
) is None:
return
if not continuation:
# Check that the service exists
service = self.service_records.get(request.service_record_handle)
if service is None:
@@ -1145,14 +1271,18 @@ class Server:
self.current_response = bytes(attribute_list)
# Respond, keeping any pending chunks for later
assert self.channel is not None
maximum_attribute_byte_count = min(
request.maximum_attribute_byte_count, self.channel.peer_mtu - 9
)
attribute_list_response, continuation_state = self.get_next_response_payload(
request.maximum_attribute_byte_count
maximum_attribute_byte_count
)
self.send_response(
SDP_ServiceAttributeResponse(
transaction_id=request.transaction_id,
attribute_list_byte_count=len(attribute_list_response),
attribute_list=attribute_list,
attribute_list=attribute_list_response,
continuation_state=continuation_state,
)
)
@@ -1161,18 +1291,14 @@ class Server:
self, request: SDP_ServiceSearchAttributeRequest
) -> None:
# Check if this is a continuation
if len(request.continuation_state) > 1:
if self.current_response is None:
self.send_response(
SDP_ErrorResponse(
transaction_id=request.transaction_id,
error_code=SDP_INVALID_CONTINUATION_STATE_ERROR,
)
)
else:
# Cleanup any partial response leftover
self.current_response = None
if (
continuation := self.check_continuation(
request.continuation_state, request.transaction_id
)
) is None:
return
if not continuation:
# Find the matching services
matching_services = self.match_services(
request.service_search_pattern
@@ -1192,14 +1318,18 @@ class Server:
self.current_response = bytes(attribute_lists)
# Respond, keeping any pending chunks for later
assert self.channel is not None
maximum_attribute_byte_count = min(
request.maximum_attribute_byte_count, self.channel.peer_mtu - 9
)
attribute_lists_response, continuation_state = self.get_next_response_payload(
request.maximum_attribute_byte_count
maximum_attribute_byte_count
)
self.send_response(
SDP_ServiceSearchAttributeResponse(
transaction_id=request.transaction_id,
attribute_lists_byte_count=len(attribute_lists_response),
attribute_lists=attribute_lists,
attribute_lists=attribute_lists_response,
continuation_state=continuation_state,
)
)

View File

@@ -1326,7 +1326,7 @@ class Session:
self.connection.abort_on('disconnection', self.on_pairing())
def on_connection_encryption_change(self) -> None:
if self.connection.is_encrypted:
if self.connection.is_encrypted and not self.completed:
if self.is_responder:
# The responder distributes its keys first, the initiator later
self.distribute_keys()

View File

@@ -447,7 +447,7 @@ def deprecated(msg: str):
def wrapper(function):
@functools.wraps(function)
def inner(*args, **kwargs):
warnings.warn(msg, DeprecationWarning)
warnings.warn(msg, DeprecationWarning, stacklevel=2)
return function(*args, **kwargs)
return inner
@@ -464,7 +464,7 @@ def experimental(msg: str):
def wrapper(function):
@functools.wraps(function)
def inner(*args, **kwargs):
warnings.warn(msg, FutureWarning)
warnings.warn(msg, FutureWarning, stacklevel=2)
return function(*args, **kwargs)
return inner

View File

@@ -299,7 +299,7 @@ class HCI_Android_Vendor_Event(HCI_Extended_Event):
HCI_Android_Vendor_Event.register_subevents(globals())
HCI_Event.vendor_factory = HCI_Android_Vendor_Event.subclass_from_parameters
HCI_Event.add_vendor_factory(HCI_Android_Vendor_Event.subclass_from_parameters)
# -----------------------------------------------------------------------------

View File

@@ -0,0 +1,9 @@
{
"name": "Bumble CS Initiator",
"address": "F0:F1:F2:F3:F4:F5",
"advertising_interval": 100,
"keystore": "JsonKeyStore",
"irk": "865F81FF5A8B486EAAE29A27AD9F77DC",
"identity_address_type": 1,
"channel_sounding_enabled": true
}

View File

@@ -0,0 +1,9 @@
{
"name": "Bumble CS Reflector",
"address": "F0:F1:F2:F3:F4:F6",
"advertising_interval": 100,
"keystore": "JsonKeyStore",
"irk": "0c7d74db03a1c98e7be691f76141d53d",
"identity_address_type": 1,
"channel_sounding_enabled": true
}

View File

@@ -28,7 +28,7 @@ class OneDeviceBenchTest(base_test.BaseTestClass):
def test_l2cap_client_ping(self):
runner = self.dut.bench.runL2capClient(
"ping", "4B:2A:67:76:2B:E3", 128, True, 100, 970, 100
"ping", "4B:2A:67:76:2B:E3", 128, True, 100, 970, 100, "HIGH"
)
print("### Initial status:", runner)
final_status = self.dut.bench.waitForRunnerCompletion(runner["id"])
@@ -36,12 +36,34 @@ class OneDeviceBenchTest(base_test.BaseTestClass):
def test_l2cap_client_send(self):
runner = self.dut.bench.runL2capClient(
"send", "7E:90:D0:F2:7A:11", 131, True, 100, 970, 0
"send",
"F1:F1:F1:F1:F1:F1",
128,
True,
100,
970,
0,
"HIGH",
10000,
)
print("### Initial status:", runner)
final_status = self.dut.bench.waitForRunnerCompletion(runner["id"])
print("### Final status:", final_status)
def test_gatt_client_send(self):
runner = self.dut.bench.runGattClient(
"send", "F1:F1:F1:F1:F1:F1", 128, True, 100, 970, 100, "HIGH"
)
print("### Initial status:", runner)
final_status = self.dut.bench.waitForRunnerCompletion(runner["id"])
print("### Final status:", final_status)
def test_gatt_server_receive(self):
runner = self.dut.bench.runGattServer("receive")
print("### Initial status:", runner)
final_status = self.dut.bench.waitForRunnerCompletion(runner["id"])
print("### Final status:", final_status)
if __name__ == "__main__":
test_runner.main()

View File

@@ -2,8 +2,8 @@ TestBeds:
- Name: BenchTestBed
Controllers:
AndroidDevice:
- serial: 37211FDJG000DJ
- serial: emulator-5554
local_bt_address: 94:45:60:5E:03:B0
- serial: 23071FDEE001F7
local_bt_address: DC:E5:5B:E5:51:2C
#- serial: 23071FDEE001F7
# local_bt_address: DC:E5:5B:E5:51:2C

View File

@@ -0,0 +1,154 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import sys
import os
import functools
from bumble import core
from bumble import hci
from bumble.device import Connection, Device, ChannelSoundingCapabilities
from bumble.transport import open_transport_or_link
# From https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/system/gd/hci/distance_measurement_manager.cc.
CS_TONE_ANTENNA_CONFIG_MAPPING_TABLE = [
[0, 4, 5, 6],
[1, 7, 7, 7],
[2, 7, 7, 7],
[3, 7, 7, 7],
]
CS_PREFERRED_PEER_ANTENNA_MAPPING_TABLE = [1, 1, 1, 1, 3, 7, 15, 3]
CS_ANTENNA_PERMUTATION_ARRAY = [
[1, 2, 3, 4],
[2, 1, 3, 4],
[1, 3, 2, 4],
[3, 1, 2, 4],
[3, 2, 1, 4],
[2, 3, 1, 4],
[1, 2, 4, 3],
[2, 1, 4, 3],
[1, 4, 2, 3],
[4, 1, 2, 3],
[4, 2, 1, 3],
[2, 4, 1, 3],
[1, 4, 3, 2],
[4, 1, 3, 2],
[1, 3, 4, 2],
[3, 1, 4, 2],
[3, 4, 1, 2],
[4, 3, 1, 2],
[4, 2, 3, 1],
[2, 4, 3, 1],
[4, 3, 2, 1],
[3, 4, 2, 1],
[3, 2, 4, 1],
[2, 3, 4, 1],
]
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_channel_sounding.py <config-file> <transport-spec-for-device>'
'[target_address](If missing, run as reflector)'
)
print('example: run_channel_sounding.py cs_reflector.json usb:0')
print(
'example: run_channel_sounding.py cs_initiator.json usb:0 F0:F1:F2:F3:F4:F5'
)
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
assert (local_cs_capabilities := device.cs_capabilities)
if len(sys.argv) == 3:
print('<<< Start Advertising')
await device.start_advertising(
own_address_type=hci.OwnAddressType.RANDOM, auto_restart=True
)
def on_cs_capabilities(
connection: Connection, capabilities: ChannelSoundingCapabilities
):
del capabilities
print('<<< Set CS Settings')
asyncio.create_task(device.set_default_cs_settings(connection))
device.on(
'connection',
lambda connection: connection.on(
'channel_sounding_capabilities',
functools.partial(on_cs_capabilities, connection),
),
)
else:
target_address = hci.Address(sys.argv[3])
print(f'<<< Connecting to {target_address}')
connection = await device.connect(
target_address, transport=core.BT_LE_TRANSPORT
)
print('<<< ACL Connected')
if not (await device.get_long_term_key(connection.handle, b'', 0)):
print('<<< No bond, start pairing')
await connection.pair()
print('<<< Pairing complete')
print('<<< Encrypting Connection')
await connection.encrypt()
print('<<< Getting remote CS Capabilities...')
remote_capabilities = await device.get_remote_cs_capabilities(connection)
print('<<< Set CS Settings...')
await device.set_default_cs_settings(connection)
print('<<< Set CS Config...')
config = await device.create_cs_config(connection)
print('<<< Enable CS Security...')
await device.enable_cs_security(connection)
tone_antenna_config_selection = CS_TONE_ANTENNA_CONFIG_MAPPING_TABLE[
local_cs_capabilities.num_antennas_supported - 1
][remote_capabilities.num_antennas_supported - 1]
print('<<< Set CS Procedure Parameters...')
await device.set_cs_procedure_parameters(
connection=connection,
config=config,
tone_antenna_config_selection=tone_antenna_config_selection,
preferred_peer_antenna=CS_PREFERRED_PEER_ANTENNA_MAPPING_TABLE[
tone_antenna_config_selection
],
)
print('<<< Enable CS Procedure...')
await device.enable_cs_procedure(connection=connection, config=config)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -10,7 +10,7 @@ android {
defaultConfig {
applicationId = "com.github.google.bumble.btbench"
minSdk = 30
minSdk = 33
targetSdk = 34
versionCode = 1
versionName = "1.0"

View File

@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
package="com.github.google.bumble.btbench">
<uses-sdk android:minSdkVersion="30" android:targetSdkVersion="34" />
<uses-sdk android:minSdkVersion="33" android:targetSdkVersion="34" />
<!-- Request legacy Bluetooth permissions on older devices. -->
<uses-permission android:name="android.permission.BLUETOOTH" android:maxSdkVersion="30" />
<uses-permission android:name="android.permission.BLUETOOTH_ADMIN" android:maxSdkVersion="30" />

View File

@@ -0,0 +1,39 @@
package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import android.bluetooth.le.AdvertiseCallback
import android.bluetooth.le.AdvertiseData
import android.bluetooth.le.AdvertiseSettings
import android.bluetooth.le.AdvertiseSettings.ADVERTISE_MODE_LOW_LATENCY
import android.os.Build
import java.util.logging.Logger
private val Log = Logger.getLogger("btbench.advertiser")
class Advertiser(private val bluetoothAdapter: BluetoothAdapter) : AdvertiseCallback() {
@SuppressLint("MissingPermission")
fun start() {
val advertiseSettingsBuilder = AdvertiseSettings.Builder()
.setAdvertiseMode(ADVERTISE_MODE_LOW_LATENCY)
.setConnectable(true)
advertiseSettingsBuilder.setDiscoverable(true)
val advertiseSettings = advertiseSettingsBuilder.build()
val advertiseData = AdvertiseData.Builder().build()
val scanData = AdvertiseData.Builder().setIncludeDeviceName(true).build()
bluetoothAdapter.bluetoothLeAdvertiser.startAdvertising(advertiseSettings, advertiseData, scanData, this)
}
@SuppressLint("MissingPermission")
fun stop() {
bluetoothAdapter.bluetoothLeAdvertiser.stopAdvertising(this)
}
override fun onStartFailure(errorCode: Int) {
Log.warning("failed to start advertising: $errorCode")
}
override fun onStartSuccess(settingsInEffect: AdvertiseSettings) {
Log.info("advertising started: $settingsInEffect")
}
}

View File

@@ -22,11 +22,13 @@ import androidx.test.core.app.ApplicationProvider;
import com.google.android.mobly.snippet.Snippet;
import com.google.android.mobly.snippet.rpc.Rpc;
import com.google.android.mobly.snippet.rpc.RpcOptional;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import java.io.IOException;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.UUID;
@@ -71,12 +73,15 @@ public class AutomationSnippet implements Snippet {
private final Context mContext;
private final ArrayList<Runner> mRunners = new ArrayList<>();
public AutomationSnippet() {
public AutomationSnippet() throws IOException {
mContext = ApplicationProvider.getApplicationContext();
BluetoothManager bluetoothManager = mContext.getSystemService(BluetoothManager.class);
mBluetoothAdapter = bluetoothManager.getAdapter();
if (mBluetoothAdapter == null) {
throw new RuntimeException("bluetooth not supported");
throw new IOException("bluetooth not supported");
}
if (!mBluetoothAdapter.isEnabled()) {
throw new IOException("bluetooth not enabled");
}
}
@@ -85,32 +90,46 @@ public class AutomationSnippet implements Snippet {
switch (mode) {
case "rfcomm-client":
runnable = new RfcommClient(model, mBluetoothAdapter,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
case "rfcomm-server":
runnable = new RfcommServer(model, mBluetoothAdapter,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
case "l2cap-client":
runnable = new L2capClient(model, mBluetoothAdapter, mContext,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
case "l2cap-server":
runnable = new L2capServer(model, mBluetoothAdapter,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
case "gatt-client":
runnable = new GattClient(model, mBluetoothAdapter, mContext,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
case "gatt-server":
runnable = new GattServer(model, mBluetoothAdapter, mContext,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
default:
return null;
}
model.setMode(mode);
model.setScenario(scenario);
runnable.run();
Runner runner = new Runner(runnable, mode, scenario, model);
mRunners.add(runner);
@@ -140,7 +159,21 @@ public class AutomationSnippet implements Snippet {
JSONObject result = new JSONObject();
result.put("status", model.getStatus());
result.put("running", model.getRunning());
result.put("peer_bluetooth_address", model.getPeerBluetoothAddress());
result.put("mode", model.getMode());
result.put("scenario", model.getScenario());
result.put("sender_packet_size", model.getSenderPacketSize());
result.put("sender_packet_count", model.getSenderPacketCount());
result.put("sender_packet_interval", model.getSenderPacketInterval());
result.put("packets_sent", model.getPacketsSent());
result.put("packets_received", model.getPacketsReceived());
result.put("l2cap_psm", model.getL2capPsm());
result.put("use_2m_phy", model.getUse2mPhy());
result.put("connection_priority", model.getConnectionPriority());
result.put("mtu", model.getMtu());
result.put("rx_phy", model.getRxPhy());
result.put("tx_phy", model.getTxPhy());
result.put("startup_delay", model.getStartupDelay());
if (model.getStatus().equals("OK")) {
JSONObject stats = new JSONObject();
result.put("stats", stats);
@@ -167,12 +200,12 @@ public class AutomationSnippet implements Snippet {
@Rpc(description = "Run a scenario in RFComm Client mode")
public JSONObject runRfcommClient(String scenario, String peerBluetoothAddress, int packetCount,
int packetSize, int packetInterval) throws JSONException {
assert (mBluetoothAdapter != null);
int packetSize, int packetInterval,
@RpcOptional Integer startupDelay) throws JSONException {
// We only support "send" and "ping" for this mode for now
if (!(scenario.equals("send") || scenario.equals("ping"))) {
throw new InvalidParameterException("only 'send' and 'ping' are supported for this mode");
throw new InvalidParameterException(
"only 'send' and 'ping' are supported for this mode");
}
AppViewModel model = new AppViewModel();
@@ -180,6 +213,9 @@ public class AutomationSnippet implements Snippet {
model.setSenderPacketCount(packetCount);
model.setSenderPacketSize(packetSize);
model.setSenderPacketInterval(packetInterval);
if (startupDelay != null) {
model.setStartupDelay(startupDelay);
}
Runner runner = runScenario(model, "rfcomm-client", scenario);
assert runner != null;
@@ -187,15 +223,18 @@ public class AutomationSnippet implements Snippet {
}
@Rpc(description = "Run a scenario in RFComm Server mode")
public JSONObject runRfcommServer(String scenario) throws JSONException {
assert (mBluetoothAdapter != null);
public JSONObject runRfcommServer(String scenario,
@RpcOptional Integer startupDelay) throws JSONException {
// We only support "receive" and "pong" for this mode for now
if (!(scenario.equals("receive") || scenario.equals("pong"))) {
throw new InvalidParameterException("only 'receive' and 'pong' are supported for this mode");
throw new InvalidParameterException(
"only 'receive' and 'pong' are supported for this mode");
}
AppViewModel model = new AppViewModel();
if (startupDelay != null) {
model.setStartupDelay(startupDelay);
}
Runner runner = runScenario(model, "rfcomm-server", scenario);
assert runner != null;
@@ -205,12 +244,12 @@ public class AutomationSnippet implements Snippet {
@Rpc(description = "Run a scenario in L2CAP Client mode")
public JSONObject runL2capClient(String scenario, String peerBluetoothAddress, int psm,
boolean use_2m_phy, int packetCount, int packetSize,
int packetInterval) throws JSONException {
assert (mBluetoothAdapter != null);
int packetInterval, @RpcOptional String connectionPriority,
@RpcOptional Integer startupDelay) throws JSONException {
// We only support "send" and "ping" for this mode for now
if (!(scenario.equals("send") || scenario.equals("ping"))) {
throw new InvalidParameterException("only 'send' and 'ping' are supported for this mode");
throw new InvalidParameterException(
"only 'send' and 'ping' are supported for this mode");
}
AppViewModel model = new AppViewModel();
@@ -220,28 +259,83 @@ public class AutomationSnippet implements Snippet {
model.setSenderPacketCount(packetCount);
model.setSenderPacketSize(packetSize);
model.setSenderPacketInterval(packetInterval);
if (connectionPriority != null) {
model.setConnectionPriority(connectionPriority);
}
if (startupDelay != null) {
model.setStartupDelay(startupDelay);
}
Runner runner = runScenario(model, "l2cap-client", scenario);
assert runner != null;
return runner.toJson();
}
@Rpc(description = "Run a scenario in L2CAP Server mode")
public JSONObject runL2capServer(String scenario) throws JSONException {
assert (mBluetoothAdapter != null);
public JSONObject runL2capServer(String scenario,
@RpcOptional Integer startupDelay) throws JSONException {
// We only support "receive" and "pong" for this mode for now
if (!(scenario.equals("receive") || scenario.equals("pong"))) {
throw new InvalidParameterException("only 'receive' and 'pong' are supported for this mode");
throw new InvalidParameterException(
"only 'receive' and 'pong' are supported for this mode");
}
AppViewModel model = new AppViewModel();
if (startupDelay != null) {
model.setStartupDelay(startupDelay);
}
Runner runner = runScenario(model, "l2cap-server", scenario);
assert runner != null;
return runner.toJson();
}
@Rpc(description = "Run a scenario in GATT Client mode")
public JSONObject runGattClient(String scenario, String peerBluetoothAddress,
boolean use_2m_phy, int packetCount, int packetSize,
int packetInterval, @RpcOptional String connectionPriority,
@RpcOptional Integer startupDelay) throws JSONException {
// We only support "send" and "ping" for this mode for now
if (!(scenario.equals("send") || scenario.equals("ping"))) {
throw new InvalidParameterException(
"only 'send' and 'ping' are supported for this mode");
}
AppViewModel model = new AppViewModel();
model.setPeerBluetoothAddress(peerBluetoothAddress);
model.setUse2mPhy(use_2m_phy);
model.setSenderPacketCount(packetCount);
model.setSenderPacketSize(packetSize);
model.setSenderPacketInterval(packetInterval);
if (connectionPriority != null) {
model.setConnectionPriority(connectionPriority);
}
if (startupDelay != null) {
model.setStartupDelay(startupDelay);
}
Runner runner = runScenario(model, "gatt-client", scenario);
assert runner != null;
return runner.toJson();
}
@Rpc(description = "Run a scenario in GATT Server mode")
public JSONObject runGattServer(String scenario,
@RpcOptional Integer startupDelay) throws JSONException {
// We only support "receive" and "pong" for this mode for now
if (!(scenario.equals("receive") || scenario.equals("pong"))) {
throw new InvalidParameterException(
"only 'receive' and 'pong' are supported for this mode");
}
AppViewModel model = new AppViewModel();
if (startupDelay != null) {
model.setStartupDelay(startupDelay);
}
Runner runner = runScenario(model, "gatt-server", scenario);
assert runner != null;
return runner.toJson();
}
@Rpc(description = "Stop a Runner")
public JSONObject stopRunner(String runnerId) throws JSONException {
Runner runner = findRunner(runnerId);
@@ -276,7 +370,7 @@ public class AutomationSnippet implements Snippet {
JSONObject result = new JSONObject();
JSONArray runners = new JSONArray();
result.put("runners", runners);
for (Runner runner: mRunners) {
for (Runner runner : mRunners) {
runners.put(runner.toJson());
}

View File

@@ -0,0 +1,109 @@
package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import android.bluetooth.BluetoothDevice
import android.bluetooth.BluetoothGatt
import android.bluetooth.BluetoothGattCallback
import android.bluetooth.BluetoothManager
import android.bluetooth.BluetoothProfile
import android.content.Context
import android.os.Build
import androidx.core.content.ContextCompat
import java.util.logging.Logger
private val Log = Logger.getLogger("btbench.connection")
open class Connection(
private val viewModel: AppViewModel,
private val bluetoothAdapter: BluetoothAdapter,
private val context: Context
) : BluetoothGattCallback() {
var remoteDevice: BluetoothDevice? = null
var gatt: BluetoothGatt? = null
@SuppressLint("MissingPermission")
open fun connect() {
val addressIsPublic = viewModel.peerBluetoothAddress.endsWith("/P")
val address = viewModel.peerBluetoothAddress.take(17)
remoteDevice = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) {
bluetoothAdapter.getRemoteLeDevice(
address,
if (addressIsPublic) {
BluetoothDevice.ADDRESS_TYPE_PUBLIC
} else {
BluetoothDevice.ADDRESS_TYPE_RANDOM
}
)
} else {
bluetoothAdapter.getRemoteDevice(address)
}
gatt = remoteDevice?.connectGatt(
context,
false,
this,
BluetoothDevice.TRANSPORT_LE,
if (viewModel.use2mPhy) BluetoothDevice.PHY_LE_2M_MASK else BluetoothDevice.PHY_LE_1M_MASK
)
}
@SuppressLint("MissingPermission")
open fun disconnect() {
gatt?.disconnect()
}
override fun onMtuChanged(gatt: BluetoothGatt, mtu: Int, status: Int) {
Log.info("MTU update: mtu=$mtu status=$status")
viewModel.mtu = mtu
}
override fun onPhyUpdate(gatt: BluetoothGatt, txPhy: Int, rxPhy: Int, status: Int) {
Log.info("PHY update: tx=$txPhy, rx=$rxPhy, status=$status")
viewModel.txPhy = txPhy
viewModel.rxPhy = rxPhy
}
override fun onPhyRead(gatt: BluetoothGatt, txPhy: Int, rxPhy: Int, status: Int) {
Log.info("PHY: tx=$txPhy, rx=$rxPhy, status=$status")
viewModel.txPhy = txPhy
viewModel.rxPhy = rxPhy
}
@SuppressLint("MissingPermission")
override fun onConnectionStateChange(
gatt: BluetoothGatt?, status: Int, newState: Int
) {
if (status != BluetoothGatt.GATT_SUCCESS) {
Log.warning("onConnectionStateChange status=$status")
}
if (gatt != null && newState == BluetoothProfile.STATE_CONNECTED) {
if (viewModel.use2mPhy) {
Log.info("requesting 2M PHY")
gatt.setPreferredPhy(
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_OPTION_NO_PREFERRED
)
}
gatt.readPhy()
// Request an MTU update, even though we don't use GATT, because Android
// won't request a larger link layer maximum data length otherwise.
gatt.requestMtu(517)
// Request a specific connection priority
val connectionPriority = when (viewModel.connectionPriority) {
"BALANCED" -> BluetoothGatt.CONNECTION_PRIORITY_BALANCED
"LOW_POWER" -> BluetoothGatt.CONNECTION_PRIORITY_LOW_POWER
"HIGH" -> BluetoothGatt.CONNECTION_PRIORITY_HIGH
"DCK" -> BluetoothGatt.CONNECTION_PRIORITY_DCK
else -> 0
}
if (!gatt.requestConnectionPriority(connectionPriority)) {
Log.warning("requestConnectionPriority failed")
}
}
}
}

View File

@@ -0,0 +1,23 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import java.util.UUID
var CCCD_UUID = UUID.fromString("00002902-0000-1000-8000-00805F9B34FB")
val BENCH_SERVICE_UUID = UUID.fromString("50DB505C-8AC4-4738-8448-3B1D9CC09CC5")
val BENCH_TX_UUID = UUID.fromString("E789C754-41A1-45F4-A948-A0A1A90DBA53")
val BENCH_RX_UUID = UUID.fromString("016A2CC7-E14B-4819-935F-1F56EAE4098D")

View File

@@ -0,0 +1,224 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import android.bluetooth.BluetoothGatt
import android.bluetooth.BluetoothGattCharacteristic
import android.bluetooth.BluetoothGattDescriptor
import android.bluetooth.BluetoothProfile
import android.content.Context
import java.io.IOException
import java.util.UUID
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Semaphore
import java.util.logging.Logger
import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.gatt-client")
class GattClientConnection(
viewModel: AppViewModel,
bluetoothAdapter: BluetoothAdapter,
context: Context
) : Connection(viewModel, bluetoothAdapter, context), PacketIO {
override var packetSink: PacketSink? = null
private val discoveryDone: CountDownLatch = CountDownLatch(1)
private val writeSemaphore: Semaphore = Semaphore(1)
var rxCharacteristic: BluetoothGattCharacteristic? = null
var txCharacteristic: BluetoothGattCharacteristic? = null
override fun connect() {
super.connect()
// Check if we're already connected and have discovered the services
if (gatt?.getService(BENCH_SERVICE_UUID) != null) {
Log.fine("already connected")
onServicesDiscovered(gatt, BluetoothGatt.GATT_SUCCESS)
}
}
@SuppressLint("MissingPermission")
override fun onConnectionStateChange(
gatt: BluetoothGatt?, status: Int, newState: Int
) {
super.onConnectionStateChange(gatt, status, newState)
if (status != BluetoothGatt.GATT_SUCCESS) {
Log.warning("onConnectionStateChange status=$status")
discoveryDone.countDown()
return
}
if (gatt != null && newState == BluetoothProfile.STATE_CONNECTED) {
if (!gatt.discoverServices()) {
Log.warning("discoverServices could not start")
discoveryDone.countDown()
}
}
}
@SuppressLint("MissingPermission")
override fun onServicesDiscovered(gatt: BluetoothGatt?, status: Int) {
Log.fine("onServicesDiscovered")
if (status != BluetoothGatt.GATT_SUCCESS) {
Log.warning("failed to discover services: ${status}")
discoveryDone.countDown()
return
}
// Find the service
val service = gatt!!.getService(BENCH_SERVICE_UUID)
if (service == null) {
Log.warning("GATT Service not found")
discoveryDone.countDown()
return
}
// Find the RX and TX characteristics
rxCharacteristic = service.getCharacteristic(BENCH_RX_UUID)
if (rxCharacteristic == null) {
Log.warning("GATT RX Characteristics not found")
discoveryDone.countDown()
return
}
txCharacteristic = service.getCharacteristic(BENCH_TX_UUID)
if (txCharacteristic == null) {
Log.warning("GATT TX Characteristics not found")
discoveryDone.countDown()
return
}
// Subscribe to the RX characteristic
Log.fine("subscribing to RX")
gatt.setCharacteristicNotification(rxCharacteristic, true)
val cccdDescriptor = rxCharacteristic!!.getDescriptor(CCCD_UUID)
gatt.writeDescriptor(cccdDescriptor, BluetoothGattDescriptor.ENABLE_NOTIFICATION_VALUE);
Log.info("GATT discovery complete")
discoveryDone.countDown()
}
override fun onCharacteristicWrite(
gatt: BluetoothGatt?,
characteristic: BluetoothGattCharacteristic?,
status: Int
) {
// Now we can write again
writeSemaphore.release()
if (status != BluetoothGatt.GATT_SUCCESS) {
Log.warning("onCharacteristicWrite failed: $status")
return
}
}
override fun onCharacteristicChanged(
gatt: BluetoothGatt,
characteristic: BluetoothGattCharacteristic,
value: ByteArray
) {
if (characteristic.uuid == BENCH_RX_UUID && packetSink != null) {
val packet = Packet.from(value)
packetSink!!.onPacket(packet)
}
}
@SuppressLint("MissingPermission")
override fun sendPacket(packet: Packet) {
if (txCharacteristic == null) {
Log.warning("No TX characteristic, dropping")
return
}
// Wait until we can write
writeSemaphore.acquire()
// Write the data
val data = packet.toBytes()
val clampedData = if (data.size > 512) {
// Clamp the data to the maximum allowed characteristic data size
data.copyOf(512)
} else {
data
}
gatt?.writeCharacteristic(
txCharacteristic!!,
clampedData,
BluetoothGattCharacteristic.WRITE_TYPE_NO_RESPONSE
)
}
override
fun disconnect() {
super.disconnect()
discoveryDone.countDown()
}
fun waitForDiscoveryCompletion() {
discoveryDone.await()
}
}
class GattClient(
private val viewModel: AppViewModel,
bluetoothAdapter: BluetoothAdapter,
context: Context,
private val createIoClient: (packetIo: PacketIO) -> IoClient
) : Mode {
private var connection: GattClientConnection =
GattClientConnection(viewModel, bluetoothAdapter, context)
private var clientThread: Thread? = null
@SuppressLint("MissingPermission")
override fun run() {
viewModel.running = true
clientThread = thread(name = "GattClient") {
connection.connect()
viewModel.aborter = {
connection.disconnect()
}
// Discover the rx and tx characteristics
connection.waitForDiscoveryCompletion()
if (connection.rxCharacteristic == null || connection.txCharacteristic == null) {
connection.disconnect()
viewModel.running = false
return@thread
}
val ioClient = createIoClient(connection)
try {
ioClient.run()
viewModel.status = "OK"
} catch (error: IOException) {
Log.info("run ended abruptly")
viewModel.status = "ABORTED"
viewModel.lastError = "IO_ERROR"
} finally {
connection.disconnect()
viewModel.running = false
}
}
}
override fun waitForCompletion() {
clientThread?.join()
}
}

View File

@@ -0,0 +1,243 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import android.bluetooth.BluetoothDevice
import android.bluetooth.BluetoothGatt
import android.bluetooth.BluetoothGattCharacteristic
import android.bluetooth.BluetoothGattDescriptor
import android.bluetooth.BluetoothGattServer
import android.bluetooth.BluetoothGattServerCallback
import android.bluetooth.BluetoothGattService
import android.bluetooth.BluetoothManager
import android.bluetooth.BluetoothStatusCodes
import android.content.Context
import androidx.core.content.ContextCompat
import java.io.IOException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.logging.Logger
import kotlin.concurrent.thread
import kotlin.experimental.and
private val Log = Logger.getLogger("btbench.gatt-server")
@SuppressLint("MissingPermission")
class GattServer(
private val viewModel: AppViewModel,
private val bluetoothAdapter: BluetoothAdapter,
context: Context,
private val createIoClient: (packetIo: PacketIO) -> IoClient
) : Mode, PacketIO, BluetoothGattServerCallback() {
override var packetSink: PacketSink? = null
private val gattServer: BluetoothGattServer
private val rxCharacteristic: BluetoothGattCharacteristic?
private val txCharacteristic: BluetoothGattCharacteristic?
private val notifySemaphore: Semaphore = Semaphore(1)
private val ready: CountDownLatch = CountDownLatch(1)
private var peerDevice: BluetoothDevice? = null
private var clientThread: Thread? = null
private var sinkQueue: LinkedBlockingQueue<Packet>? = null
init {
val bluetoothManager = ContextCompat.getSystemService(context, BluetoothManager::class.java)
gattServer = bluetoothManager!!.openGattServer(context, this)
val benchService = gattServer.getService(BENCH_SERVICE_UUID)
if (benchService == null) {
rxCharacteristic = BluetoothGattCharacteristic(
BENCH_RX_UUID,
BluetoothGattCharacteristic.PROPERTY_NOTIFY,
0
)
txCharacteristic = BluetoothGattCharacteristic(
BENCH_TX_UUID,
BluetoothGattCharacteristic.PROPERTY_WRITE_NO_RESPONSE,
BluetoothGattCharacteristic.PERMISSION_WRITE
)
val rxCCCD = BluetoothGattDescriptor(
CCCD_UUID,
BluetoothGattDescriptor.PERMISSION_READ or BluetoothGattDescriptor.PERMISSION_WRITE
)
rxCharacteristic.addDescriptor(rxCCCD)
val service =
BluetoothGattService(BENCH_SERVICE_UUID, BluetoothGattService.SERVICE_TYPE_PRIMARY)
service.addCharacteristic(rxCharacteristic)
service.addCharacteristic(txCharacteristic)
gattServer.addService(service)
} else {
rxCharacteristic = benchService.getCharacteristic(BENCH_RX_UUID)
txCharacteristic = benchService.getCharacteristic(BENCH_TX_UUID)
}
}
override fun onCharacteristicWriteRequest(
device: BluetoothDevice?,
requestId: Int,
characteristic: BluetoothGattCharacteristic?,
preparedWrite: Boolean,
responseNeeded: Boolean,
offset: Int,
value: ByteArray?
) {
Log.info("onCharacteristicWriteRequest")
if (characteristic != null && characteristic.uuid == BENCH_TX_UUID) {
if (packetSink == null) {
Log.warning("no sink, dropping")
} else if (offset != 0) {
Log.warning("offset != 0")
} else if (value == null) {
Log.warning("no value")
} else {
// Deliver the packet in a separate thread so that we don't block this
// callback.
sinkQueue?.put(Packet.from(value))
}
}
if (responseNeeded) {
gattServer.sendResponse(device, requestId, BluetoothGatt.GATT_SUCCESS, offset, value)
}
}
override fun onNotificationSent(device: BluetoothDevice?, status: Int) {
if (status == BluetoothGatt.GATT_SUCCESS) {
notifySemaphore.release()
}
}
override fun onDescriptorWriteRequest(
device: BluetoothDevice?,
requestId: Int,
descriptor: BluetoothGattDescriptor?,
preparedWrite: Boolean,
responseNeeded: Boolean,
offset: Int,
value: ByteArray?
) {
if (descriptor?.uuid == CCCD_UUID && descriptor?.characteristic?.uuid == BENCH_RX_UUID) {
if (offset == 0 && value?.size == 2) {
if (value[0].and(1).toInt() != 0) {
// Subscription
Log.fine("peer subscribed to RX")
peerDevice = device
ready.countDown()
}
}
}
if (responseNeeded) {
gattServer.sendResponse(device, requestId, BluetoothGatt.GATT_SUCCESS, offset, value)
}
}
@SuppressLint("MissingPermission")
override fun sendPacket(packet: Packet) {
if (peerDevice == null) {
Log.warning("no peer device, cannot send")
return
}
if (rxCharacteristic == null) {
Log.warning("no RX characteristic, cannot send")
return
}
// Wait until we can notify
notifySemaphore.acquire()
// Send the packet via a notification
val result = gattServer.notifyCharacteristicChanged(
peerDevice!!,
rxCharacteristic,
false,
packet.toBytes()
)
if (result != BluetoothStatusCodes.SUCCESS) {
Log.warning("notifyCharacteristicChanged failed: $result")
notifySemaphore.release()
}
}
override fun run() {
viewModel.running = true
// Start advertising
Log.fine("starting advertiser")
val advertiser = Advertiser(bluetoothAdapter)
advertiser.start()
clientThread = thread(name = "GattServer") {
// Wait for a subscriber
Log.info("waiting for RX subscriber")
viewModel.aborter = {
ready.countDown()
}
ready.await()
if (peerDevice == null) {
Log.warning("server interrupted")
viewModel.running = false
gattServer.close()
return@thread
}
Log.info("RX subscriber accepted")
// Stop advertising
Log.info("stopping advertiser")
advertiser.stop()
sinkQueue = LinkedBlockingQueue()
val sinkWriterThread = thread(name = "SinkWriter") {
while (true) {
try {
val packet = sinkQueue!!.take()
if (packetSink == null) {
Log.warning("no sink, dropping packet")
continue
}
packetSink!!.onPacket(packet)
} catch (error: InterruptedException) {
Log.warning("sink writer interrupted")
break
}
}
}
val ioClient = createIoClient(this)
try {
ioClient.run()
viewModel.status = "OK"
} catch (error: IOException) {
Log.info("run ended abruptly")
viewModel.status = "ABORTED"
viewModel.lastError = "IO_ERROR"
} finally {
sinkWriterThread.interrupt()
sinkWriterThread.join()
gattServer.close()
viewModel.running = false
}
}
}
override fun waitForCompletion() {
clientThread?.join()
Log.info("server thread completed")
}
}

View File

@@ -16,89 +16,25 @@ package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import android.bluetooth.BluetoothDevice
import android.bluetooth.BluetoothGatt
import android.bluetooth.BluetoothGattCallback
import android.bluetooth.BluetoothProfile
import android.content.Context
import android.os.Build
import java.util.logging.Logger
private val Log = Logger.getLogger("btbench.l2cap-client")
class L2capClient(
private val viewModel: AppViewModel,
private val bluetoothAdapter: BluetoothAdapter,
private val context: Context,
bluetoothAdapter: BluetoothAdapter,
context: Context,
private val createIoClient: (packetIo: PacketIO) -> IoClient
) : Mode {
private var connection: Connection = Connection(viewModel, bluetoothAdapter, context)
private var socketClient: SocketClient? = null
@SuppressLint("MissingPermission")
override fun run() {
viewModel.running = true
val addressIsPublic = viewModel.peerBluetoothAddress.endsWith("/P")
val address = viewModel.peerBluetoothAddress.take(17)
val remoteDevice = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) {
bluetoothAdapter.getRemoteLeDevice(
address,
if (addressIsPublic) {
BluetoothDevice.ADDRESS_TYPE_PUBLIC
} else {
BluetoothDevice.ADDRESS_TYPE_RANDOM
}
)
} else {
bluetoothAdapter.getRemoteDevice(address)
}
val gatt = remoteDevice.connectGatt(
context,
false,
object : BluetoothGattCallback() {
override fun onMtuChanged(gatt: BluetoothGatt, mtu: Int, status: Int) {
Log.info("MTU update: mtu=$mtu status=$status")
viewModel.mtu = mtu
}
override fun onPhyUpdate(gatt: BluetoothGatt, txPhy: Int, rxPhy: Int, status: Int) {
Log.info("PHY update: tx=$txPhy, rx=$rxPhy, status=$status")
viewModel.txPhy = txPhy
viewModel.rxPhy = rxPhy
}
override fun onPhyRead(gatt: BluetoothGatt, txPhy: Int, rxPhy: Int, status: Int) {
Log.info("PHY: tx=$txPhy, rx=$rxPhy, status=$status")
viewModel.txPhy = txPhy
viewModel.rxPhy = rxPhy
}
override fun onConnectionStateChange(
gatt: BluetoothGatt?, status: Int, newState: Int
) {
if (gatt != null && newState == BluetoothProfile.STATE_CONNECTED) {
if (viewModel.use2mPhy) {
Log.info("requesting 2M PHY")
gatt.setPreferredPhy(
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_OPTION_NO_PREFERRED
)
}
gatt.readPhy()
// Request an MTU update, even though we don't use GATT, because Android
// won't request a larger link layer maximum data length otherwise.
gatt.requestMtu(517)
}
}
},
BluetoothDevice.TRANSPORT_LE,
if (viewModel.use2mPhy) BluetoothDevice.PHY_LE_2M_MASK else BluetoothDevice.PHY_LE_1M_MASK
)
val socket = remoteDevice.createInsecureL2capChannel(viewModel.l2capPsm)
connection.connect()
val socket = connection.remoteDevice!!.createInsecureL2capChannel(viewModel.l2capPsm)
socketClient = SocketClient(viewModel, socket, createIoClient)
socketClient!!.run()
}

View File

@@ -37,34 +37,15 @@ class L2capServer(
@SuppressLint("MissingPermission")
override fun run() {
// Advertise so that the peer can find us and connect.
val callback = object : AdvertiseCallback() {
override fun onStartFailure(errorCode: Int) {
Log.warning("failed to start advertising: $errorCode")
}
override fun onStartSuccess(settingsInEffect: AdvertiseSettings) {
Log.info("advertising started: $settingsInEffect")
}
}
val advertiseSettingsBuilder = AdvertiseSettings.Builder()
.setAdvertiseMode(ADVERTISE_MODE_LOW_LATENCY)
.setConnectable(true)
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.UPSIDE_DOWN_CAKE) {
advertiseSettingsBuilder.setDiscoverable(true)
}
val advertiseSettings = advertiseSettingsBuilder.build()
val advertiseData = AdvertiseData.Builder().build()
val scanData = AdvertiseData.Builder().setIncludeDeviceName(true).build()
val advertiser = bluetoothAdapter.bluetoothLeAdvertiser
val advertiser = Advertiser(bluetoothAdapter)
val serverSocket = bluetoothAdapter.listenUsingInsecureL2capChannel()
viewModel.l2capPsm = serverSocket.psm
Log.info("psm = $serverSocket.psm")
socketServer = SocketServer(viewModel, serverSocket, createIoClient)
socketServer!!.run(
{ advertiser.stopAdvertising(callback) },
{ advertiser.startAdvertising(advertiseSettings, advertiseData, scanData, callback) }
{ advertiser.stop() },
{ advertiser.start() }
)
}

View File

@@ -17,9 +17,12 @@ package com.github.google.bumble.btbench
import android.Manifest
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import android.bluetooth.BluetoothDevice
import android.bluetooth.BluetoothManager
import android.content.BroadcastReceiver
import android.content.Context
import android.content.Intent
import android.content.IntentFilter
import android.content.pm.PackageManager
import android.os.Build
import android.os.Bundle
@@ -66,6 +69,7 @@ import androidx.compose.ui.unit.dp
import androidx.compose.ui.unit.sp
import androidx.core.content.ContextCompat
import com.github.google.bumble.btbench.ui.theme.BTBenchTheme
import java.io.IOException
import java.util.logging.Logger
private val Log = Logger.getLogger("bumble.main-activity")
@@ -76,6 +80,7 @@ const val SENDER_PACKET_SIZE_PREF_KEY = "sender_packet_size"
const val SENDER_PACKET_INTERVAL_PREF_KEY = "sender_packet_interval"
const val SCENARIO_PREF_KEY = "scenario"
const val MODE_PREF_KEY = "mode"
const val CONNECTION_PRIORITY_PREF_KEY = "connection_priority"
class MainActivity : ComponentActivity() {
private val appViewModel = AppViewModel()
@@ -84,6 +89,47 @@ class MainActivity : ComponentActivity() {
super.onCreate(savedInstanceState)
appViewModel.loadPreferences(getPreferences(Context.MODE_PRIVATE))
checkPermissions()
registerReceivers()
}
private fun registerReceivers() {
val pairingRequestIntentFilter = IntentFilter(BluetoothDevice.ACTION_PAIRING_REQUEST)
registerReceiver(object: BroadcastReceiver() {
@SuppressLint("MissingPermission")
override fun onReceive(context: Context, intent: Intent) {
Log.info("ACTION_PAIRING_REQUEST")
val extras = intent.extras
if (extras != null) {
for (key in extras.keySet()) {
Log.info("$key: ${extras.get(key)}")
}
}
val device: BluetoothDevice? = intent.getParcelableExtra(BluetoothDevice.EXTRA_DEVICE)
if (device != null) {
if (checkSelfPermission(Manifest.permission.BLUETOOTH_PRIVILEGED) == PackageManager.PERMISSION_GRANTED) {
Log.info("confirming pairing")
device.setPairingConfirmation(true)
} else {
Log.info("we don't have BLUETOOTH_PRIVILEGED, not confirming")
}
}
}
}, pairingRequestIntentFilter)
val bondStateChangedIntentFilter = IntentFilter(BluetoothDevice.ACTION_BOND_STATE_CHANGED)
registerReceiver(object: BroadcastReceiver() {
@SuppressLint("MissingPermission")
override fun onReceive(context: Context, intent: Intent) {
Log.info("ACTION_BOND_STATE_CHANGED")
val extras = intent.extras
if (extras != null) {
for (key in extras.keySet()) {
Log.info("$key: ${extras.get(key)}")
}
}
}
}, bondStateChangedIntentFilter)
}
private fun checkPermissions() {
@@ -144,9 +190,7 @@ class MainActivity : ComponentActivity() {
initBluetooth()
setContent {
MainView(
appViewModel,
::becomeDiscoverable,
::runScenario
appViewModel, ::becomeDiscoverable, ::runScenario
)
}
@@ -182,6 +226,8 @@ class MainActivity : ComponentActivity() {
"rfcomm-server" -> appViewModel.mode = RFCOMM_SERVER_MODE
"l2cap-client" -> appViewModel.mode = L2CAP_CLIENT_MODE
"l2cap-server" -> appViewModel.mode = L2CAP_SERVER_MODE
"gatt-client" -> appViewModel.mode = GATT_CLIENT_MODE
"gatt-server" -> appViewModel.mode = GATT_SERVER_MODE
}
}
intent.getStringExtra("autostart")?.let {
@@ -195,19 +241,24 @@ class MainActivity : ComponentActivity() {
private fun runScenario() {
if (bluetoothAdapter == null) {
return
throw IOException("bluetooth not enabled")
}
val runner = when (appViewModel.mode) {
RFCOMM_CLIENT_MODE -> RfcommClient(appViewModel, bluetoothAdapter!!, ::createIoClient)
RFCOMM_SERVER_MODE -> RfcommServer(appViewModel, bluetoothAdapter!!, ::createIoClient)
L2CAP_CLIENT_MODE -> L2capClient(
appViewModel,
bluetoothAdapter!!,
baseContext,
::createIoClient
appViewModel, bluetoothAdapter!!, baseContext, ::createIoClient
)
L2CAP_SERVER_MODE -> L2capServer(appViewModel, bluetoothAdapter!!, ::createIoClient)
GATT_CLIENT_MODE -> GattClient(
appViewModel, bluetoothAdapter!!, baseContext, ::createIoClient
)
GATT_SERVER_MODE -> GattServer(
appViewModel, bluetoothAdapter!!, baseContext, ::createIoClient
)
else -> throw IllegalStateException()
}
runner.run()
@@ -281,7 +332,7 @@ fun MainView(
keyboardController?.hide()
focusManager.clearFocus()
}),
enabled = (appViewModel.mode == RFCOMM_CLIENT_MODE) or (appViewModel.mode == L2CAP_CLIENT_MODE)
enabled = (appViewModel.mode == RFCOMM_CLIENT_MODE || appViewModel.mode == L2CAP_CLIENT_MODE || appViewModel.mode == GATT_CLIENT_MODE)
)
Divider()
TextField(
@@ -349,24 +400,45 @@ fun MainView(
keyboardController?.hide()
focusManager.clearFocus()
}),
enabled = (appViewModel.scenario == PING_SCENARIO)
enabled = (appViewModel.scenario == PING_SCENARIO || appViewModel.scenario == SEND_SCENARIO)
)
Divider()
ActionButton(
text = "Become Discoverable", onClick = becomeDiscoverable, true
)
Row(
horizontalArrangement = Arrangement.SpaceBetween,
verticalAlignment = Alignment.CenterVertically
) {
Text(text = "2M PHY")
Spacer(modifier = Modifier.padding(start = 8.dp))
Switch(
enabled = (appViewModel.mode == L2CAP_CLIENT_MODE || appViewModel.mode == L2CAP_SERVER_MODE),
Switch(enabled = (appViewModel.mode == L2CAP_CLIENT_MODE || appViewModel.mode == L2CAP_SERVER_MODE || appViewModel.mode == GATT_CLIENT_MODE || appViewModel.mode == GATT_SERVER_MODE),
checked = appViewModel.use2mPhy,
onCheckedChange = { appViewModel.use2mPhy = it }
)
onCheckedChange = { appViewModel.use2mPhy = it })
Column(Modifier.selectableGroup()) {
listOf(
"BALANCED", "LOW", "HIGH", "DCK"
).forEach { text ->
Row(
Modifier
.selectable(
selected = (text == appViewModel.connectionPriority),
onClick = { appViewModel.updateConnectionPriority(text) },
role = Role.RadioButton,
)
.padding(horizontal = 16.dp),
verticalAlignment = Alignment.CenterVertically
) {
RadioButton(
selected = (text == appViewModel.connectionPriority),
onClick = null,
enabled = (appViewModel.mode == L2CAP_CLIENT_MODE || appViewModel.mode == L2CAP_SERVER_MODE || appViewModel.mode == GATT_CLIENT_MODE || appViewModel.mode == GATT_SERVER_MODE)
)
Text(
text = text,
style = MaterialTheme.typography.bodyLarge,
modifier = Modifier.padding(start = 16.dp)
)
}
}
}
}
Row {
Column(Modifier.selectableGroup()) {
@@ -374,7 +446,9 @@ fun MainView(
RFCOMM_CLIENT_MODE,
RFCOMM_SERVER_MODE,
L2CAP_CLIENT_MODE,
L2CAP_SERVER_MODE
L2CAP_SERVER_MODE,
GATT_CLIENT_MODE,
GATT_SERVER_MODE
).forEach { text ->
Row(
Modifier
@@ -387,8 +461,7 @@ fun MainView(
verticalAlignment = Alignment.CenterVertically
) {
RadioButton(
selected = (text == appViewModel.mode),
onClick = null
selected = (text == appViewModel.mode), onClick = null
)
Text(
text = text,
@@ -400,10 +473,7 @@ fun MainView(
}
Column(Modifier.selectableGroup()) {
listOf(
SEND_SCENARIO,
RECEIVE_SCENARIO,
PING_SCENARIO,
PONG_SCENARIO
SEND_SCENARIO, RECEIVE_SCENARIO, PING_SCENARIO, PONG_SCENARIO
).forEach { text ->
Row(
Modifier
@@ -416,8 +486,7 @@ fun MainView(
verticalAlignment = Alignment.CenterVertically
) {
RadioButton(
selected = (text == appViewModel.scenario),
onClick = null
selected = (text == appViewModel.scenario), onClick = null
)
Text(
text = text,
@@ -435,20 +504,29 @@ fun MainView(
ActionButton(
text = "Stop", onClick = appViewModel::abort, enabled = appViewModel.running
)
ActionButton(
text = "Become Discoverable", onClick = becomeDiscoverable, true
)
}
Divider()
Text(
text = if (appViewModel.mtu != 0) "MTU: ${appViewModel.mtu}" else ""
)
Text(
text = if (appViewModel.rxPhy != 0 || appViewModel.txPhy != 0) "PHY: tx=${appViewModel.txPhy}, rx=${appViewModel.rxPhy}" else ""
)
if (appViewModel.mtu != 0) {
Text(
text = "MTU: ${appViewModel.mtu}"
)
}
if (appViewModel.rxPhy != 0) {
Text(
text = "PHY: tx=${appViewModel.txPhy}, rx=${appViewModel.rxPhy}"
)
}
Text(
text = "Status: ${appViewModel.status}"
)
Text(
text = "Last Error: ${appViewModel.lastError}"
)
if (appViewModel.lastError.isNotEmpty()) {
Text(
text = "Last Error: ${appViewModel.lastError}"
)
}
Text(
text = "Packets Sent: ${appViewModel.packetsSent}"
)

View File

@@ -25,6 +25,7 @@ import java.util.UUID
val DEFAULT_RFCOMM_UUID: UUID = UUID.fromString("E6D55659-C8B4-4B85-96BB-B1143AF6D3AE")
const val DEFAULT_PEER_BLUETOOTH_ADDRESS = "AA:BB:CC:DD:EE:FF"
const val DEFAULT_STARTUP_DELAY = 3000
const val DEFAULT_SENDER_PACKET_COUNT = 100
const val DEFAULT_SENDER_PACKET_SIZE = 1024
const val DEFAULT_SENDER_PACKET_INTERVAL = 100
@@ -34,6 +35,8 @@ const val L2CAP_CLIENT_MODE = "L2CAP Client"
const val L2CAP_SERVER_MODE = "L2CAP Server"
const val RFCOMM_CLIENT_MODE = "RFCOMM Client"
const val RFCOMM_SERVER_MODE = "RFCOMM Server"
const val GATT_CLIENT_MODE = "GATT Client"
const val GATT_SERVER_MODE = "GATT Server"
const val SEND_SCENARIO = "Send"
const val RECEIVE_SCENARIO = "Receive"
@@ -47,8 +50,10 @@ class AppViewModel : ViewModel() {
var mode by mutableStateOf(RFCOMM_SERVER_MODE)
var scenario by mutableStateOf(RECEIVE_SCENARIO)
var peerBluetoothAddress by mutableStateOf(DEFAULT_PEER_BLUETOOTH_ADDRESS)
var startupDelay by mutableIntStateOf(DEFAULT_STARTUP_DELAY)
var l2capPsm by mutableIntStateOf(DEFAULT_PSM)
var use2mPhy by mutableStateOf(true)
var connectionPriority by mutableStateOf("BALANCED")
var mtu by mutableIntStateOf(0)
var rxPhy by mutableIntStateOf(0)
var txPhy by mutableIntStateOf(0)
@@ -98,6 +103,11 @@ class AppViewModel : ViewModel() {
if (savedScenario != null) {
scenario = savedScenario
}
val savedConnectionPriority = preferences.getString(CONNECTION_PRIORITY_PREF_KEY, null)
if (savedConnectionPriority != null) {
connectionPriority = savedConnectionPriority
}
}
fun updatePeerBluetoothAddress(peerBluetoothAddress: String) {
@@ -220,6 +230,14 @@ class AppViewModel : ViewModel() {
}
}
fun updateConnectionPriority(connectionPriority: String) {
this.connectionPriority = connectionPriority
with(preferences!!.edit()) {
putString(CONNECTION_PRIORITY_PREF_KEY, connectionPriority)
apply()
}
}
fun clear() {
status = ""
lastError = ""

View File

@@ -17,6 +17,7 @@ package com.github.google.bumble.btbench
import android.bluetooth.BluetoothSocket
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.logging.Logger
import kotlin.math.min
@@ -37,11 +38,16 @@ abstract class Packet(val type: Int, val payload: ByteArray = ByteArray(0)) {
RESET -> ResetPacket()
SEQUENCE -> SequencePacket(
data[1].toInt(),
ByteBuffer.wrap(data, 2, 4).getInt(),
data.sliceArray(6..<data.size)
ByteBuffer.wrap(data, 2, 4).order(ByteOrder.LITTLE_ENDIAN).getInt(),
ByteBuffer.wrap(data, 6, 4).order(ByteOrder.LITTLE_ENDIAN).getInt(),
data.sliceArray(10..<data.size)
)
ACK -> AckPacket(
data[1].toInt(),
ByteBuffer.wrap(data, 2, 4).order(ByteOrder.LITTLE_ENDIAN).getInt()
)
ACK -> AckPacket(data[1].toInt(), ByteBuffer.wrap(data, 2, 4).getInt())
else -> GenericPacket(data[0].toInt(), data.sliceArray(1..<data.size))
}
}
@@ -57,16 +63,24 @@ class ResetPacket : Packet(RESET)
class AckPacket(val flags: Int, val sequenceNumber: Int) : Packet(ACK) {
override fun toBytes(): ByteArray {
return ByteBuffer.allocate(1 + 1 + 4).put(type.toByte()).put(flags.toByte())
return ByteBuffer.allocate(6).order(
ByteOrder.LITTLE_ENDIAN
).put(type.toByte()).put(flags.toByte())
.putInt(sequenceNumber).array()
}
}
class SequencePacket(val flags: Int, val sequenceNumber: Int, payload: ByteArray) :
class SequencePacket(
val flags: Int,
val sequenceNumber: Int,
val timestamp: Int,
payload: ByteArray
) :
Packet(SEQUENCE, payload) {
override fun toBytes(): ByteArray {
return ByteBuffer.allocate(1 + 1 + 4 + payload.size).put(type.toByte()).put(flags.toByte())
.putInt(sequenceNumber).put(payload).array()
return ByteBuffer.allocate(10 + payload.size).order(ByteOrder.LITTLE_ENDIAN)
.put(type.toByte()).put(flags.toByte())
.putInt(sequenceNumber).putInt(timestamp).put(payload).array()
}
}

View File

@@ -19,8 +19,6 @@ import java.util.logging.Logger
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.TimeSource
private const val DEFAULT_STARTUP_DELAY = 3000
private val Log = Logger.getLogger("btbench.pinger")
class Pinger(private val viewModel: AppViewModel, private val packetIO: PacketIO) : IoClient,
@@ -36,8 +34,8 @@ class Pinger(private val viewModel: AppViewModel, private val packetIO: PacketIO
override fun run() {
viewModel.clear()
Log.info("startup delay: $DEFAULT_STARTUP_DELAY")
Thread.sleep(DEFAULT_STARTUP_DELAY.toLong());
Log.info("startup delay: ${viewModel.startupDelay}")
Thread.sleep(viewModel.startupDelay.toLong());
Log.info("running")
Log.info("sending reset")
@@ -48,19 +46,23 @@ class Pinger(private val viewModel: AppViewModel, private val packetIO: PacketIO
val startTime = TimeSource.Monotonic.markNow()
for (i in 0..<packetCount) {
val now = TimeSource.Monotonic.markNow()
val targetTime = startTime + (i * viewModel.senderPacketInterval).milliseconds
val delay = targetTime - now
if (delay.isPositive()) {
Log.info("sleeping ${delay.inWholeMilliseconds} ms")
Thread.sleep(delay.inWholeMilliseconds)
var now = TimeSource.Monotonic.markNow()
if (viewModel.senderPacketInterval > 0) {
val targetTime = startTime + (i * viewModel.senderPacketInterval).milliseconds
val delay = targetTime - now
if (delay.isPositive()) {
Log.info("sleeping ${delay.inWholeMilliseconds} ms")
Thread.sleep(delay.inWholeMilliseconds)
now = TimeSource.Monotonic.markNow()
}
}
pingTimes.add(TimeSource.Monotonic.markNow())
packetIO.sendPacket(
SequencePacket(
if (i < packetCount - 1) 0 else Packet.LAST_FLAG,
i,
ByteArray(packetSize - 6)
(now - startTime).inWholeMicroseconds.toInt(),
ByteArray(packetSize - 10)
)
)
viewModel.packetsSent = i + 1

View File

@@ -14,6 +14,7 @@
package com.github.google.bumble.btbench
import java.util.concurrent.CountDownLatch
import java.util.logging.Logger
import kotlin.time.TimeSource
@@ -23,6 +24,7 @@ class Ponger(private val viewModel: AppViewModel, private val packetIO: PacketIO
private var startTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var lastPacketTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var expectedSequenceNumber: Int = 0
private val done = CountDownLatch(1)
init {
packetIO.packetSink = this
@@ -30,6 +32,7 @@ class Ponger(private val viewModel: AppViewModel, private val packetIO: PacketIO
override fun run() {
viewModel.clear()
done.await()
}
override fun abort() {}
@@ -58,5 +61,10 @@ class Ponger(private val viewModel: AppViewModel, private val packetIO: PacketIO
packetIO.sendPacket(AckPacket(packet.flags, packet.sequenceNumber))
viewModel.packetsSent += 1
if (packet.flags and Packet.LAST_FLAG != 0) {
Log.info("received last packet")
done.countDown()
}
}
}

View File

@@ -14,6 +14,7 @@
package com.github.google.bumble.btbench
import java.util.concurrent.CountDownLatch
import java.util.logging.Logger
import kotlin.time.DurationUnit
import kotlin.time.TimeSource
@@ -24,6 +25,7 @@ class Receiver(private val viewModel: AppViewModel, private val packetIO: Packet
private var startTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var lastPacketTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var bytesReceived = 0
private val done = CountDownLatch(1)
init {
packetIO.packetSink = this
@@ -31,6 +33,7 @@ class Receiver(private val viewModel: AppViewModel, private val packetIO: Packet
override fun run() {
viewModel.clear()
done.await()
}
override fun abort() {}
@@ -62,6 +65,7 @@ class Receiver(private val viewModel: AppViewModel, private val packetIO: Packet
Log.info("throughput: $throughput")
viewModel.throughput = throughput
packetIO.sendPacket(AckPacket(packet.flags, packet.sequenceNumber))
done.countDown()
}
}
}

View File

@@ -16,11 +16,10 @@ package com.github.google.bumble.btbench
import java.util.concurrent.Semaphore
import java.util.logging.Logger
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.DurationUnit
import kotlin.time.TimeSource
private const val DEFAULT_STARTUP_DELAY = 3000
private val Log = Logger.getLogger("btbench.sender")
class Sender(private val viewModel: AppViewModel, private val packetIO: PacketIO) : IoClient,
@@ -36,8 +35,8 @@ class Sender(private val viewModel: AppViewModel, private val packetIO: PacketIO
override fun run() {
viewModel.clear()
Log.info("startup delay: $DEFAULT_STARTUP_DELAY")
Thread.sleep(DEFAULT_STARTUP_DELAY.toLong());
Log.info("startup delay: ${viewModel.startupDelay}")
Thread.sleep(viewModel.startupDelay.toLong());
Log.info("running")
Log.info("sending reset")
@@ -47,20 +46,32 @@ class Sender(private val viewModel: AppViewModel, private val packetIO: PacketIO
val packetCount = viewModel.senderPacketCount
val packetSize = viewModel.senderPacketSize
for (i in 0..<packetCount - 1) {
packetIO.sendPacket(SequencePacket(0, i, ByteArray(packetSize - 6)))
for (i in 0..<packetCount) {
var now = TimeSource.Monotonic.markNow()
if (viewModel.senderPacketInterval > 0) {
val targetTime = startTime + (i * viewModel.senderPacketInterval).milliseconds
val delay = targetTime - now
if (delay.isPositive()) {
Log.info("sleeping ${delay.inWholeMilliseconds} ms")
Thread.sleep(delay.inWholeMilliseconds)
}
now = TimeSource.Monotonic.markNow()
}
val flags = when (i) {
packetCount - 1 -> Packet.LAST_FLAG
else -> 0
}
packetIO.sendPacket(
SequencePacket(
flags,
i,
(now - startTime).inWholeMicroseconds.toInt(),
ByteArray(packetSize - 10)
)
)
bytesSent += packetSize
viewModel.packetsSent = i + 1
}
packetIO.sendPacket(
SequencePacket(
Packet.LAST_FLAG,
packetCount - 1,
ByteArray(packetSize - 6)
)
)
bytesSent += packetSize
viewModel.packetsSent = packetCount
// Wait for the ACK
Log.info("waiting for ACK")

View File

@@ -153,7 +153,7 @@ disable = [
]
[tool.pylint.main]
ignore = "pandora" # FIXME: pylint does not support stubs yet:
ignore=["pandora", "mobly"] # FIXME: pylint does not support stubs yet
[tool.pylint.typecheck]
signature-mutators = "AsyncRunner.run_in_task"

View File

@@ -34,7 +34,7 @@ from bumble.device import (
Device,
PeriodicAdvertisingParameters,
)
from bumble.host import AclPacketQueue, Host
from bumble.host import DataPacketQueue, Host
from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
HCI_COMMAND_STATUS_PENDING,
@@ -50,12 +50,7 @@ from bumble.hci import (
HCI_Error,
HCI_Packet,
)
from bumble.gatt import (
GATT_GENERIC_ACCESS_SERVICE,
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
GATT_DEVICE_NAME_CHARACTERISTIC,
GATT_APPEARANCE_CHARACTERISTIC,
)
from bumble import gatt
from .test_utils import TwoDevices, async_barrier
@@ -90,9 +85,9 @@ async def test_device_connect_parallel():
def _send(packet):
pass
d0.host.acl_packet_queue = AclPacketQueue(0, 0, _send)
d1.host.acl_packet_queue = AclPacketQueue(0, 0, _send)
d2.host.acl_packet_queue = AclPacketQueue(0, 0, _send)
d0.host.acl_packet_queue = DataPacketQueue(0, 0, _send)
d1.host.acl_packet_queue = DataPacketQueue(0, 0, _send)
d2.host.acl_packet_queue = DataPacketQueue(0, 0, _send)
# enable classic
d0.classic_enabled = True
@@ -592,32 +587,54 @@ async def test_power_on_default_static_address_should_not_be_any():
# -----------------------------------------------------------------------------
def test_gatt_services_with_gas():
def test_gatt_services_with_gas_and_gatt():
device = Device(host=Host(None, None))
# there should be one service and two chars, therefore 5 attributes
assert len(device.gatt_server.attributes) == 5
assert device.gatt_server.attributes[0].uuid == GATT_GENERIC_ACCESS_SERVICE
assert device.gatt_server.attributes[1].type == GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
assert device.gatt_server.attributes[2].uuid == GATT_DEVICE_NAME_CHARACTERISTIC
assert device.gatt_server.attributes[3].type == GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
assert device.gatt_server.attributes[4].uuid == GATT_APPEARANCE_CHARACTERISTIC
# there should be 2 service, 5 chars, and 1 descriptors, therefore 13 attributes
assert len(device.gatt_server.attributes) == 13
assert device.gatt_server.attributes[0].uuid == gatt.GATT_GENERIC_ACCESS_SERVICE
assert (
device.gatt_server.attributes[1].type == gatt.GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
)
assert device.gatt_server.attributes[2].uuid == gatt.GATT_DEVICE_NAME_CHARACTERISTIC
assert (
device.gatt_server.attributes[3].type == gatt.GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
)
assert device.gatt_server.attributes[4].uuid == gatt.GATT_APPEARANCE_CHARACTERISTIC
# -----------------------------------------------------------------------------
def test_gatt_services_without_gas():
device = Device(host=Host(None, None), generic_access_service=False)
# there should be no services
assert len(device.gatt_server.attributes) == 0
assert device.gatt_server.attributes[5].uuid == gatt.GATT_GENERIC_ATTRIBUTE_SERVICE
assert (
device.gatt_server.attributes[6].type == gatt.GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
)
assert (
device.gatt_server.attributes[7].uuid
== gatt.GATT_SERVICE_CHANGED_CHARACTERISTIC
)
assert (
device.gatt_server.attributes[8].type
== gatt.GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR
)
assert (
device.gatt_server.attributes[9].type == gatt.GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
)
assert (
device.gatt_server.attributes[10].uuid
== gatt.GATT_CLIENT_SUPPORTED_FEATURES_CHARACTERISTIC
)
assert (
device.gatt_server.attributes[11].type
== gatt.GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
)
assert (
device.gatt_server.attributes[12].uuid == gatt.GATT_DATABASE_HASH_CHARACTERISTIC
)
# -----------------------------------------------------------------------------
async def run_test_device():
await test_device_connect_parallel()
await test_flush()
await test_gatt_services_with_gas()
await test_gatt_services_without_gas()
await test_gatt_services_with_gas_and_gatt()
# -----------------------------------------------------------------------------

140
tests/gatt_service_test.py Normal file
View File

@@ -0,0 +1,140 @@
# Copyright 2021-2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
from . import test_utils
from bumble import device
from bumble import gatt
from bumble.profiles import gatt_service
# -----------------------------------------------------------------------------
async def test_database_hash():
devices = await test_utils.TwoDevices.create_with_connection()
devices[0].gatt_server.services.clear()
devices[0].gatt_server.attributes.clear()
devices[0].gatt_server.attributes_by_handle.clear()
devices[0].add_service(
gatt.Service(
gatt.GATT_GENERIC_ACCESS_SERVICE,
characteristics=[
gatt.Characteristic(
gatt.GATT_DEVICE_NAME_CHARACTERISTIC,
(
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
),
gatt.Characteristic.Permissions.READ_REQUIRES_AUTHENTICATION,
),
gatt.Characteristic(
gatt.GATT_APPEARANCE_CHARACTERISTIC,
gatt.Characteristic.Properties.READ,
gatt.Characteristic.Permissions.READ_REQUIRES_AUTHENTICATION,
),
],
)
)
devices[0].add_service(
gatt_service.GenericAttributeProfileService(
server_supported_features=None,
database_hash_enabled=True,
service_change_enabled=True,
)
)
devices[0].gatt_server.add_attribute(
gatt.Service(gatt.GATT_GLUCOSE_SERVICE, characteristics=[])
)
# There is a special attribute order in the spec, so we need to add attribute one by
# one here.
battery_service = gatt.Service(
gatt.GATT_BATTERY_SERVICE,
characteristics=[
gatt.Characteristic(
gatt.GATT_BATTERY_LEVEL_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_AUTHENTICATION,
)
],
primary=False,
)
battery_service.handle = 0x0014
battery_service.end_group_handle = 0x0016
devices[0].gatt_server.add_attribute(
gatt.IncludedServiceDeclaration(battery_service)
)
c = gatt.Characteristic(
'2A18',
properties=(
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.INDICATE
| gatt.Characteristic.Properties.EXTENDED_PROPERTIES
),
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_AUTHENTICATION,
)
devices[0].gatt_server.add_attribute(
gatt.CharacteristicDeclaration(c, devices[0].gatt_server.next_handle() + 1)
)
devices[0].gatt_server.add_attribute(c)
devices[0].gatt_server.add_attribute(
gatt.Descriptor(
gatt.GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
gatt.Descriptor.Permissions.READ_REQUIRES_AUTHENTICATION,
b'\x02\x00',
),
)
devices[0].gatt_server.add_attribute(
gatt.Descriptor(
gatt.GATT_CHARACTERISTIC_EXTENDED_PROPERTIES_DESCRIPTOR,
gatt.Descriptor.Permissions.READ_REQUIRES_AUTHENTICATION,
b'\x00\x00',
),
)
devices[0].add_service(battery_service)
peer = device.Peer(devices.connections[1])
client = await peer.discover_service_and_create_proxy(
gatt_service.GenericAttributeProfileServiceProxy
)
assert client.database_hash_characteristic
assert await client.database_hash_characteristic.read_value() == bytes.fromhex(
'F1CA2D48ECF58BAC8A8830BBB9FBA990'
)
# -----------------------------------------------------------------------------
async def test_service_changed():
devices = await test_utils.TwoDevices.create_with_connection()
assert (service := devices[0].gatt_service)
peer = device.Peer(devices.connections[1])
assert (
client := await peer.discover_service_and_create_proxy(
gatt_service.GenericAttributeProfileServiceProxy
)
)
assert client.service_changed_characteristic
indications = []
await client.service_changed_characteristic.subscribe(
indications.append, prefer_notify=False
)
await devices[0].indicate_subscribers(
service.service_changed_characteristic, b'1234'
)
await test_utils.async_barrier()
assert indications[0] == b'1234'

View File

@@ -957,11 +957,12 @@ async def test_discover_all():
peer = Peer(connection)
await peer.discover_all()
assert len(peer.gatt_client.services) == 3
# service 1800 gets added automatically
assert len(peer.gatt_client.services) == 4
# service 1800 and 1801 get added automatically
assert peer.gatt_client.services[0].uuid == UUID('1800')
assert peer.gatt_client.services[1].uuid == service1.uuid
assert peer.gatt_client.services[2].uuid == service2.uuid
assert peer.gatt_client.services[1].uuid == UUID('1801')
assert peer.gatt_client.services[2].uuid == service1.uuid
assert peer.gatt_client.services[3].uuid == service2.uuid
s = peer.get_services_by_uuid(service1.uuid)
assert len(s) == 1
assert len(s[0].characteristics) == 2
@@ -1084,10 +1085,18 @@ CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), READ)
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), READ)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), READ)
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
Service(handle=0x0006, end=0x000D, uuid=UUID-16:1801 (Generic Attribute))
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=UUID-16:2A05 (Service Changed), INDICATE)
Characteristic(handle=0x0008, end=0x0009, uuid=UUID-16:2A05 (Service Changed), INDICATE)
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)
CharacteristicDeclaration(handle=0x000A, value_handle=0x000B, uuid=UUID-16:2B29 (Client Supported Features), READ|WRITE)
Characteristic(handle=0x000B, end=0x000B, uuid=UUID-16:2B29 (Client Supported Features), READ|WRITE)
CharacteristicDeclaration(handle=0x000C, value_handle=0x000D, uuid=UUID-16:2B2A (Database Hash), READ)
Characteristic(handle=0x000D, end=0x000D, uuid=UUID-16:2B2A (Database Hash), READ)
Service(handle=0x000E, end=0x0011, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
CharacteristicDeclaration(handle=0x000F, value_handle=0x0010, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Characteristic(handle=0x0010, end=0x0011, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Descriptor(handle=0x0011, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
)

View File

@@ -15,6 +15,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct
from bumble.hci import (
HCI_DISCONNECT_COMMAND,
@@ -22,6 +23,7 @@ from bumble.hci import (
HCI_LE_CODED_PHY_BIT,
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_RESET_COMMAND,
HCI_VENDOR_EVENT,
HCI_SUCCESS,
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
@@ -67,6 +69,7 @@ from bumble.hci import (
HCI_Read_Local_Version_Information_Command,
HCI_Reset_Command,
HCI_Set_Event_Mask_Command,
HCI_Vendor_Event,
)
@@ -167,8 +170,8 @@ def test_HCI_Command_Complete_Event():
command_opcode=HCI_LE_READ_BUFFER_SIZE_COMMAND,
return_parameters=HCI_LE_Read_Buffer_Size_Command.create_return_parameters(
status=0,
hc_le_acl_data_packet_length=1234,
hc_total_num_le_acl_data_packets=56,
le_acl_data_packet_length=1234,
total_num_le_acl_data_packets=56,
),
)
basic_check(event)
@@ -213,6 +216,41 @@ def test_HCI_Number_Of_Completed_Packets_Event():
basic_check(event)
# -----------------------------------------------------------------------------
def test_HCI_Vendor_Event():
data = bytes.fromhex('01020304')
event = HCI_Vendor_Event(data=data)
event_bytes = bytes(event)
parsed = HCI_Packet.from_bytes(event_bytes)
assert isinstance(parsed, HCI_Vendor_Event)
assert parsed.data == data
class HCI_Custom_Event(HCI_Event):
def __init__(self, blabla):
super().__init__(HCI_VENDOR_EVENT, parameters=struct.pack("<I", blabla))
self.name = 'HCI_CUSTOM_EVENT'
self.blabla = blabla
def create_event(payload):
if payload[0] == 1:
return HCI_Custom_Event(blabla=struct.unpack('<I', payload)[0])
return None
HCI_Event.add_vendor_factory(create_event)
parsed = HCI_Packet.from_bytes(event_bytes)
assert isinstance(parsed, HCI_Custom_Event)
assert parsed.blabla == 0x04030201
event_bytes2 = event_bytes[:3] + bytes([7]) + event_bytes[4:]
parsed = HCI_Packet.from_bytes(event_bytes2)
assert not isinstance(parsed, HCI_Custom_Event)
assert isinstance(parsed, HCI_Vendor_Event)
HCI_Event.remove_vendor_factory(create_event)
parsed = HCI_Packet.from_bytes(event_bytes)
assert not isinstance(parsed, HCI_Custom_Event)
assert isinstance(parsed, HCI_Vendor_Event)
# -----------------------------------------------------------------------------
def test_HCI_Command():
command = HCI_Command(0x5566)
@@ -576,6 +614,7 @@ def run_test_events():
test_HCI_Command_Complete_Event()
test_HCI_Command_Status_Event()
test_HCI_Number_Of_Completed_Packets_Event()
test_HCI_Vendor_Event()
# -----------------------------------------------------------------------------

View File

@@ -16,11 +16,14 @@
# Imports
# -----------------------------------------------------------------------------
import logging
import unittest.mock
import pytest
import unittest
from bumble.controller import Controller
from bumble.host import Host
from bumble.host import Host, DataPacketQueue
from bumble.transport import AsyncPipeSink
from bumble.hci import HCI_AclDataPacket
# -----------------------------------------------------------------------------
# Logging
@@ -60,3 +63,90 @@ async def test_reset(supported_commands: str, lmp_features: str):
assert host.local_lmp_features == int.from_bytes(
bytes.fromhex(lmp_features), 'little'
)
# -----------------------------------------------------------------------------
def test_data_packet_queue():
controller = unittest.mock.Mock()
queue = DataPacketQueue(10, 2, controller.send)
assert queue.queued == 0
assert queue.completed == 0
packet = HCI_AclDataPacket(
connection_handle=123, pb_flag=0, bc_flag=0, data_total_length=0, data=b''
)
queue.enqueue(packet, packet.connection_handle)
assert queue.queued == 1
assert queue.completed == 0
assert controller.send.call_count == 1
queue.enqueue(packet, packet.connection_handle)
assert queue.queued == 2
assert queue.completed == 0
assert controller.send.call_count == 2
queue.enqueue(packet, packet.connection_handle)
assert queue.queued == 3
assert queue.completed == 0
assert controller.send.call_count == 2
queue.on_packets_completed(1, 8000)
assert queue.queued == 3
assert queue.completed == 0
assert controller.send.call_count == 2
queue.on_packets_completed(1, 123)
assert queue.queued == 3
assert queue.completed == 1
assert controller.send.call_count == 3
queue.enqueue(packet, packet.connection_handle)
assert queue.queued == 4
assert queue.completed == 1
assert controller.send.call_count == 3
queue.on_packets_completed(2, 123)
assert queue.queued == 4
assert queue.completed == 3
assert controller.send.call_count == 4
queue.on_packets_completed(1, 123)
assert queue.queued == 4
assert queue.completed == 4
assert controller.send.call_count == 4
queue.enqueue(packet, 123)
queue.enqueue(packet, 123)
queue.enqueue(packet, 123)
queue.enqueue(packet, 124)
queue.enqueue(packet, 124)
queue.enqueue(packet, 124)
queue.on_packets_completed(1, 123)
assert queue.queued == 10
assert queue.completed == 5
queue.flush(123)
queue.flush(124)
assert queue.queued == 10
assert queue.completed == 10
queue.enqueue(packet, 123)
queue.on_packets_completed(1, 124)
assert queue.queued == 11
assert queue.completed == 10
queue.on_packets_completed(1000, 123)
assert queue.queued == 11
assert queue.completed == 11
drain_listener = unittest.mock.Mock()
queue.on('flow', drain_listener.on_flow)
queue.enqueue(packet, 123)
assert drain_listener.on_flow.call_count == 0
queue.on_packets_completed(1, 123)
assert drain_listener.on_flow.call_count == 1
queue.enqueue(packet, 123)
queue.enqueue(packet, 123)
queue.enqueue(packet, 123)
queue.flush(123)
assert drain_listener.on_flow.call_count == 1
assert queue.queued == 15
assert queue.completed == 15

View File

@@ -20,12 +20,11 @@ import logging
import os
import pytest
from bumble.core import UUID, BT_L2CAP_PROTOCOL_ID, BT_RFCOMM_PROTOCOL_ID
from bumble.core import UUID, BT_L2CAP_PROTOCOL_ID
from bumble.sdp import (
DataElement,
ServiceAttribute,
Client,
Server,
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
SDP_PUBLIC_BROWSE_ROOT,
@@ -174,9 +173,10 @@ def test_data_elements() -> None:
# -----------------------------------------------------------------------------
def sdp_records():
def sdp_records(record_count=1):
return {
0x00010001: [
0x00010001
+ i: [
ServiceAttribute(
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(0x00010001),
@@ -200,6 +200,7 @@ def sdp_records():
),
),
]
for i in range(record_count)
}
@@ -216,19 +217,55 @@ async def test_service_search():
devices.devices[0].sdp_server.service_records.update(sdp_records())
# Search for service
client = Client(devices.connections[1])
await client.connect()
services = await client.search_services(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')]
)
async with Client(devices.connections[1]) as client:
services = await client.search_services(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AF')]
)
assert len(services) == 0
# Then
assert services[0] == 0x00010001
services = await client.search_services(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')]
)
assert len(services) == 1
assert services[0] == 0x00010001
services = await client.search_services(
[BT_L2CAP_PROTOCOL_ID, SDP_PUBLIC_BROWSE_ROOT]
)
assert len(services) == 1
assert services[0] == 0x00010001
services = await client.search_services(
[BT_L2CAP_PROTOCOL_ID, SDP_PUBLIC_BROWSE_ROOT]
)
assert len(services) == 1
assert services[0] == 0x00010001
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_service_attribute():
async def test_service_search_with_continuation():
# Setup connections
devices = TwoDevices()
await devices.setup_connection()
# Register SDP service
records = sdp_records(100)
devices.devices[0].sdp_server.service_records.update(records)
# Search for service
async with Client(devices.connections[1], mtu=48) as client:
services = await client.search_services(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')]
)
assert len(services) == len(records)
for i in range(len(records)):
assert services[i] == 0x00010001 + i
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_service_attributes():
# Setup connections
devices = TwoDevices()
await devices.setup_connection()
@@ -236,15 +273,43 @@ async def test_service_attribute():
# Register SDP service
devices.devices[0].sdp_server.service_records.update(sdp_records())
# Search for service
client = Client(devices.connections[1])
await client.connect()
attributes = await client.get_attributes(
0x00010001, [SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID]
)
# Get attributes
async with Client(devices.connections[1]) as client:
attributes = await client.get_attributes(0x00010001, [1234])
assert len(attributes) == 0
# Then
assert attributes[0].value.value == sdp_records()[0x00010001][0].value.value
attributes = await client.get_attributes(
0x00010001, [SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID]
)
assert len(attributes) == 1
assert attributes[0].value.value == sdp_records()[0x00010001][0].value.value
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_service_attributes_with_continuation():
# Setup connections
devices = TwoDevices()
await devices.setup_connection()
# Register SDP service
records = {
0x00010001: [
ServiceAttribute(
x,
DataElement.unsigned_integer_32(0x00010001),
)
for x in range(100)
]
}
devices.devices[0].sdp_server.service_records.update(records)
# Get attributes
async with Client(devices.connections[1], mtu=48) as client:
attributes = await client.get_attributes(0x00010001, list(range(100)))
assert len(attributes) == 100
for i, attribute in enumerate(attributes):
assert attribute.id == i
# -----------------------------------------------------------------------------
@@ -255,19 +320,81 @@ async def test_service_search_attribute():
await devices.setup_connection()
# Register SDP service
devices.devices[0].sdp_server.service_records.update(sdp_records())
records = {
0x00010001: [
ServiceAttribute(
4,
DataElement.sequence(
[DataElement.uuid(UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'))]
),
),
ServiceAttribute(
3,
DataElement.sequence(
[DataElement.uuid(UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'))]
),
),
ServiceAttribute(
1,
DataElement.sequence(
[DataElement.uuid(UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'))]
),
),
]
}
devices.devices[0].sdp_server.service_records.update(records)
# Search for service
client = Client(devices.connections[1])
await client.connect()
attributes = await client.search_attributes(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')], [(0x0000FFFF, 8)]
)
async with Client(devices.connections[1]) as client:
attributes = await client.search_attributes(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')], [(0, 0xFFFF)]
)
assert len(attributes) == 1
assert len(attributes[0]) == 3
assert attributes[0][0].id == 1
assert attributes[0][1].id == 3
assert attributes[0][2].id == 4
# Then
for expect, actual in zip(attributes, sdp_records().values()):
assert expect.id == actual.id
assert expect.value == actual.value
attributes = await client.search_attributes(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')], [1, 2, 3]
)
assert len(attributes) == 1
assert len(attributes[0]) == 2
assert attributes[0][0].id == 1
assert attributes[0][1].id == 3
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_service_search_attribute_with_continuation():
# Setup connections
devices = TwoDevices()
await devices.setup_connection()
# Register SDP service
records = {
0x00010001: [
ServiceAttribute(
x,
DataElement.sequence(
[DataElement.uuid(UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'))]
),
)
for x in range(100)
]
}
devices.devices[0].sdp_server.service_records.update(records)
# Search for service
async with Client(devices.connections[1], mtu=48) as client:
attributes = await client.search_attributes(
[UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')], [(0, 0xFFFF)]
)
assert len(attributes) == 1
assert len(attributes[0]) == 100
for i in range(100):
assert attributes[0][i].id == i
# -----------------------------------------------------------------------------
@@ -287,9 +414,12 @@ async def test_client_async_context():
# -----------------------------------------------------------------------------
async def run():
test_data_elements()
await test_service_attribute()
await test_service_attributes()
await test_service_attributes_with_continuation()
await test_service_search()
await test_service_search_with_continuation()
await test_service_search_attribute()
await test_service_search_attribute_with_continuation()
# -----------------------------------------------------------------------------