Compare commits

...

63 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 3495eb52ba reset parser before raising exception 2025-03-19 11:32:51 -04:00
Gilles Boccon-Gibod 5e55c0e358 add broadcast code encoding 2025-03-17 19:56:02 -04:00
zxzxwu b2d9541f8f Merge pull request #332 from zxzxwu/role
Enumify: PhysicalTransport, Role, AddressType
2025-03-10 00:04:18 +08:00
Josh Wu 637224d5bc Enum: PhysicalTransport, Role, AddressType 2025-03-09 23:34:01 +08:00
Gilles Boccon-Gibod 92ab171013 Merge pull request #659 from pcondoleon/le_scan_interval_fix
Fixed le_scan_interval incorrectly being set with scan_window
2025-02-28 15:04:03 -05:00
Peter Condoleon 592475e2ed Fixed le_scan_interval incorrectly being set with scan_window 2025-02-27 13:54:20 +10:00
Gilles Boccon-Gibod 12bcdb7770 Merge pull request #658 from google/gbg/auracast-doc
add auracast doc
2025-02-25 07:18:38 -08:00
Gilles Boccon-Gibod 7a58f36020 add auracast doc 2025-02-24 09:10:03 -08:00
Gilles Boccon-Gibod ed0eb912c5 Merge pull request #650 from google/gbg/gatt-adapter-typing
new GATT adapter classes with proper typing support
2025-02-23 18:06:16 -08:00
Gilles Boccon-Gibod 752ce6c830 Merge pull request #657 from google/gbg/auracast-iso-data-path-refactor
use bis link API
2025-02-23 07:42:13 -08:00
Gilles Boccon-Gibod 8e509c18c9 remove unused import 2025-02-22 13:34:57 -08:00
Gilles Boccon-Gibod cc21ed27c7 use bis link API 2025-02-22 13:32:58 -08:00
Gilles Boccon-Gibod 82d825071c address PR comments 2025-02-22 12:43:38 -08:00
Gilles Boccon-Gibod b932bafe6d Merge pull request #655 from markusjellitsch/fix/acl_packet_queue
FIX - acl_packet_queue.flush
2025-02-22 11:33:37 -08:00
markus 4e35aba033 fix acl_packet_queue flush when controller does not support HCI_READ_BUFFER_SIZE_COMMAND 2025-02-22 08:37:12 +01:00
zxzxwu 0060ee8ee2 Merge pull request #646 from zxzxwu/ad
Improve AdvertisingData type annotations
2025-02-22 02:52:09 +08:00
zxzxwu 3263d71f54 Merge pull request #654 from zxzxwu/init
Fix mutable default values
2025-02-22 02:51:11 +08:00
Josh Wu f321143837 Improve AdvertisingData type annotations
* Add overloads to provide better return type hints
* Make advertising data type enum so they can be considered constants
2025-02-21 17:12:14 +08:00
Josh Wu bac6f5baaf Fix mutable default values 2025-02-21 16:00:18 +08:00
Gilles Boccon-Gibod e027bcb57a Merge pull request #651 from google/gbg/update-lc3-and-rootcanal
update rootcanal and lc3 dependencies
2025-02-18 14:08:42 -08:00
Gilles Boccon-Gibod eeb9de31ed update dependencies 2025-02-18 12:52:57 -08:00
Gilles Boccon-Gibod 4befc5bbae fix doc strings 2025-02-18 09:50:15 -08:00
Gilles Boccon-Gibod 2c3af5b2bb Merge pull request #649 from google/gbg/async-read-phy
make connection phy async
2025-02-18 07:25:06 -08:00
Gilles Boccon-Gibod dfb92e8ed1 fix pandora connection waiting 2025-02-17 19:50:57 -08:00
Gilles Boccon-Gibod 73d2b54e30 make connection phy async 2025-02-17 19:24:18 -08:00
Gilles Boccon-Gibod 8315a60f24 Merge pull request #648 from pstrueb/fix/auracast_transmit
Add setup_data_path for iso queues
2025-02-17 17:22:47 -08:00
pstruebi 185d5fd577 reformatting 2025-02-17 20:11:17 +01:00
pstrueb ae5f9cf690 Remove little accident 2025-02-17 18:21:16 +01:00
pstrueb 4b66a38fe6 Refractoring again 2025-02-17 09:49:09 +01:00
pstrueb f526f549ee refractoring 2025-02-17 08:51:28 +01:00
Gilles Boccon-Gibod da029a1749 new adapter classes 2025-02-16 16:26:13 -08:00
pstrueb 8761129677 Add setup_data_path for iso queues 2025-02-16 14:38:05 +01:00
Gilles Boccon-Gibod 3f6f036270 Merge pull request #643 from google/gbg/auracast-audio-io
auracast audio io
2025-02-08 18:19:24 -05:00
zxzxwu c02c1f33d2 Merge pull request #642 from zxzxwu/btbench
Add missing permissions in btbench
2025-02-07 04:48:46 +08:00
Josh Wu c08449d9db Add missing permissions in btbench 2025-02-07 03:13:53 +08:00
Gilles Boccon-Gibod 3c8718bb5b Merge pull request #608 from google/gbg/bench-android-enhancements
add startupDelay and connectionPriority params to BtBench snippets
2025-02-06 10:37:55 -05:00
zxzxwu 26d38a855c Merge pull request #641 from zxzxwu/pasync
Receive Periodic Advertising Sync Transfer
2025-02-06 05:18:47 +08:00
Josh Wu 7360a887d9 Receive Periodic Advertising Sync Transfer 2025-02-06 05:12:22 +08:00
Gilles Boccon-Gibod d6100755b1 add bond listener 2025-02-04 17:47:55 -05:00
Gilles Boccon-Gibod a66eef6630 Merge pull request #640 from whitevegagabriel/cleanup
Rust library cleanup
2025-02-04 12:35:37 -05:00
Gabriel White-Vega ae23ef7b9b Rust library cleanup
* Fix error code extraction from Python to Rust
* Add documentation for dealing with HCI packets
2025-02-04 12:23:06 -05:00
Gilles Boccon-Gibod f368b5e518 wip 2025-02-03 18:02:14 -05:00
Gilles Boccon-Gibod 5293d32dc6 fix linter config 2025-02-03 18:02:14 -05:00
Gilles Boccon-Gibod 6d9a0bf4e1 fix linter config 2025-02-03 18:02:14 -05:00
Gilles Boccon-Gibod 3c7b5df7c5 add startupDelay and connectionPriority params to BtBench snippets 2025-02-03 18:00:46 -05:00
zxzxwu dedc0aca54 Merge pull request #639 from zxzxwu/sdp
Correct SDP_ALL_ATTRIBUTES_RANGE value
2025-02-04 00:53:27 +08:00
Gilles Boccon-Gibod 7c019b574f Merge pull request #633 from markusjellitsch/fix/legacy-adv-params
fix advertising parameter usage for legacy advertising
2025-02-03 10:29:52 -05:00
markus 9b485fd943 revert python-avatar.yml 2025-02-03 15:17:22 +01:00
Josh Wu fdee8269ec Correct SDP_ALL_ATTRIBUTES_RANGE value 2025-02-03 21:40:39 +08:00
zxzxwu 0767f2d4ae Merge pull request #638 from zxzxwu/avatar
Update actions/upload-artifact to v4
2025-02-03 21:31:42 +08:00
Josh Wu c4a0846727 Update actions/upload-artifact to v4 2025-02-03 16:41:09 +08:00
zxzxwu 83ac70e426 Merge pull request #619 from zxzxwu/cs
Channel Sounding
2025-02-01 03:46:59 +08:00
markus 01cce3525f update avatar to github actions v4 2025-01-30 23:55:15 +01:00
markus b9d35aea47 revert advertising_interval to type int 2025-01-30 19:47:20 +01:00
zxzxwu 079cf6b896 Merge pull request #624 from zxzxwu/gatt
Support GATT Service
2025-01-28 20:02:43 +08:00
Markus Jellitsch 180655088c run linter 2025-01-27 22:17:31 +01:00
Gilles Boccon-Gibod a1bade6f20 Merge pull request #632 from markusjellitsch/fix/adapt-param-types
Adapt scanning and connection parameters type
2025-01-27 10:46:08 -05:00
Gilles Boccon-Gibod 5d80e7fd80 Merge pull request #634 from jmdietrich-gcx/fix_missing_await_for_update_rpa
Add missing await for update_rpa()
2025-01-27 10:45:42 -05:00
Jan-Marcel Dietrich 2198692961 Add missing await for update_rpa() 2025-01-27 15:14:52 +01:00
Markus Jellitsch 9023407ee4 fix advertising parameters for legacy advertising 2025-01-23 15:14:54 +01:00
Markus Jellitsch 54d961bbe5 adapt scanning and connection parameters type 2025-01-23 14:53:20 +01:00
Josh Wu 745e107849 Channel Sounding device handlers 2025-01-22 23:38:44 +08:00
Josh Wu 1eb9d8d055 Support GATT Service 2025-01-15 02:13:25 +08:00
80 changed files with 4588 additions and 1490 deletions
+2 -2
View File
@@ -44,7 +44,7 @@ jobs:
run: cat rootcanal.log
- name: Upload Mobly logs
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: mobly-logs
name: mobly-logs-${{ strategy.job-index }}
path: /tmp/logs/mobly/bumble.bumbles/
+63 -59
View File
@@ -27,7 +27,6 @@ import logging
import os
import struct
from typing import (
cast,
Any,
AsyncGenerator,
Coroutine,
@@ -100,6 +99,25 @@ def codec_config_string(
return '\n'.join(indent + line for line in lines)
def broadcast_code_bytes(broadcast_code: str) -> bytes:
"""
Convert a broadcast code string to a 16-byte value.
If `broadcast_code` is `0x` followed by 32 hex characters, it is interpreted as a
raw 16-byte raw broadcast code in big-endian byte order.
Otherwise, `broadcast_code` is converted to a 16-byte value as specified in
BLUETOOTH CORE SPECIFICATION Version 6.0 | Vol 3, Part C , section 3.2.6.3
"""
if broadcast_code.startswith("0x") and len(broadcast_code) == 34:
return bytes.fromhex(broadcast_code[2:])[::-1]
broadcast_code_utf8 = broadcast_code.encode("utf-8")
if len(broadcast_code_utf8) > 16:
raise ValueError("broadcast code must be <= 16 bytes in utf-8 encoding")
padding = bytes(16 - len(broadcast_code_utf8))
return broadcast_code_utf8 + padding
# -----------------------------------------------------------------------------
# Scan For Broadcasts
# -----------------------------------------------------------------------------
@@ -127,11 +145,9 @@ class BroadcastScanner(pyee.EventEmitter):
def update(self, advertisement: bumble.device.Advertisement) -> None:
self.rssi = advertisement.rssi
for service_data in advertisement.data.get_all(
core.AdvertisingData.SERVICE_DATA
core.AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID
):
assert isinstance(service_data, tuple)
service_uuid, data = service_data
assert isinstance(data, bytes)
if service_uuid == gatt.GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE:
self.public_broadcast_announcement = (
@@ -145,16 +161,14 @@ class BroadcastScanner(pyee.EventEmitter):
)
continue
self.appearance = advertisement.data.get( # type: ignore[assignment]
core.AdvertisingData.APPEARANCE
self.appearance = advertisement.data.get(
core.AdvertisingData.Type.APPEARANCE
)
if manufacturer_data := advertisement.data.get(
core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA
core.AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA
):
assert isinstance(manufacturer_data, tuple)
company_id = cast(int, manufacturer_data[0])
data = cast(bytes, manufacturer_data[1])
company_id, data = manufacturer_data
self.manufacturer_data = (
company_ids.COMPANY_IDENTIFIERS.get(
company_id, f'0x{company_id:04X}'
@@ -239,22 +253,14 @@ class BroadcastScanner(pyee.EventEmitter):
if self.biginfo:
print(color(' BIG:', 'cyan'))
print(
color(' Number of BIS:', 'magenta'),
self.biginfo.num_bis,
)
print(
color(' PHY: ', 'magenta'),
self.biginfo.phy.name,
)
print(
color(' Framed: ', 'magenta'),
self.biginfo.framed,
)
print(
color(' Encrypted: ', 'magenta'),
self.biginfo.encrypted,
)
print(color(' Number of BIS:', 'magenta'), self.biginfo.num_bis)
print(color(' ISO Interval: ', 'magenta'), self.biginfo.iso_interval)
print(color(' Max PDU: ', 'magenta'), self.biginfo.max_pdu)
print(color(' SDU Interval: ', 'magenta'), self.biginfo.sdu_interval)
print(color(' Max SDU: ', 'magenta'), self.biginfo.max_sdu)
print(color(' PHY: ', 'magenta'), self.biginfo.phy.name)
print(color(' Framed: ', 'magenta'), self.biginfo.framed)
print(color(' Encrypted: ', 'magenta'), self.biginfo.encrypted)
def on_sync_establishment(self) -> None:
self.emit('sync_establishment')
@@ -271,11 +277,9 @@ class BroadcastScanner(pyee.EventEmitter):
return
for service_data in advertisement.data.get_all(
core.AdvertisingData.SERVICE_DATA
core.AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID
):
assert isinstance(service_data, tuple)
service_uuid, data = service_data
assert isinstance(data, bytes)
if service_uuid == gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE:
self.basic_audio_announcement = (
@@ -316,24 +320,23 @@ class BroadcastScanner(pyee.EventEmitter):
def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None:
if not (
ads := advertisement.data.get_all(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID
core.AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID
)
) or not (
broadcast_audio_announcement := next(
(
ad
for ad in ads
if isinstance(ad, tuple)
and ad[0] == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
if ad[0] == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
),
None,
)
):
return
broadcast_name = advertisement.data.get(core.AdvertisingData.BROADCAST_NAME)
assert isinstance(broadcast_name, str) or broadcast_name is None
assert isinstance(broadcast_audio_announcement[1], bytes)
broadcast_name = advertisement.data.get_all(
core.AdvertisingData.Type.BROADCAST_NAME
)
if broadcast := self.broadcasts.get(advertisement.address):
broadcast.update(advertisement)
@@ -341,7 +344,7 @@ class BroadcastScanner(pyee.EventEmitter):
bumble.utils.AsyncRunner.spawn(
self.on_new_broadcast(
broadcast_name,
broadcast_name[0] if broadcast_name else None,
advertisement,
bap.BroadcastAudioAnnouncement.from_bytes(
broadcast_audio_announcement[1]
@@ -522,14 +525,19 @@ async def run_assist(
return
# Subscribe to and read the broadcast receive state characteristics
def on_broadcast_receive_state_update(
value: bass.BroadcastReceiveState, index: int
) -> None:
print(
f"{color(f'Broadcast Receive State Update [{index}]:', 'green')} {value}"
)
for i, broadcast_receive_state in enumerate(
bass_client.broadcast_receive_states
):
try:
await broadcast_receive_state.subscribe(
lambda value, i=i: print(
f"{color(f'Broadcast Receive State Update [{i}]:', 'green')} {value}"
)
functools.partial(on_broadcast_receive_state_update, index=i)
)
except core.ProtocolError as error:
print(
@@ -705,14 +713,13 @@ async def run_receive(
def on_change() -> None:
if (
broadcast.basic_audio_announcement
and not basic_audio_announcement_scanned.is_set()
):
broadcast.basic_audio_announcement and broadcast.biginfo
) and not basic_audio_announcement_scanned.is_set():
basic_audio_announcement_scanned.set()
broadcast.on('change', on_change)
if not broadcast.basic_audio_announcement:
print('Wait for Basic Audio Announcement...')
if not broadcast.basic_audio_announcement or not broadcast.biginfo:
print('Wait for Basic Audio Announcement and BIG Info...')
await basic_audio_announcement_scanned.wait()
print('Basic Audio Announcement found')
broadcast.print()
@@ -733,7 +740,7 @@ async def run_receive(
big_sync_timeout=0x4000,
bis=[bis.index for bis in subgroup.bis],
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
broadcast_code_bytes(broadcast_code) if broadcast_code else None
),
),
)
@@ -790,16 +797,8 @@ async def run_receive(
for i, bis_link in enumerate(big_sync.bis_links):
print(f'Setup ISO for BIS {bis_link.handle}')
bis_link.sink = functools.partial(sink, lc3_queues[i])
await device.send_command(
hci.HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=bis_link.handle,
data_path_direction=hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.CONTROLLER_TO_HOST,
data_path_id=0,
codec_id=hci.CodingFormat(codec_id=hci.CodecID.TRANSPARENT),
controller_delay=0,
codec_configuration=b'',
),
check_result=True,
await bis_link.setup_data_path(
direction=bis_link.Direction.CONTROLLER_TO_HOST
)
terminated = asyncio.Event()
@@ -955,14 +954,19 @@ async def run_transmit(
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
broadcast_code_bytes(broadcast_code) if broadcast_code else None
),
),
)
for bis_link in big.bis_links:
print(f'Setup ISO for BIS {bis_link.handle}')
await bis_link.setup_data_path(
direction=bis_link.Direction.HOST_TO_CONTROLLER
)
iso_queues = [
bumble.device.IsoPacketStream(big.bis_links[0], 64),
bumble.device.IsoPacketStream(big.bis_links[1], 64),
bumble.device.IsoPacketStream(bis_link, 64)
for bis_link in big.bis_links
]
def on_flow():
@@ -1094,7 +1098,7 @@ def pair(ctx, transport, address):
'--broadcast-code',
metavar='BROADCAST_CODE',
type=str,
help='Broadcast encryption code in hex format',
help='Broadcast encryption code (string or raw hex format prefixed with 0x)',
)
@click.option(
'--sync-timeout',
+237 -156
View File
@@ -16,6 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import dataclasses
import enum
import logging
import os
@@ -97,49 +98,22 @@ DEFAULT_RFCOMM_MTU = 2048
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def parse_packet(packet):
if len(packet) < 1:
logging.info(
color(f'!!! Packet too short (got {len(packet)} bytes, need >= 1)', 'red')
)
raise ValueError('packet too short')
try:
packet_type = PacketType(packet[0])
except ValueError:
logging.info(color(f'!!! Invalid packet type 0x{packet[0]:02X}', 'red'))
raise
return (packet_type, packet[1:])
def parse_packet_sequence(packet_data):
if len(packet_data) < 5:
logging.info(
color(
f'!!!Packet too short (got {len(packet_data)} bytes, need >= 5)',
'red',
)
)
raise ValueError('packet too short')
return struct.unpack_from('>bI', packet_data, 0)
def le_phy_name(phy_id):
return {HCI_LE_1M_PHY: '1M', HCI_LE_2M_PHY: '2M', HCI_LE_CODED_PHY: 'CODED'}.get(
phy_id, HCI_Constant.le_phy_name(phy_id)
)
def print_connection_phy(phy):
logging.info(
color('@@@ PHY: ', 'yellow') + f'TX:{le_phy_name(phy.tx_phy)}/'
f'RX:{le_phy_name(phy.rx_phy)}'
)
def print_connection(connection):
params = []
if connection.transport == BT_LE_TRANSPORT:
params.append(
'PHY='
f'TX:{le_phy_name(connection.phy.tx_phy)}/'
f'RX:{le_phy_name(connection.phy.rx_phy)}'
)
params.append(
'DL=('
f'TX:{connection.data_length[0]}/{connection.data_length[1]},'
@@ -225,13 +199,135 @@ async def switch_roles(connection, role):
logging.info(f'{color("### Role switch failed:", "red")} {error}')
class PacketType(enum.IntEnum):
RESET = 0
SEQUENCE = 1
ACK = 2
# -----------------------------------------------------------------------------
# Packet
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class Packet:
class PacketType(enum.IntEnum):
RESET = 0
SEQUENCE = 1
ACK = 2
class PacketFlags(enum.IntFlag):
LAST = 1
packet_type: PacketType
flags: PacketFlags = PacketFlags(0)
sequence: int = 0
timestamp: int = 0
payload: bytes = b""
@classmethod
def from_bytes(cls, data: bytes):
if len(data) < 1:
logging.warning(
color(f'!!! Packet too short (got {len(data)} bytes, need >= 1)', 'red')
)
raise ValueError('packet too short')
try:
packet_type = cls.PacketType(data[0])
except ValueError:
logging.warning(color(f'!!! Invalid packet type 0x{data[0]:02X}', 'red'))
raise
if packet_type == cls.PacketType.RESET:
return cls(packet_type)
flags = cls.PacketFlags(data[1])
(sequence,) = struct.unpack_from("<I", data, 2)
if packet_type == cls.PacketType.ACK:
if len(data) < 6:
logging.warning(
color(
f'!!! Packet too short (got {len(data)} bytes, need >= 6)',
'red',
)
)
return cls(packet_type, flags, sequence)
if len(data) < 10:
logging.warning(
color(
f'!!! Packet too short (got {len(data)} bytes, need >= 10)', 'red'
)
)
raise ValueError('packet too short')
(timestamp,) = struct.unpack_from("<I", data, 6)
return cls(packet_type, flags, sequence, timestamp, data[10:])
def __bytes__(self):
if self.packet_type == self.PacketType.RESET:
return bytes([self.packet_type])
if self.packet_type == self.PacketType.ACK:
return struct.pack("<BBI", self.packet_type, self.flags, self.sequence)
return (
struct.pack(
"<BBII", self.packet_type, self.flags, self.sequence, self.timestamp
)
+ self.payload
)
PACKET_FLAG_LAST = 1
# -----------------------------------------------------------------------------
# Jitter Stats
# -----------------------------------------------------------------------------
class JitterStats:
def __init__(self):
self.reset()
def reset(self):
self.packets = []
self.receive_times = []
self.jitter = []
def on_packet_received(self, packet):
now = time.time()
self.packets.append(packet)
self.receive_times.append(now)
if packet.timestamp and len(self.packets) > 1:
expected_time = (
self.receive_times[0]
+ (packet.timestamp - self.packets[0].timestamp) / 1000000
)
jitter = now - expected_time
else:
jitter = 0.0
self.jitter.append(jitter)
return jitter
def show_stats(self):
if len(self.jitter) < 3:
return
average = sum(self.jitter) / len(self.jitter)
adjusted = [jitter - average for jitter in self.jitter]
log_stats('Jitter (signed)', adjusted, 3)
log_stats('Jitter (absolute)', [abs(jitter) for jitter in adjusted], 3)
# Show a histogram
bin_count = 20
bins = [0] * bin_count
interval_min = min(adjusted)
interval_max = max(adjusted)
interval_range = interval_max - interval_min
bin_thresholds = [
interval_min + i * (interval_range / bin_count) for i in range(bin_count)
]
for jitter in adjusted:
for i in reversed(range(bin_count)):
if jitter >= bin_thresholds[i]:
bins[i] += 1
break
for i in range(bin_count):
logging.info(f'@@@ >= {bin_thresholds[i]:.4f}: {bins[i]}')
# -----------------------------------------------------------------------------
@@ -281,19 +377,37 @@ class Sender:
await asyncio.sleep(self.tx_start_delay)
logging.info(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
await self.packet_io.send_packet(
bytes(Packet(packet_type=Packet.PacketType.RESET))
)
self.start_time = time.time()
self.bytes_sent = 0
for tx_i in range(self.tx_packet_count):
packet_flags = (
PACKET_FLAG_LAST if tx_i == self.tx_packet_count - 1 else 0
if self.pace > 0:
# Wait until it is time to send the next packet
target_time = self.start_time + (tx_i * self.pace / 1000)
now = time.time()
if now < target_time:
await asyncio.sleep(target_time - now)
else:
await self.packet_io.drain()
packet = bytes(
Packet(
packet_type=Packet.PacketType.SEQUENCE,
flags=(
Packet.PacketFlags.LAST
if tx_i == self.tx_packet_count - 1
else 0
),
sequence=tx_i,
timestamp=int((time.time() - self.start_time) * 1000000),
payload=bytes(
self.tx_packet_size - 10 - self.packet_io.overhead_size
),
)
)
packet = struct.pack(
'>bbI',
PacketType.SEQUENCE,
packet_flags,
tx_i,
) + bytes(self.tx_packet_size - 6 - self.packet_io.overhead_size)
logging.info(
color(
f'Sending packet {tx_i}: {self.tx_packet_size} bytes', 'yellow'
@@ -302,14 +416,6 @@ class Sender:
self.bytes_sent += len(packet)
await self.packet_io.send_packet(packet)
if self.pace is None:
continue
if self.pace > 0:
await asyncio.sleep(self.pace / 1000)
else:
await self.packet_io.drain()
await self.done.wait()
run_counter = f'[{run + 1} of {self.repeat + 1}]' if self.repeat else ''
@@ -321,13 +427,13 @@ class Sender:
if self.repeat:
logging.info(color('--- End of runs', 'blue'))
def on_packet_received(self, packet):
def on_packet_received(self, data):
try:
packet_type, _ = parse_packet(packet)
packet = Packet.from_bytes(data)
except ValueError:
return
if packet_type == PacketType.ACK:
if packet.packet_type == Packet.PacketType.ACK:
elapsed = time.time() - self.start_time
average_tx_speed = self.bytes_sent / elapsed
self.stats.append(average_tx_speed)
@@ -350,52 +456,53 @@ class Receiver:
last_timestamp: float
def __init__(self, packet_io, linger):
self.reset()
self.jitter_stats = JitterStats()
self.packet_io = packet_io
self.packet_io.packet_listener = self
self.linger = linger
self.done = asyncio.Event()
self.reset()
def reset(self):
self.expected_packet_index = 0
self.measurements = [(time.time(), 0)]
self.total_bytes_received = 0
self.jitter_stats.reset()
def on_packet_received(self, packet):
def on_packet_received(self, data):
try:
packet_type, packet_data = parse_packet(packet)
packet = Packet.from_bytes(data)
except ValueError:
logging.exception("invalid packet")
return
if packet_type == PacketType.RESET:
if packet.packet_type == Packet.PacketType.RESET:
logging.info(color('=== Received RESET', 'magenta'))
self.reset()
return
try:
packet_flags, packet_index = parse_packet_sequence(packet_data)
except ValueError:
return
jitter = self.jitter_stats.on_packet_received(packet)
logging.info(
f'<<< Received packet {packet_index}: '
f'flags=0x{packet_flags:02X}, '
f'{len(packet) + self.packet_io.overhead_size} bytes'
f'<<< Received packet {packet.sequence}: '
f'flags={packet.flags}, '
f'jitter={jitter:.4f}, '
f'{len(data) + self.packet_io.overhead_size} bytes',
)
if packet_index != self.expected_packet_index:
if packet.sequence != self.expected_packet_index:
logging.info(
color(
f'!!! Unexpected packet, expected {self.expected_packet_index} '
f'but received {packet_index}'
f'but received {packet.sequence}'
)
)
now = time.time()
elapsed_since_start = now - self.measurements[0][0]
elapsed_since_last = now - self.measurements[-1][0]
self.measurements.append((now, len(packet)))
self.total_bytes_received += len(packet)
instant_rx_speed = len(packet) / elapsed_since_last
self.measurements.append((now, len(data)))
self.total_bytes_received += len(data)
instant_rx_speed = len(data) / elapsed_since_last
average_rx_speed = self.total_bytes_received / elapsed_since_start
window = self.measurements[-64:]
windowed_rx_speed = sum(measurement[1] for measurement in window[1:]) / (
@@ -411,15 +518,17 @@ class Receiver:
)
)
self.expected_packet_index = packet_index + 1
self.expected_packet_index = packet.sequence + 1
if packet_flags & PACKET_FLAG_LAST:
if packet.flags & Packet.PacketFlags.LAST:
AsyncRunner.spawn(
self.packet_io.send_packet(
struct.pack('>bbI', PacketType.ACK, packet_flags, packet_index)
bytes(Packet(Packet.PacketType.ACK, packet.flags, packet.sequence))
)
)
logging.info(color('@@@ Received last packet', 'green'))
self.jitter_stats.show_stats()
if not self.linger:
self.done.set()
@@ -479,25 +588,32 @@ class Ping:
await asyncio.sleep(self.tx_start_delay)
logging.info(color('=== Sending RESET', 'magenta'))
await self.packet_io.send_packet(bytes([PacketType.RESET]))
await self.packet_io.send_packet(bytes(Packet(Packet.PacketType.RESET)))
packet_interval = self.pace / 1000
start_time = time.time()
self.next_expected_packet_index = 0
for i in range(self.tx_packet_count):
target_time = start_time + (i * packet_interval)
target_time = start_time + (i * self.pace / 1000)
now = time.time()
if now < target_time:
await asyncio.sleep(target_time - now)
now = time.time()
packet = struct.pack(
'>bbI',
PacketType.SEQUENCE,
(PACKET_FLAG_LAST if i == self.tx_packet_count - 1 else 0),
i,
) + bytes(self.tx_packet_size - 6)
packet = bytes(
Packet(
packet_type=Packet.PacketType.SEQUENCE,
flags=(
Packet.PacketFlags.LAST
if i == self.tx_packet_count - 1
else 0
),
sequence=i,
timestamp=int((now - start_time) * 1000000),
payload=bytes(self.tx_packet_size - 10),
)
)
logging.info(color(f'Sending packet {i}', 'yellow'))
self.ping_times.append(time.time())
self.ping_times.append(now)
await self.packet_io.send_packet(packet)
await self.done.wait()
@@ -531,40 +647,35 @@ class Ping:
if self.repeat:
logging.info(color('--- End of runs', 'blue'))
def on_packet_received(self, packet):
def on_packet_received(self, data):
try:
packet_type, packet_data = parse_packet(packet)
packet = Packet.from_bytes(data)
except ValueError:
return
try:
packet_flags, packet_index = parse_packet_sequence(packet_data)
except ValueError:
return
if packet_type == PacketType.ACK:
elapsed = time.time() - self.ping_times[packet_index]
if packet.packet_type == Packet.PacketType.ACK:
elapsed = time.time() - self.ping_times[packet.sequence]
rtt = elapsed * 1000
self.rtts.append(rtt)
logging.info(
color(
f'<<< Received ACK [{packet_index}], RTT={rtt:.2f}ms',
f'<<< Received ACK [{packet.sequence}], RTT={rtt:.2f}ms',
'green',
)
)
if packet_index == self.next_expected_packet_index:
if packet.sequence == self.next_expected_packet_index:
self.next_expected_packet_index += 1
else:
logging.info(
color(
f'!!! Unexpected packet, '
f'expected {self.next_expected_packet_index} '
f'but received {packet_index}'
f'but received {packet.sequence}'
)
)
if packet_flags & PACKET_FLAG_LAST:
if packet.flags & Packet.PacketFlags.LAST:
self.done.set()
return
@@ -576,89 +687,56 @@ class Pong:
expected_packet_index: int
def __init__(self, packet_io, linger):
self.reset()
self.jitter_stats = JitterStats()
self.packet_io = packet_io
self.packet_io.packet_listener = self
self.linger = linger
self.done = asyncio.Event()
self.reset()
def reset(self):
self.expected_packet_index = 0
self.receive_times = []
def on_packet_received(self, packet):
self.receive_times.append(time.time())
self.jitter_stats.reset()
def on_packet_received(self, data):
try:
packet_type, packet_data = parse_packet(packet)
packet = Packet.from_bytes(data)
except ValueError:
return
if packet_type == PacketType.RESET:
if packet.packet_type == Packet.PacketType.RESET:
logging.info(color('=== Received RESET', 'magenta'))
self.reset()
return
try:
packet_flags, packet_index = parse_packet_sequence(packet_data)
except ValueError:
return
interval = (
self.receive_times[-1] - self.receive_times[-2]
if len(self.receive_times) >= 2
else 0
)
jitter = self.jitter_stats.on_packet_received(packet)
logging.info(
color(
f'<<< Received packet {packet_index}: '
f'flags=0x{packet_flags:02X}, {len(packet)} bytes, '
f'interval={interval:.4f}',
f'<<< Received packet {packet.sequence}: '
f'flags={packet.flags}, {len(data)} bytes, '
f'jitter={jitter:.4f}',
'green',
)
)
if packet_index != self.expected_packet_index:
if packet.sequence != self.expected_packet_index:
logging.info(
color(
f'!!! Unexpected packet, expected {self.expected_packet_index} '
f'but received {packet_index}'
f'but received {packet.sequence}'
)
)
self.expected_packet_index = packet_index + 1
self.expected_packet_index = packet.sequence + 1
AsyncRunner.spawn(
self.packet_io.send_packet(
struct.pack('>bbI', PacketType.ACK, packet_flags, packet_index)
bytes(Packet(Packet.PacketType.ACK, packet.flags, packet.sequence))
)
)
if packet_flags & PACKET_FLAG_LAST:
if len(self.receive_times) >= 3:
# Show basic stats
intervals = [
self.receive_times[i + 1] - self.receive_times[i]
for i in range(len(self.receive_times) - 1)
]
log_stats('Packet intervals', intervals, 3)
# Show a histogram
bin_count = 20
bins = [0] * bin_count
interval_min = min(intervals)
interval_max = max(intervals)
interval_range = interval_max - interval_min
bin_thresholds = [
interval_min + i * (interval_range / bin_count)
for i in range(bin_count)
]
for interval in intervals:
for i in reversed(range(bin_count)):
if interval >= bin_thresholds[i]:
bins[i] += 1
break
for i in range(bin_count):
logging.info(f'@@@ >= {bin_thresholds[i]:.4f}: {bins[i]}')
if packet.flags & Packet.PacketFlags.LAST:
self.jitter_stats.show_stats()
if not self.linger:
self.done.set()
@@ -1211,6 +1289,8 @@ class Central(Connection.Listener):
logging.info(color('### Connected', 'cyan'))
self.connection.listener = self
print_connection(self.connection)
phy = await self.connection.get_phy()
print_connection_phy(phy)
# Switch roles if needed.
if self.role_switch:
@@ -1268,8 +1348,8 @@ class Central(Connection.Listener):
def on_connection_parameters_update(self):
print_connection(self.connection)
def on_connection_phy_update(self):
print_connection(self.connection)
def on_connection_phy_update(self, phy):
print_connection_phy(phy)
def on_connection_att_mtu_update(self):
print_connection(self.connection)
@@ -1395,8 +1475,8 @@ class Peripheral(Device.Listener, Connection.Listener):
def on_connection_parameters_update(self):
print_connection(self.connection)
def on_connection_phy_update(self):
print_connection(self.connection)
def on_connection_phy_update(self, phy):
print_connection_phy(phy)
def on_connection_att_mtu_update(self):
print_connection(self.connection)
@@ -1471,7 +1551,7 @@ def create_mode_factory(ctx, default_mode):
def create_scenario_factory(ctx, default_scenario):
scenario = ctx.obj['scenario']
if scenario is None:
scenarion = default_scenario
scenario = default_scenario
def create_scenario(packet_io):
if scenario == 'send':
@@ -1530,6 +1610,7 @@ def create_scenario_factory(ctx, default_scenario):
'--att-mtu',
metavar='MTU',
type=click.IntRange(23, 517),
default=517,
help='GATT MTU (gatt-client mode)',
)
@click.option(
@@ -1605,7 +1686,7 @@ def create_scenario_factory(ctx, default_scenario):
'--packet-size',
'-s',
metavar='SIZE',
type=click.IntRange(8, 8192),
type=click.IntRange(10, 8192),
default=500,
help='Packet size (send or ping scenario)',
)
+19 -12
View File
@@ -22,7 +22,6 @@
import asyncio
import logging
import os
import random
import re
import humanize
from typing import Optional, Union
@@ -57,7 +56,13 @@ from bumble import __version__
import bumble.core
from bumble import colors
from bumble.core import UUID, AdvertisingData, BT_LE_TRANSPORT
from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
from bumble.device import (
ConnectionParametersPreferences,
ConnectionPHY,
Device,
Connection,
Peer,
)
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor
@@ -125,6 +130,7 @@ def parse_phys(phys):
# -----------------------------------------------------------------------------
class ConsoleApp:
connected_peer: Optional[Peer]
connection_phy: Optional[ConnectionPHY]
def __init__(self):
self.known_addresses = set()
@@ -132,6 +138,7 @@ class ConsoleApp:
self.known_local_attributes = []
self.device = None
self.connected_peer = None
self.connection_phy = None
self.top_tab = 'device'
self.monitor_rssi = False
self.connection_rssi = None
@@ -332,10 +339,10 @@ class ConsoleApp:
f'{connection.parameters.peripheral_latency}/'
f'{connection.parameters.supervision_timeout}'
)
if connection.transport == BT_LE_TRANSPORT:
if self.connection_phy is not None:
phy_state = (
f' RX={le_phy_name(connection.phy.rx_phy)}/'
f'TX={le_phy_name(connection.phy.tx_phy)}'
f' RX={le_phy_name(self.connection_phy.rx_phy)}/'
f'TX={le_phy_name(self.connection_phy.tx_phy)}'
)
else:
phy_state = ''
@@ -654,11 +661,12 @@ class ConsoleApp:
self.append_to_output('connecting...')
try:
await self.device.connect(
connection = await self.device.connect(
params[0],
connection_parameters_preferences=connection_parameters_preferences,
timeout=DEFAULT_CONNECTION_TIMEOUT,
)
self.connection_phy = await connection.get_phy()
self.top_tab = 'services'
except bumble.core.TimeoutError:
self.show_error('connection timed out')
@@ -838,8 +846,8 @@ class ConsoleApp:
phy = await self.connected_peer.connection.get_phy()
self.append_to_output(
f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, '
f'TX={HCI_Constant.le_phy_name(phy[1])}'
f'PHY: RX={HCI_Constant.le_phy_name(phy.rx_phy)}, '
f'TX={HCI_Constant.le_phy_name(phy.tx_phy)}'
)
async def do_request_mtu(self, params):
@@ -1076,10 +1084,9 @@ class DeviceListener(Device.Listener, Connection.Listener):
f'{self.app.connected_peer.connection.parameters}'
)
def on_connection_phy_update(self):
self.app.append_to_output(
f'connection phy update: {self.app.connected_peer.connection.phy}'
)
def on_connection_phy_update(self, phy):
self.app.connection_phy = phy
self.app.append_to_output(f'connection phy update: {phy}')
def on_connection_att_mtu_update(self):
self.app.append_to_output(
+1 -1
View File
@@ -234,7 +234,7 @@ class GattlinkNodeBridge(GattlinkL2capEndpoint, Device.Listener):
Characteristic.WRITEABLE,
CharacteristicValue(write=self.on_rx_write),
)
self.tx_characteristic = Characteristic(
self.tx_characteristic: Characteristic[bytes] = Characteristic(
GG_GATTLINK_TX_CHARACTERISTIC_UUID,
Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
+51 -37
View File
@@ -29,13 +29,14 @@ import functools
import inspect
import struct
from typing import (
Any,
Awaitable,
Callable,
Generic,
Dict,
List,
Optional,
Type,
TypeVar,
Union,
TYPE_CHECKING,
)
@@ -43,13 +44,18 @@ from typing import (
from pyee import EventEmitter
from bumble import utils
from bumble.core import UUID, name_or_number, ProtocolError
from bumble.core import UUID, name_or_number, InvalidOperationError, ProtocolError
from bumble.hci import HCI_Object, key_with_value
from bumble.colors import color
# -----------------------------------------------------------------------------
# Typing
# -----------------------------------------------------------------------------
if TYPE_CHECKING:
from bumble.device import Connection
_T = TypeVar('_T')
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
@@ -748,7 +754,7 @@ class ATT_Handle_Value_Confirmation(ATT_PDU):
# -----------------------------------------------------------------------------
class AttributeValue:
class AttributeValue(Generic[_T]):
'''
Attribute value where reading and/or writing is delegated to functions
passed as arguments to the constructor.
@@ -757,33 +763,34 @@ class AttributeValue:
def __init__(
self,
read: Union[
Callable[[Optional[Connection]], Any],
Callable[[Optional[Connection]], Awaitable[Any]],
Callable[[Optional[Connection]], _T],
Callable[[Optional[Connection]], Awaitable[_T]],
None,
] = None,
write: Union[
Callable[[Optional[Connection], Any], None],
Callable[[Optional[Connection], Any], Awaitable[None]],
Callable[[Optional[Connection], _T], None],
Callable[[Optional[Connection], _T], Awaitable[None]],
None,
] = None,
):
self._read = read
self._write = write
def read(self, connection: Optional[Connection]) -> Union[bytes, Awaitable[bytes]]:
return self._read(connection) if self._read else b''
def read(self, connection: Optional[Connection]) -> Union[_T, Awaitable[_T]]:
if self._read is None:
raise InvalidOperationError('AttributeValue has no read function')
return self._read(connection)
def write(
self, connection: Optional[Connection], value: bytes
self, connection: Optional[Connection], value: _T
) -> Union[Awaitable[None], None]:
if self._write:
return self._write(connection, value)
return None
if self._write is None:
raise InvalidOperationError('AttributeValue has no write function')
return self._write(connection, value)
# -----------------------------------------------------------------------------
class Attribute(EventEmitter):
class Attribute(EventEmitter, Generic[_T]):
class Permissions(enum.IntFlag):
READABLE = 0x01
WRITEABLE = 0x02
@@ -822,13 +829,13 @@ class Attribute(EventEmitter):
READ_REQUIRES_AUTHORIZATION = Permissions.READ_REQUIRES_AUTHORIZATION
WRITE_REQUIRES_AUTHORIZATION = Permissions.WRITE_REQUIRES_AUTHORIZATION
value: Any
value: Union[AttributeValue[_T], _T, None]
def __init__(
self,
attribute_type: Union[str, bytes, UUID],
permissions: Union[str, Attribute.Permissions],
value: Any = b'',
value: Union[AttributeValue[_T], _T, None] = None,
) -> None:
EventEmitter.__init__(self)
self.handle = 0
@@ -848,11 +855,11 @@ class Attribute(EventEmitter):
self.value = value
def encode_value(self, value: Any) -> bytes:
return value
def encode_value(self, value: _T) -> bytes:
return value # type: ignore
def decode_value(self, value_bytes: bytes) -> Any:
return value_bytes
def decode_value(self, value: bytes) -> _T:
return value # type: ignore
async def read_value(self, connection: Optional[Connection]) -> bytes:
if (
@@ -877,11 +884,14 @@ class Attribute(EventEmitter):
error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
)
if hasattr(self.value, 'read'):
value: Union[_T, None]
if isinstance(self.value, AttributeValue):
try:
value = self.value.read(connection)
if inspect.isawaitable(value):
value = await value
read_value = self.value.read(connection)
if inspect.isawaitable(read_value):
value = await read_value
else:
value = read_value
except ATT_Error as error:
raise ATT_Error(
error_code=error.error_code, att_handle=self.handle
@@ -889,20 +899,24 @@ class Attribute(EventEmitter):
else:
value = self.value
self.emit('read', connection, value)
self.emit('read', connection, b'' if value is None else value)
return self.encode_value(value)
return b'' if value is None else self.encode_value(value)
async def write_value(self, connection: Connection, value_bytes: bytes) -> None:
async def write_value(self, connection: Optional[Connection], value: bytes) -> None:
if (
self.permissions & self.WRITE_REQUIRES_ENCRYPTION
) and not connection.encryption:
(self.permissions & self.WRITE_REQUIRES_ENCRYPTION)
and connection is not None
and not connection.encryption
):
raise ATT_Error(
error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle
)
if (
self.permissions & self.WRITE_REQUIRES_AUTHENTICATION
) and not connection.authenticated:
(self.permissions & self.WRITE_REQUIRES_AUTHENTICATION)
and connection is not None
and not connection.authenticated
):
raise ATT_Error(
error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle
)
@@ -912,11 +926,11 @@ class Attribute(EventEmitter):
error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
)
value = self.decode_value(value_bytes)
decoded_value = self.decode_value(value)
if hasattr(self.value, 'write'):
if isinstance(self.value, AttributeValue):
try:
result = self.value.write(connection, value)
result = self.value.write(connection, decoded_value)
if inspect.isawaitable(result):
await result
except ATT_Error as error:
@@ -924,9 +938,9 @@ class Attribute(EventEmitter):
error_code=error.error_code, att_handle=self.handle
) from error
else:
self.value = value
self.value = decoded_value
self.emit('write', connection, value)
self.emit('write', connection, decoded_value)
def __repr__(self):
if isinstance(self.value, bytes):
+7 -8
View File
@@ -25,8 +25,6 @@ import random
import struct
from bumble.colors import color
from bumble.core import (
BT_CENTRAL_ROLE,
BT_PERIPHERAL_ROLE,
BT_LE_TRANSPORT,
BT_BR_EDR_TRANSPORT,
)
@@ -47,6 +45,7 @@ from bumble.hci import (
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
HCI_VERSION_BLUETOOTH_CORE_5_0,
Address,
Role,
HCI_AclDataPacket,
HCI_AclDataPacketAssembler,
HCI_Command_Complete_Event,
@@ -98,7 +97,7 @@ class CisLink:
class Connection:
controller: Controller
handle: int
role: int
role: Role
peer_address: Address
link: Any
transport: int
@@ -390,7 +389,7 @@ class Controller:
connection = Connection(
controller=self,
handle=connection_handle,
role=BT_PERIPHERAL_ROLE,
role=Role.PERIPHERAL,
peer_address=peer_address,
link=self.link,
transport=BT_LE_TRANSPORT,
@@ -450,7 +449,7 @@ class Controller:
connection = Connection(
controller=self,
handle=connection_handle,
role=BT_CENTRAL_ROLE,
role=Role.CENTRAL,
peer_address=peer_address,
link=self.link,
transport=BT_LE_TRANSPORT,
@@ -469,7 +468,7 @@ class Controller:
HCI_LE_Connection_Complete_Event(
status=status,
connection_handle=connection.handle if connection else 0,
role=BT_CENTRAL_ROLE,
role=Role.CENTRAL,
peer_address_type=le_create_connection_command.peer_address_type,
peer_address=le_create_connection_command.peer_address,
connection_interval=le_create_connection_command.connection_interval_min,
@@ -693,7 +692,7 @@ class Controller:
controller=self,
handle=connection_handle,
# Role doesn't matter in Classic because they are managed by HCI_Role_Change and HCI_Role_Discovery
role=BT_CENTRAL_ROLE,
role=Role.CENTRAL,
peer_address=peer_address,
link=self.link,
transport=BT_BR_EDR_TRANSPORT,
@@ -761,7 +760,7 @@ class Controller:
controller=self,
handle=connection_handle,
# Role doesn't matter in SCO.
role=BT_CENTRAL_ROLE,
role=Role.CENTRAL,
peer_address=peer_address,
link=self.link,
transport=BT_BR_EDR_TRANSPORT,
+305 -158
View File
@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,10 +16,10 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import dataclasses
import enum
import struct
from typing import List, Optional, Tuple, Union, cast, Dict
from typing import cast, overload, Literal, Union, Optional
from typing_extensions import Self
from bumble.company_ids import COMPANY_IDENTIFIERS
@@ -31,11 +31,12 @@ from bumble.utils import OpenIntEnum
# -----------------------------------------------------------------------------
# fmt: off
BT_CENTRAL_ROLE = 0
BT_PERIPHERAL_ROLE = 1
class PhysicalTransport(enum.IntEnum):
BR_EDR = 0
LE = 1
BT_BR_EDR_TRANSPORT = 0
BT_LE_TRANSPORT = 1
BT_BR_EDR_TRANSPORT = PhysicalTransport.BR_EDR
BT_LE_TRANSPORT = PhysicalTransport.LE
# fmt: on
@@ -57,7 +58,7 @@ def bit_flags_to_strings(bits, bit_flag_names):
return names
def name_or_number(dictionary: Dict[int, str], number: int, width: int = 2) -> str:
def name_or_number(dictionary: dict[int, str], number: int, width: int = 2) -> str:
name = dictionary.get(number)
if name is not None:
return name
@@ -200,7 +201,7 @@ class UUID:
'''
BASE_UUID = bytes.fromhex('00001000800000805F9B34FB')[::-1] # little-endian
UUIDS: List[UUID] = [] # Registry of all instances created
UUIDS: list[UUID] = [] # Registry of all instances created
uuid_bytes: bytes
name: Optional[str]
@@ -259,11 +260,11 @@ class UUID:
return cls.from_bytes(struct.pack('<I', uuid_32), name)
@classmethod
def parse_uuid(cls, uuid_as_bytes: bytes, offset: int) -> Tuple[int, UUID]:
def parse_uuid(cls, uuid_as_bytes: bytes, offset: int) -> tuple[int, UUID]:
return len(uuid_as_bytes), cls.from_bytes(uuid_as_bytes[offset:])
@classmethod
def parse_uuid_2(cls, uuid_as_bytes: bytes, offset: int) -> Tuple[int, UUID]:
def parse_uuid_2(cls, uuid_as_bytes: bytes, offset: int) -> tuple[int, UUID]:
return offset + 2, cls.from_bytes(uuid_as_bytes[offset : offset + 2])
def to_bytes(self, force_128: bool = False) -> bytes:
@@ -1280,13 +1281,13 @@ class Appearance:
# Advertising Data
# -----------------------------------------------------------------------------
AdvertisingDataObject = Union[
List[UUID],
Tuple[UUID, bytes],
list[UUID],
tuple[UUID, bytes],
bytes,
str,
int,
Tuple[int, int],
Tuple[int, bytes],
tuple[int, int],
tuple[int, bytes],
Appearance,
]
@@ -1295,116 +1296,116 @@ class AdvertisingData:
# fmt: off
# pylint: disable=line-too-long
FLAGS = 0x01
INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x02
COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x03
INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x04
COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x05
INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x06
COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x07
SHORTENED_LOCAL_NAME = 0x08
COMPLETE_LOCAL_NAME = 0x09
TX_POWER_LEVEL = 0x0A
CLASS_OF_DEVICE = 0x0D
SIMPLE_PAIRING_HASH_C = 0x0E
SIMPLE_PAIRING_HASH_C_192 = 0x0E
SIMPLE_PAIRING_RANDOMIZER_R = 0x0F
SIMPLE_PAIRING_RANDOMIZER_R_192 = 0x0F
DEVICE_ID = 0x10
SECURITY_MANAGER_TK_VALUE = 0x10
SECURITY_MANAGER_OUT_OF_BAND_FLAGS = 0x11
PERIPHERAL_CONNECTION_INTERVAL_RANGE = 0x12
LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = 0x14
LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = 0x15
SERVICE_DATA = 0x16
SERVICE_DATA_16_BIT_UUID = 0x16
PUBLIC_TARGET_ADDRESS = 0x17
RANDOM_TARGET_ADDRESS = 0x18
APPEARANCE = 0x19
ADVERTISING_INTERVAL = 0x1A
LE_BLUETOOTH_DEVICE_ADDRESS = 0x1B
LE_ROLE = 0x1C
SIMPLE_PAIRING_HASH_C_256 = 0x1D
SIMPLE_PAIRING_RANDOMIZER_R_256 = 0x1E
LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS = 0x1F
SERVICE_DATA_32_BIT_UUID = 0x20
SERVICE_DATA_128_BIT_UUID = 0x21
LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE = 0x22
LE_SECURE_CONNECTIONS_RANDOM_VALUE = 0x23
URI = 0x24
INDOOR_POSITIONING = 0x25
TRANSPORT_DISCOVERY_DATA = 0x26
LE_SUPPORTED_FEATURES = 0x27
CHANNEL_MAP_UPDATE_INDICATION = 0x28
PB_ADV = 0x29
MESH_MESSAGE = 0x2A
MESH_BEACON = 0x2B
BIGINFO = 0x2C
BROADCAST_CODE = 0x2D
RESOLVABLE_SET_IDENTIFIER = 0x2E
ADVERTISING_INTERVAL_LONG = 0x2F
BROADCAST_NAME = 0x30
ENCRYPTED_ADVERTISING_DATA = 0X31
PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION = 0X32
ELECTRONIC_SHELF_LABEL = 0X34
THREE_D_INFORMATION_DATA = 0x3D
MANUFACTURER_SPECIFIC_DATA = 0xFF
class Type(OpenIntEnum):
FLAGS = 0x01
INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x02
COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x03
INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x04
COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x05
INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x06
COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x07
SHORTENED_LOCAL_NAME = 0x08
COMPLETE_LOCAL_NAME = 0x09
TX_POWER_LEVEL = 0x0A
CLASS_OF_DEVICE = 0x0D
SIMPLE_PAIRING_HASH_C = 0x0E
SIMPLE_PAIRING_HASH_C_192 = 0x0E
SIMPLE_PAIRING_RANDOMIZER_R = 0x0F
SIMPLE_PAIRING_RANDOMIZER_R_192 = 0x0F
DEVICE_ID = 0x10
SECURITY_MANAGER_TK_VALUE = 0x10
SECURITY_MANAGER_OUT_OF_BAND_FLAGS = 0x11
PERIPHERAL_CONNECTION_INTERVAL_RANGE = 0x12
LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = 0x14
LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = 0x15
SERVICE_DATA_16_BIT_UUID = 0x16
PUBLIC_TARGET_ADDRESS = 0x17
RANDOM_TARGET_ADDRESS = 0x18
APPEARANCE = 0x19
ADVERTISING_INTERVAL = 0x1A
LE_BLUETOOTH_DEVICE_ADDRESS = 0x1B
LE_ROLE = 0x1C
SIMPLE_PAIRING_HASH_C_256 = 0x1D
SIMPLE_PAIRING_RANDOMIZER_R_256 = 0x1E
LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS = 0x1F
SERVICE_DATA_32_BIT_UUID = 0x20
SERVICE_DATA_128_BIT_UUID = 0x21
LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE = 0x22
LE_SECURE_CONNECTIONS_RANDOM_VALUE = 0x23
URI = 0x24
INDOOR_POSITIONING = 0x25
TRANSPORT_DISCOVERY_DATA = 0x26
LE_SUPPORTED_FEATURES = 0x27
CHANNEL_MAP_UPDATE_INDICATION = 0x28
PB_ADV = 0x29
MESH_MESSAGE = 0x2A
MESH_BEACON = 0x2B
BIGINFO = 0x2C
BROADCAST_CODE = 0x2D
RESOLVABLE_SET_IDENTIFIER = 0x2E
ADVERTISING_INTERVAL_LONG = 0x2F
BROADCAST_NAME = 0x30
ENCRYPTED_ADVERTISING_DATA = 0x31
PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION = 0x32
ELECTRONIC_SHELF_LABEL = 0x34
THREE_D_INFORMATION_DATA = 0x3D
MANUFACTURER_SPECIFIC_DATA = 0xFF
AD_TYPE_NAMES = {
FLAGS: 'FLAGS',
INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS',
COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS',
INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS',
COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS',
INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS',
COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS',
SHORTENED_LOCAL_NAME: 'SHORTENED_LOCAL_NAME',
COMPLETE_LOCAL_NAME: 'COMPLETE_LOCAL_NAME',
TX_POWER_LEVEL: 'TX_POWER_LEVEL',
CLASS_OF_DEVICE: 'CLASS_OF_DEVICE',
SIMPLE_PAIRING_HASH_C: 'SIMPLE_PAIRING_HASH_C',
SIMPLE_PAIRING_HASH_C_192: 'SIMPLE_PAIRING_HASH_C_192',
SIMPLE_PAIRING_RANDOMIZER_R: 'SIMPLE_PAIRING_RANDOMIZER_R',
SIMPLE_PAIRING_RANDOMIZER_R_192: 'SIMPLE_PAIRING_RANDOMIZER_R_192',
DEVICE_ID: 'DEVICE_ID',
SECURITY_MANAGER_TK_VALUE: 'SECURITY_MANAGER_TK_VALUE',
SECURITY_MANAGER_OUT_OF_BAND_FLAGS: 'SECURITY_MANAGER_OUT_OF_BAND_FLAGS',
PERIPHERAL_CONNECTION_INTERVAL_RANGE: 'PERIPHERAL_CONNECTION_INTERVAL_RANGE',
LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS',
LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS',
SERVICE_DATA_16_BIT_UUID: 'SERVICE_DATA_16_BIT_UUID',
PUBLIC_TARGET_ADDRESS: 'PUBLIC_TARGET_ADDRESS',
RANDOM_TARGET_ADDRESS: 'RANDOM_TARGET_ADDRESS',
APPEARANCE: 'APPEARANCE',
ADVERTISING_INTERVAL: 'ADVERTISING_INTERVAL',
LE_BLUETOOTH_DEVICE_ADDRESS: 'LE_BLUETOOTH_DEVICE_ADDRESS',
LE_ROLE: 'LE_ROLE',
SIMPLE_PAIRING_HASH_C_256: 'SIMPLE_PAIRING_HASH_C_256',
SIMPLE_PAIRING_RANDOMIZER_R_256: 'SIMPLE_PAIRING_RANDOMIZER_R_256',
LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS',
SERVICE_DATA_32_BIT_UUID: 'SERVICE_DATA_32_BIT_UUID',
SERVICE_DATA_128_BIT_UUID: 'SERVICE_DATA_128_BIT_UUID',
LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE: 'LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE',
LE_SECURE_CONNECTIONS_RANDOM_VALUE: 'LE_SECURE_CONNECTIONS_RANDOM_VALUE',
URI: 'URI',
INDOOR_POSITIONING: 'INDOOR_POSITIONING',
TRANSPORT_DISCOVERY_DATA: 'TRANSPORT_DISCOVERY_DATA',
LE_SUPPORTED_FEATURES: 'LE_SUPPORTED_FEATURES',
CHANNEL_MAP_UPDATE_INDICATION: 'CHANNEL_MAP_UPDATE_INDICATION',
PB_ADV: 'PB_ADV',
MESH_MESSAGE: 'MESH_MESSAGE',
MESH_BEACON: 'MESH_BEACON',
BIGINFO: 'BIGINFO',
BROADCAST_CODE: 'BROADCAST_CODE',
RESOLVABLE_SET_IDENTIFIER: 'RESOLVABLE_SET_IDENTIFIER',
ADVERTISING_INTERVAL_LONG: 'ADVERTISING_INTERVAL_LONG',
BROADCAST_NAME: 'BROADCAST_NAME',
ENCRYPTED_ADVERTISING_DATA: 'ENCRYPTED_ADVERTISING_DATA',
PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION: 'PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION',
ELECTRONIC_SHELF_LABEL: 'ELECTRONIC_SHELF_LABEL',
THREE_D_INFORMATION_DATA: 'THREE_D_INFORMATION_DATA',
MANUFACTURER_SPECIFIC_DATA: 'MANUFACTURER_SPECIFIC_DATA'
}
# For backward-compatibility
FLAGS = Type.FLAGS
INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS
COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS
INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS
COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS
INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS
COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS
SHORTENED_LOCAL_NAME = Type.SHORTENED_LOCAL_NAME
COMPLETE_LOCAL_NAME = Type.COMPLETE_LOCAL_NAME
TX_POWER_LEVEL = Type.TX_POWER_LEVEL
CLASS_OF_DEVICE = Type.CLASS_OF_DEVICE
SIMPLE_PAIRING_HASH_C = Type.SIMPLE_PAIRING_HASH_C
SIMPLE_PAIRING_HASH_C_192 = Type.SIMPLE_PAIRING_HASH_C_192
SIMPLE_PAIRING_RANDOMIZER_R = Type.SIMPLE_PAIRING_RANDOMIZER_R
SIMPLE_PAIRING_RANDOMIZER_R_192 = Type.SIMPLE_PAIRING_RANDOMIZER_R_192
DEVICE_ID = Type.DEVICE_ID
SECURITY_MANAGER_TK_VALUE = Type.SECURITY_MANAGER_TK_VALUE
SECURITY_MANAGER_OUT_OF_BAND_FLAGS = Type.SECURITY_MANAGER_OUT_OF_BAND_FLAGS
PERIPHERAL_CONNECTION_INTERVAL_RANGE = Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE
LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS
LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS
SERVICE_DATA = Type.SERVICE_DATA_16_BIT_UUID
SERVICE_DATA_16_BIT_UUID = Type.SERVICE_DATA_16_BIT_UUID
PUBLIC_TARGET_ADDRESS = Type.PUBLIC_TARGET_ADDRESS
RANDOM_TARGET_ADDRESS = Type.RANDOM_TARGET_ADDRESS
APPEARANCE = Type.APPEARANCE
ADVERTISING_INTERVAL = Type.ADVERTISING_INTERVAL
LE_BLUETOOTH_DEVICE_ADDRESS = Type.LE_BLUETOOTH_DEVICE_ADDRESS
LE_ROLE = Type.LE_ROLE
SIMPLE_PAIRING_HASH_C_256 = Type.SIMPLE_PAIRING_HASH_C_256
SIMPLE_PAIRING_RANDOMIZER_R_256 = Type.SIMPLE_PAIRING_RANDOMIZER_R_256
LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS = Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS
SERVICE_DATA_32_BIT_UUID = Type.SERVICE_DATA_32_BIT_UUID
SERVICE_DATA_128_BIT_UUID = Type.SERVICE_DATA_128_BIT_UUID
LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE = Type.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE
LE_SECURE_CONNECTIONS_RANDOM_VALUE = Type.LE_SECURE_CONNECTIONS_RANDOM_VALUE
URI = Type.URI
INDOOR_POSITIONING = Type.INDOOR_POSITIONING
TRANSPORT_DISCOVERY_DATA = Type.TRANSPORT_DISCOVERY_DATA
LE_SUPPORTED_FEATURES = Type.LE_SUPPORTED_FEATURES
CHANNEL_MAP_UPDATE_INDICATION = Type.CHANNEL_MAP_UPDATE_INDICATION
PB_ADV = Type.PB_ADV
MESH_MESSAGE = Type.MESH_MESSAGE
MESH_BEACON = Type.MESH_BEACON
BIGINFO = Type.BIGINFO
BROADCAST_CODE = Type.BROADCAST_CODE
RESOLVABLE_SET_IDENTIFIER = Type.RESOLVABLE_SET_IDENTIFIER
ADVERTISING_INTERVAL_LONG = Type.ADVERTISING_INTERVAL_LONG
BROADCAST_NAME = Type.BROADCAST_NAME
ENCRYPTED_ADVERTISING_DATA = Type.ENCRYPTED_ADVERTISING_DATA
PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION = Type.PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION
ELECTRONIC_SHELF_LABEL = Type.ELECTRONIC_SHELF_LABEL
THREE_D_INFORMATION_DATA = Type.THREE_D_INFORMATION_DATA
MANUFACTURER_SPECIFIC_DATA = Type.MANUFACTURER_SPECIFIC_DATA
LE_LIMITED_DISCOVERABLE_MODE_FLAG = 0x01
LE_GENERAL_DISCOVERABLE_MODE_FLAG = 0x02
@@ -1412,12 +1413,12 @@ class AdvertisingData:
BR_EDR_CONTROLLER_FLAG = 0x08
BR_EDR_HOST_FLAG = 0x10
ad_structures: List[Tuple[int, bytes]]
ad_structures: list[tuple[int, bytes]]
# fmt: on
# pylint: enable=line-too-long
def __init__(self, ad_structures: Optional[List[Tuple[int, bytes]]] = None) -> None:
def __init__(self, ad_structures: Optional[list[tuple[int, bytes]]] = None) -> None:
if ad_structures is None:
ad_structures = []
self.ad_structures = ad_structures[:]
@@ -1444,7 +1445,7 @@ class AdvertisingData:
return ','.join(bit_flags_to_strings(flags, flag_names))
@staticmethod
def uuid_list_to_objects(ad_data: bytes, uuid_size: int) -> List[UUID]:
def uuid_list_to_objects(ad_data: bytes, uuid_size: int) -> list[UUID]:
uuids = []
offset = 0
while (offset + uuid_size) <= len(ad_data):
@@ -1461,8 +1462,8 @@ class AdvertisingData:
]
)
@staticmethod
def ad_data_to_string(ad_type, ad_data):
@classmethod
def ad_data_to_string(cls, ad_type: int, ad_data: bytes) -> str:
if ad_type == AdvertisingData.FLAGS:
ad_type_str = 'Flags'
ad_data_str = AdvertisingData.flags_to_string(ad_data[0], short=True)
@@ -1521,72 +1522,72 @@ class AdvertisingData:
ad_type_str = 'Broadcast Name'
ad_data_str = ad_data.decode('utf-8')
else:
ad_type_str = AdvertisingData.AD_TYPE_NAMES.get(ad_type, f'0x{ad_type:02X}')
ad_type_str = AdvertisingData.Type(ad_type).name
ad_data_str = ad_data.hex()
return f'[{ad_type_str}]: {ad_data_str}'
# pylint: disable=too-many-return-statements
@staticmethod
def ad_data_to_object(ad_type: int, ad_data: bytes) -> AdvertisingDataObject:
@classmethod
def ad_data_to_object(cls, ad_type: int, ad_data: bytes) -> AdvertisingDataObject:
if ad_type in (
AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
):
return AdvertisingData.uuid_list_to_objects(ad_data, 2)
if ad_type in (
AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
):
return AdvertisingData.uuid_list_to_objects(ad_data, 4)
if ad_type in (
AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
):
return AdvertisingData.uuid_list_to_objects(ad_data, 16)
if ad_type == AdvertisingData.SERVICE_DATA_16_BIT_UUID:
if ad_type == AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID:
return (UUID.from_bytes(ad_data[:2]), ad_data[2:])
if ad_type == AdvertisingData.SERVICE_DATA_32_BIT_UUID:
if ad_type == AdvertisingData.Type.SERVICE_DATA_32_BIT_UUID:
return (UUID.from_bytes(ad_data[:4]), ad_data[4:])
if ad_type == AdvertisingData.SERVICE_DATA_128_BIT_UUID:
if ad_type == AdvertisingData.Type.SERVICE_DATA_128_BIT_UUID:
return (UUID.from_bytes(ad_data[:16]), ad_data[16:])
if ad_type in (
AdvertisingData.SHORTENED_LOCAL_NAME,
AdvertisingData.COMPLETE_LOCAL_NAME,
AdvertisingData.URI,
AdvertisingData.BROADCAST_NAME,
AdvertisingData.Type.SHORTENED_LOCAL_NAME,
AdvertisingData.Type.COMPLETE_LOCAL_NAME,
AdvertisingData.Type.URI,
AdvertisingData.Type.BROADCAST_NAME,
):
return ad_data.decode("utf-8")
if ad_type in (AdvertisingData.TX_POWER_LEVEL, AdvertisingData.FLAGS):
if ad_type in (AdvertisingData.Type.TX_POWER_LEVEL, AdvertisingData.Type.FLAGS):
return cast(int, struct.unpack('B', ad_data)[0])
if ad_type in (AdvertisingData.ADVERTISING_INTERVAL,):
if ad_type in (AdvertisingData.Type.ADVERTISING_INTERVAL,):
return cast(int, struct.unpack('<H', ad_data)[0])
if ad_type == AdvertisingData.CLASS_OF_DEVICE:
if ad_type == AdvertisingData.Type.CLASS_OF_DEVICE:
return cast(int, struct.unpack('<I', bytes([*ad_data, 0]))[0])
if ad_type == AdvertisingData.PERIPHERAL_CONNECTION_INTERVAL_RANGE:
return cast(Tuple[int, int], struct.unpack('<HH', ad_data))
if ad_type == AdvertisingData.Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE:
return cast(tuple[int, int], struct.unpack('<HH', ad_data))
if ad_type == AdvertisingData.MANUFACTURER_SPECIFIC_DATA:
return (cast(int, struct.unpack_from('<H', ad_data, 0)[0]), ad_data[2:])
if ad_type == AdvertisingData.APPEARANCE:
if ad_type == AdvertisingData.Type.APPEARANCE:
return Appearance.from_int(
cast(int, struct.unpack_from('<H', ad_data, 0)[0])
)
if ad_type == AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA:
return (cast(int, struct.unpack_from('<H', ad_data, 0)[0]), ad_data[2:])
return ad_data
def append(self, data: bytes) -> None:
@@ -1600,7 +1601,80 @@ class AdvertisingData:
self.ad_structures.append((ad_type, ad_data))
offset += length
def get_all(self, type_id: int, raw: bool = False) -> List[AdvertisingDataObject]:
@overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
],
raw: Literal[False] = False,
) -> list[list[UUID]]: ...
@overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_32_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_128_BIT_UUID,
],
raw: Literal[False] = False,
) -> list[tuple[UUID, bytes]]: ...
@overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.SHORTENED_LOCAL_NAME,
AdvertisingData.Type.COMPLETE_LOCAL_NAME,
AdvertisingData.Type.URI,
AdvertisingData.Type.BROADCAST_NAME,
],
raw: Literal[False] = False,
) -> list[str]: ...
@overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.TX_POWER_LEVEL,
AdvertisingData.Type.FLAGS,
AdvertisingData.Type.ADVERTISING_INTERVAL,
AdvertisingData.Type.CLASS_OF_DEVICE,
],
raw: Literal[False] = False,
) -> list[int]: ...
@overload
def get_all(
self,
type_id: Literal[AdvertisingData.Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE,],
raw: Literal[False] = False,
) -> list[tuple[int, int]]: ...
@overload
def get_all(
self,
type_id: Literal[AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA,],
raw: Literal[False] = False,
) -> list[tuple[int, bytes]]: ...
@overload
def get_all(
self,
type_id: Literal[AdvertisingData.Type.APPEARANCE,],
raw: Literal[False] = False,
) -> list[Appearance]: ...
@overload
def get_all(self, type_id: int, raw: Literal[True]) -> list[bytes]: ...
@overload
def get_all(
self, type_id: int, raw: bool = False
) -> list[AdvertisingDataObject]: ...
def get_all(self, type_id: int, raw: bool = False) -> list[AdvertisingDataObject]: # type: ignore[misc]
'''
Get Advertising Data Structure(s) with a given type
@@ -1612,6 +1686,79 @@ class AdvertisingData:
return [process_ad_data(ad[1]) for ad in self.ad_structures if ad[0] == type_id]
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
],
raw: Literal[False] = False,
) -> Optional[list[UUID]]: ...
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_32_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_128_BIT_UUID,
],
raw: Literal[False] = False,
) -> Optional[tuple[UUID, bytes]]: ...
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.SHORTENED_LOCAL_NAME,
AdvertisingData.Type.COMPLETE_LOCAL_NAME,
AdvertisingData.Type.URI,
AdvertisingData.Type.BROADCAST_NAME,
],
raw: Literal[False] = False,
) -> Optional[Optional[str]]: ...
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.TX_POWER_LEVEL,
AdvertisingData.Type.FLAGS,
AdvertisingData.Type.ADVERTISING_INTERVAL,
AdvertisingData.Type.CLASS_OF_DEVICE,
],
raw: Literal[False] = False,
) -> Optional[int]: ...
@overload
def get(
self,
type_id: Literal[AdvertisingData.Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE,],
raw: Literal[False] = False,
) -> Optional[tuple[int, int]]: ...
@overload
def get(
self,
type_id: Literal[AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA,],
raw: Literal[False] = False,
) -> Optional[tuple[int, bytes]]: ...
@overload
def get(
self,
type_id: Literal[AdvertisingData.Type.APPEARANCE,],
raw: Literal[False] = False,
) -> Optional[Appearance]: ...
@overload
def get(self, type_id: int, raw: Literal[True]) -> Optional[bytes]: ...
@overload
def get(
self, type_id: int, raw: bool = False
) -> Optional[AdvertisingDataObject]: ...
def get(self, type_id: int, raw: bool = False) -> Optional[AdvertisingDataObject]:
'''
Get Advertising Data Structure(s) with a given type
+609 -100
View File
File diff suppressed because it is too large Load Diff
+29 -229
View File
@@ -27,29 +27,16 @@ import enum
import functools
import logging
import struct
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Sequence,
SupportsBytes,
Type,
Union,
TYPE_CHECKING,
)
from typing import Iterable, List, Optional, Sequence, TypeVar, Union
from bumble.colors import color
from bumble.core import BaseBumbleError, InvalidOperationError, UUID
from bumble.core import BaseBumbleError, UUID
from bumble.att import Attribute, AttributeValue
from bumble.utils import ByteSerializable
if TYPE_CHECKING:
from bumble.gatt_client import AttributeProxy
from bumble.device import Connection
# -----------------------------------------------------------------------------
# Typing
# -----------------------------------------------------------------------------
_T = TypeVar('_T')
# -----------------------------------------------------------------------------
# Logging
@@ -437,7 +424,7 @@ class IncludedServiceDeclaration(Attribute):
# -----------------------------------------------------------------------------
class Characteristic(Attribute):
class Characteristic(Attribute[_T]):
'''
See Vol 3, Part G - 3.3 CHARACTERISTIC DEFINITION
'''
@@ -500,7 +487,7 @@ class Characteristic(Attribute):
uuid: Union[str, bytes, UUID],
properties: Characteristic.Properties,
permissions: Union[str, Attribute.Permissions],
value: Any = b'',
value: Union[AttributeValue[_T], _T, None] = None,
descriptors: Sequence[Descriptor] = (),
):
super().__init__(uuid, permissions, value)
@@ -560,217 +547,10 @@ class CharacteristicDeclaration(Attribute):
# -----------------------------------------------------------------------------
class CharacteristicValue(AttributeValue):
class CharacteristicValue(AttributeValue[_T]):
"""Same as AttributeValue, for backward compatibility"""
# -----------------------------------------------------------------------------
class CharacteristicAdapter:
'''
An adapter that can adapt Characteristic and AttributeProxy objects
by wrapping their `read_value()` and `write_value()` methods with ones that
return/accept encoded/decoded values.
For proxies (i.e used by a GATT client), the adaptation is one where the return
value of `read_value()` is decoded and the value passed to `write_value()` is
encoded. The `subscribe()` method, is wrapped with one where the values are decoded
before being passed to the subscriber.
For local values (i.e hosted by a GATT server) the adaptation is one where the
return value of `read_value()` is encoded and the value passed to `write_value()`
is decoded.
'''
read_value: Callable
write_value: Callable
def __init__(self, characteristic: Union[Characteristic, AttributeProxy]):
self.wrapped_characteristic = characteristic
self.subscribers: Dict[Callable, Callable] = (
{}
) # Map from subscriber to proxy subscriber
if isinstance(characteristic, Characteristic):
self.read_value = self.read_encoded_value
self.write_value = self.write_encoded_value
else:
self.read_value = self.read_decoded_value
self.write_value = self.write_decoded_value
self.subscribe = self.wrapped_subscribe
self.unsubscribe = self.wrapped_unsubscribe
def __getattr__(self, name):
return getattr(self.wrapped_characteristic, name)
def __setattr__(self, name, value):
if name in (
'wrapped_characteristic',
'subscribers',
'read_value',
'write_value',
'subscribe',
'unsubscribe',
):
super().__setattr__(name, value)
else:
setattr(self.wrapped_characteristic, name, value)
async def read_encoded_value(self, connection):
return self.encode_value(
await self.wrapped_characteristic.read_value(connection)
)
async def write_encoded_value(self, connection, value):
return await self.wrapped_characteristic.write_value(
connection, self.decode_value(value)
)
async def read_decoded_value(self):
return self.decode_value(await self.wrapped_characteristic.read_value())
async def write_decoded_value(self, value, with_response=False):
return await self.wrapped_characteristic.write_value(
self.encode_value(value), with_response
)
def encode_value(self, value):
return value
def decode_value(self, value):
return value
def wrapped_subscribe(self, subscriber=None):
if subscriber is not None:
if subscriber in self.subscribers:
# We already have a proxy subscriber
subscriber = self.subscribers[subscriber]
else:
# Create and register a proxy that will decode the value
original_subscriber = subscriber
def on_change(value):
original_subscriber(self.decode_value(value))
self.subscribers[subscriber] = on_change
subscriber = on_change
return self.wrapped_characteristic.subscribe(subscriber)
def wrapped_unsubscribe(self, subscriber=None):
if subscriber in self.subscribers:
subscriber = self.subscribers.pop(subscriber)
return self.wrapped_characteristic.unsubscribe(subscriber)
def __str__(self) -> str:
wrapped = str(self.wrapped_characteristic)
return f'{self.__class__.__name__}({wrapped})'
# -----------------------------------------------------------------------------
class DelegatedCharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that converts bytes values using an encode and a decode function.
'''
def __init__(self, characteristic, encode=None, decode=None):
super().__init__(characteristic)
self.encode = encode
self.decode = decode
def encode_value(self, value):
if self.encode is None:
raise InvalidOperationError('delegated adapter does not have an encoder')
return self.encode(value)
def decode_value(self, value):
if self.decode is None:
raise InvalidOperationError('delegate adapter does not have a decoder')
return self.decode(value)
# -----------------------------------------------------------------------------
class PackedCharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
For formats with a single value, the adapted `read_value` and `write_value`
methods return/accept single values. For formats with multiple values,
they return/accept a tuple with the same number of elements as is required for
the format.
'''
def __init__(self, characteristic, pack_format):
super().__init__(characteristic)
self.struct = struct.Struct(pack_format)
def pack(self, *values):
return self.struct.pack(*values)
def unpack(self, buffer):
return self.struct.unpack(buffer)
def encode_value(self, value):
return self.pack(*value if isinstance(value, tuple) else (value,))
def decode_value(self, value):
unpacked = self.unpack(value)
return unpacked[0] if len(unpacked) == 1 else unpacked
# -----------------------------------------------------------------------------
class MappedCharacteristicAdapter(PackedCharacteristicAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
The adapted `read_value` and `write_value` methods return/accept a dictionary which
is packed/unpacked according to format, with the arguments extracted from the
dictionary by key, in the same order as they occur in the `keys` parameter.
'''
def __init__(self, characteristic, pack_format, keys):
super().__init__(characteristic, pack_format)
self.keys = keys
# pylint: disable=arguments-differ
def pack(self, values):
return super().pack(*(values[key] for key in self.keys))
def unpack(self, buffer):
return dict(zip(self.keys, super().unpack(buffer)))
# -----------------------------------------------------------------------------
class UTF8CharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that converts strings to/from bytes using UTF-8 encoding
'''
def encode_value(self, value: str) -> bytes:
return value.encode('utf-8')
def decode_value(self, value: bytes) -> str:
return value.decode('utf-8')
# -----------------------------------------------------------------------------
class SerializableCharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that converts any class to/from bytes using the class'
`to_bytes` and `__bytes__` methods, respectively.
'''
def __init__(self, characteristic, cls: Type[ByteSerializable]):
super().__init__(characteristic)
self.cls = cls
def encode_value(self, value: SupportsBytes) -> bytes:
return bytes(value)
def decode_value(self, value: bytes) -> Any:
return self.cls.from_bytes(value)
# -----------------------------------------------------------------------------
class Descriptor(Attribute):
'''
@@ -805,3 +585,23 @@ class ClientCharacteristicConfigurationBits(enum.IntFlag):
DEFAULT = 0x0000
NOTIFICATION = 0x0001
INDICATION = 0x0002
# -----------------------------------------------------------------------------
class ClientSupportedFeatures(enum.IntFlag):
'''
See Vol 3, Part G - 7.2 - Table 7.6: Client Supported Features bit assignments.
'''
ROBUST_CACHING = 0x01
ENHANCED_ATT_BEARER = 0x02
MULTIPLE_HANDLE_VALUE_NOTIFICATIONS = 0x04
# -----------------------------------------------------------------------------
class ServerSupportedFeatures(enum.IntFlag):
'''
See Vol 3, Part G - 7.4 - Table 7.11: Server Supported Features bit assignments.
'''
EATT_SUPPORTED = 0x01
+374
View File
@@ -0,0 +1,374 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# GATT - Type Adapters
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import struct
from typing import (
Any,
Callable,
Generic,
Iterable,
Literal,
Optional,
Type,
TypeVar,
)
from bumble.core import InvalidOperationError
from bumble.gatt import Characteristic
from bumble.gatt_client import CharacteristicProxy
from bumble.utils import ByteSerializable, IntConvertible
# -----------------------------------------------------------------------------
# Typing
# -----------------------------------------------------------------------------
_T = TypeVar('_T')
_T2 = TypeVar('_T2', bound=ByteSerializable)
_T3 = TypeVar('_T3', bound=IntConvertible)
# -----------------------------------------------------------------------------
class CharacteristicAdapter(Characteristic, Generic[_T]):
'''Base class for GATT Characteristic adapters.'''
def __init__(self, characteristic: Characteristic) -> None:
super().__init__(
characteristic.uuid,
characteristic.properties,
characteristic.permissions,
characteristic.value,
characteristic.descriptors,
)
# -----------------------------------------------------------------------------
class CharacteristicProxyAdapter(CharacteristicProxy[_T]):
'''Base class for GATT CharacteristicProxy adapters.'''
def __init__(self, characteristic_proxy: CharacteristicProxy):
super().__init__(
characteristic_proxy.client,
characteristic_proxy.handle,
characteristic_proxy.end_group_handle,
characteristic_proxy.uuid,
characteristic_proxy.properties,
)
# -----------------------------------------------------------------------------
class DelegatedCharacteristicAdapter(CharacteristicAdapter[_T]):
'''
Adapter that converts bytes values using an encode and/or a decode function.
'''
def __init__(
self,
characteristic: Characteristic,
encode: Optional[Callable[[_T], bytes]] = None,
decode: Optional[Callable[[bytes], _T]] = None,
):
super().__init__(characteristic)
self.encode = encode
self.decode = decode
def encode_value(self, value: _T) -> bytes:
if self.encode is None:
raise InvalidOperationError('delegated adapter does not have an encoder')
return self.encode(value)
def decode_value(self, value: bytes) -> _T:
if self.decode is None:
raise InvalidOperationError('delegate adapter does not have a decoder')
return self.decode(value)
# -----------------------------------------------------------------------------
class DelegatedCharacteristicProxyAdapter(CharacteristicProxyAdapter[_T]):
'''
Adapter that converts bytes values using an encode and a decode function.
'''
def __init__(
self,
characteristic_proxy: CharacteristicProxy,
encode: Optional[Callable[[_T], bytes]] = None,
decode: Optional[Callable[[bytes], _T]] = None,
):
super().__init__(characteristic_proxy)
self.encode = encode
self.decode = decode
def encode_value(self, value: _T) -> bytes:
if self.encode is None:
raise InvalidOperationError('delegated adapter does not have an encoder')
return self.encode(value)
def decode_value(self, value: bytes) -> _T:
if self.decode is None:
raise InvalidOperationError('delegate adapter does not have a decoder')
return self.decode(value)
# -----------------------------------------------------------------------------
class PackedCharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
For formats with a single value, the adapted `read_value` and `write_value`
methods return/accept single values. For formats with multiple values,
they return/accept a tuple with the same number of elements as is required for
the format.
'''
def __init__(self, characteristic: Characteristic, pack_format: str) -> None:
super().__init__(characteristic)
self.struct = struct.Struct(pack_format)
def pack(self, *values) -> bytes:
return self.struct.pack(*values)
def unpack(self, buffer: bytes) -> tuple:
return self.struct.unpack(buffer)
def encode_value(self, value: Any) -> bytes:
return self.pack(*value if isinstance(value, tuple) else (value,))
def decode_value(self, value: bytes) -> Any:
unpacked = self.unpack(value)
return unpacked[0] if len(unpacked) == 1 else unpacked
# -----------------------------------------------------------------------------
class PackedCharacteristicProxyAdapter(CharacteristicProxyAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
For formats with a single value, the adapted `read_value` and `write_value`
methods return/accept single values. For formats with multiple values,
they return/accept a tuple with the same number of elements as is required for
the format.
'''
def __init__(self, characteristic_proxy, pack_format):
super().__init__(characteristic_proxy)
self.struct = struct.Struct(pack_format)
def pack(self, *values) -> bytes:
return self.struct.pack(*values)
def unpack(self, buffer: bytes) -> tuple:
return self.struct.unpack(buffer)
def encode_value(self, value: Any) -> bytes:
return self.pack(*value if isinstance(value, tuple) else (value,))
def decode_value(self, value: bytes) -> Any:
unpacked = self.unpack(value)
return unpacked[0] if len(unpacked) == 1 else unpacked
# -----------------------------------------------------------------------------
class MappedCharacteristicAdapter(PackedCharacteristicAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
The adapted `read_value` and `write_value` methods return/accept a dictionary which
is packed/unpacked according to format, with the arguments extracted from the
dictionary by key, in the same order as they occur in the `keys` parameter.
'''
def __init__(
self, characteristic: Characteristic, pack_format: str, keys: Iterable[str]
) -> None:
super().__init__(characteristic, pack_format)
self.keys = keys
# pylint: disable=arguments-differ
def pack(self, values) -> bytes:
return super().pack(*(values[key] for key in self.keys))
def unpack(self, buffer: bytes) -> Any:
return dict(zip(self.keys, super().unpack(buffer)))
# -----------------------------------------------------------------------------
class MappedCharacteristicProxyAdapter(PackedCharacteristicProxyAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
The adapted `read_value` and `write_value` methods return/accept a dictionary which
is packed/unpacked according to format, with the arguments extracted from the
dictionary by key, in the same order as they occur in the `keys` parameter.
'''
def __init__(
self,
characteristic_proxy: CharacteristicProxy,
pack_format: str,
keys: Iterable[str],
) -> None:
super().__init__(characteristic_proxy, pack_format)
self.keys = keys
# pylint: disable=arguments-differ
def pack(self, values) -> bytes:
return super().pack(*(values[key] for key in self.keys))
def unpack(self, buffer: bytes) -> Any:
return dict(zip(self.keys, super().unpack(buffer)))
# -----------------------------------------------------------------------------
class UTF8CharacteristicAdapter(CharacteristicAdapter[str]):
'''
Adapter that converts strings to/from bytes using UTF-8 encoding
'''
def encode_value(self, value: str) -> bytes:
return value.encode('utf-8')
def decode_value(self, value: bytes) -> str:
return value.decode('utf-8')
# -----------------------------------------------------------------------------
class UTF8CharacteristicProxyAdapter(CharacteristicProxyAdapter[str]):
'''
Adapter that converts strings to/from bytes using UTF-8 encoding
'''
def encode_value(self, value: str) -> bytes:
return value.encode('utf-8')
def decode_value(self, value: bytes) -> str:
return value.decode('utf-8')
# -----------------------------------------------------------------------------
class SerializableCharacteristicAdapter(CharacteristicAdapter[_T2]):
'''
Adapter that converts any class to/from bytes using the class'
`to_bytes` and `__bytes__` methods, respectively.
'''
def __init__(self, characteristic: Characteristic, cls: Type[_T2]) -> None:
super().__init__(characteristic)
self.cls = cls
def encode_value(self, value: _T2) -> bytes:
return bytes(value)
def decode_value(self, value: bytes) -> _T2:
return self.cls.from_bytes(value)
# -----------------------------------------------------------------------------
class SerializableCharacteristicProxyAdapter(CharacteristicProxyAdapter[_T2]):
'''
Adapter that converts any class to/from bytes using the class'
`to_bytes` and `__bytes__` methods, respectively.
'''
def __init__(
self, characteristic_proxy: CharacteristicProxy, cls: Type[_T2]
) -> None:
super().__init__(characteristic_proxy)
self.cls = cls
def encode_value(self, value: _T2) -> bytes:
return bytes(value)
def decode_value(self, value: bytes) -> _T2:
return self.cls.from_bytes(value)
# -----------------------------------------------------------------------------
class EnumCharacteristicAdapter(CharacteristicAdapter[_T3]):
'''
Adapter that converts int-enum-like classes to/from bytes using the class'
`int().to_bytes()` and `from_bytes()` methods, respectively.
'''
def __init__(
self,
characteristic: Characteristic,
cls: Type[_T3],
length: int,
byteorder: Literal['little', 'big'] = 'little',
):
"""
Initialize an instance.
Params:
characteristic: the Characteristic to adapt to/from
cls: the class to/from which to convert integer values
length: number of bytes used to represent integer values
byteorder: byte order of the byte representation of integers.
"""
super().__init__(characteristic)
self.cls = cls
self.length = length
self.byteorder = byteorder
def encode_value(self, value: _T3) -> bytes:
return int(value).to_bytes(self.length, self.byteorder)
def decode_value(self, value: bytes) -> _T3:
int_value = int.from_bytes(value, self.byteorder)
return self.cls(int_value)
# -----------------------------------------------------------------------------
class EnumCharacteristicProxyAdapter(CharacteristicProxyAdapter[_T3]):
'''
Adapter that converts int-enum-like classes to/from bytes using the class'
`int().to_bytes()` and `from_bytes()` methods, respectively.
'''
def __init__(
self,
characteristic_proxy: CharacteristicProxy,
cls: Type[_T3],
length: int,
byteorder: Literal['little', 'big'] = 'little',
):
"""
Initialize an instance.
Params:
characteristic_proxy: the CharacteristicProxy to adapt to/from
cls: the class to/from which to convert integer values
length: number of bytes used to represent integer values
byteorder: byte order of the byte representation of integers.
"""
super().__init__(characteristic_proxy)
self.cls = cls
self.length = length
self.byteorder = byteorder
def encode_value(self, value: _T3) -> bytes:
return int(value).to_bytes(self.length, self.byteorder)
def decode_value(self, value: bytes) -> _T3:
int_value = int.from_bytes(value, self.byteorder)
a = self.cls(int_value)
return self.cls(int_value)
+40 -33
View File
@@ -29,16 +29,18 @@ import logging
import struct
from datetime import datetime
from typing import (
Any,
Callable,
Dict,
Generic,
Iterable,
List,
Optional,
Dict,
Tuple,
Callable,
Union,
Any,
Iterable,
Type,
Set,
Tuple,
Union,
Type,
TypeVar,
TYPE_CHECKING,
)
@@ -82,9 +84,14 @@ from .gatt import (
TemplateService,
)
# -----------------------------------------------------------------------------
# Typing
# -----------------------------------------------------------------------------
if TYPE_CHECKING:
from bumble.device import Connection
_T = TypeVar('_T')
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
@@ -110,7 +117,7 @@ def show_services(services: Iterable[ServiceProxy]) -> None:
# -----------------------------------------------------------------------------
# Proxies
# -----------------------------------------------------------------------------
class AttributeProxy(EventEmitter):
class AttributeProxy(EventEmitter, Generic[_T]):
def __init__(
self, client: Client, handle: int, end_group_handle: int, attribute_type: UUID
) -> None:
@@ -120,21 +127,21 @@ class AttributeProxy(EventEmitter):
self.end_group_handle = end_group_handle
self.type = attribute_type
async def read_value(self, no_long_read: bool = False) -> bytes:
async def read_value(self, no_long_read: bool = False) -> _T:
return self.decode_value(
await self.client.read_value(self.handle, no_long_read)
)
async def write_value(self, value, with_response=False):
async def write_value(self, value: _T, with_response=False):
return await self.client.write_value(
self.handle, self.encode_value(value), with_response
)
def encode_value(self, value: Any) -> bytes:
return value
def encode_value(self, value: _T) -> bytes:
return value # type: ignore
def decode_value(self, value_bytes: bytes) -> Any:
return value_bytes
def decode_value(self, value: bytes) -> _T:
return value # type: ignore
def __str__(self) -> str:
return f'Attribute(handle=0x{self.handle:04X}, type={self.type})'
@@ -184,19 +191,19 @@ class ServiceProxy(AttributeProxy):
return f'Service(handle=0x{self.handle:04X}, uuid={self.uuid})'
class CharacteristicProxy(AttributeProxy):
class CharacteristicProxy(AttributeProxy[_T]):
properties: Characteristic.Properties
descriptors: List[DescriptorProxy]
subscribers: Dict[Any, Callable[[bytes], Any]]
subscribers: Dict[Any, Callable[[_T], Any]]
def __init__(
self,
client,
handle,
end_group_handle,
uuid,
client: Client,
handle: int,
end_group_handle: int,
uuid: UUID,
properties: int,
):
) -> None:
super().__init__(client, handle, end_group_handle, uuid)
self.uuid = uuid
self.properties = Characteristic.Properties(properties)
@@ -204,21 +211,21 @@ class CharacteristicProxy(AttributeProxy):
self.descriptors_discovered = False
self.subscribers = {} # Map from subscriber to proxy subscriber
def get_descriptor(self, descriptor_type):
def get_descriptor(self, descriptor_type: UUID) -> Optional[DescriptorProxy]:
for descriptor in self.descriptors:
if descriptor.type == descriptor_type:
return descriptor
return None
async def discover_descriptors(self):
async def discover_descriptors(self) -> list[DescriptorProxy]:
return await self.client.discover_descriptors(self)
async def subscribe(
self,
subscriber: Optional[Callable[[bytes], Any]] = None,
subscriber: Optional[Callable[[_T], Any]] = None,
prefer_notify: bool = True,
):
) -> None:
if subscriber is not None:
if subscriber in self.subscribers:
# We already have a proxy subscriber
@@ -233,13 +240,13 @@ class CharacteristicProxy(AttributeProxy):
self.subscribers[subscriber] = on_change
subscriber = on_change
return await self.client.subscribe(self, subscriber, prefer_notify)
await self.client.subscribe(self, subscriber, prefer_notify)
async def unsubscribe(self, subscriber=None, force=False):
async def unsubscribe(self, subscriber=None, force=False) -> None:
if subscriber in self.subscribers:
subscriber = self.subscribers.pop(subscriber)
return await self.client.unsubscribe(self, subscriber, force)
await self.client.unsubscribe(self, subscriber, force)
def __str__(self) -> str:
return (
@@ -250,7 +257,7 @@ class CharacteristicProxy(AttributeProxy):
class DescriptorProxy(AttributeProxy):
def __init__(self, client, handle, descriptor_type):
def __init__(self, client: Client, handle: int, descriptor_type: UUID) -> None:
super().__init__(client, handle, 0, descriptor_type)
def __str__(self) -> str:
@@ -679,7 +686,7 @@ class Client:
properties, handle = struct.unpack_from('<BH', attribute_value)
characteristic_uuid = UUID.from_bytes(attribute_value[3:])
characteristic = CharacteristicProxy(
characteristic: CharacteristicProxy = CharacteristicProxy(
self, handle, 0, characteristic_uuid, properties
)
@@ -805,7 +812,7 @@ class Client:
logger.warning(f'bogus handle value: {attribute_handle}')
return []
attribute = AttributeProxy(
attribute: AttributeProxy = AttributeProxy(
self, attribute_handle, 0, UUID.from_bytes(attribute_uuid)
)
attributes.append(attribute)
@@ -818,7 +825,7 @@ class Client:
async def subscribe(
self,
characteristic: CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
subscriber: Optional[Callable[[Any], Any]] = None,
prefer_notify: bool = True,
) -> None:
# If we haven't already discovered the descriptors for this characteristic,
@@ -868,7 +875,7 @@ class Client:
async def unsubscribe(
self,
characteristic: CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
subscriber: Optional[Callable[[Any], Any]] = None,
force: bool = False,
) -> None:
'''
+5 -5
View File
@@ -36,7 +36,6 @@ from typing import (
Tuple,
TypeVar,
Type,
Union,
TYPE_CHECKING,
)
from pyee import EventEmitter
@@ -78,7 +77,6 @@ from bumble.gatt import (
GATT_REQUEST_TIMEOUT,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
Characteristic,
CharacteristicAdapter,
CharacteristicDeclaration,
CharacteristicValue,
IncludedServiceDeclaration,
@@ -469,7 +467,7 @@ class Server(EventEmitter):
finally:
self.pending_confirmations[connection.handle] = None
async def notify_or_indicate_subscribers(
async def _notify_or_indicate_subscribers(
self,
indicate: bool,
attribute: Attribute,
@@ -503,7 +501,9 @@ class Server(EventEmitter):
value: Optional[bytes] = None,
force: bool = False,
):
return await self.notify_or_indicate_subscribers(False, attribute, value, force)
return await self._notify_or_indicate_subscribers(
False, attribute, value, force
)
async def indicate_subscribers(
self,
@@ -511,7 +511,7 @@ class Server(EventEmitter):
value: Optional[bytes] = None,
force: bool = False,
):
return await self.notify_or_indicate_subscribers(True, attribute, value, force)
return await self._notify_or_indicate_subscribers(True, attribute, value, force)
def on_disconnection(self, connection: Connection) -> None:
if connection.handle in self.subscribers:
+182 -75
View File
@@ -24,6 +24,7 @@ import logging
import secrets
import struct
from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union, ClassVar
from typing_extensions import Self
from bumble import crypto
from bumble.colors import color
@@ -34,6 +35,7 @@ from bumble.core import (
InvalidArgumentError,
InvalidPacketError,
ProtocolError,
PhysicalTransport,
bit_flags_to_strings,
name_or_number,
padded_bytes,
@@ -94,7 +96,7 @@ def map_class_of_device(class_of_device):
)
def phy_list_to_bits(phys: Optional[Iterable[int]]) -> int:
def phy_list_to_bits(phys: Optional[Iterable[Phy]]) -> int:
if phys is None:
return 0
@@ -700,30 +702,22 @@ HCI_ERROR_NAMES[HCI_SUCCESS] = 'HCI_SUCCESS'
HCI_COMMAND_STATUS_PENDING = 0
class Phy(enum.IntEnum):
LE_1M = 1
LE_2M = 2
LE_CODED = 3
# ACL
HCI_ACL_PB_FIRST_NON_FLUSHABLE = 0
HCI_ACL_PB_CONTINUATION = 1
HCI_ACL_PB_FIRST_FLUSHABLE = 2
HCI_ACK_PB_COMPLETE_L2CAP = 3
# Roles
HCI_CENTRAL_ROLE = 0
HCI_PERIPHERAL_ROLE = 1
HCI_ROLE_NAMES = {
HCI_CENTRAL_ROLE: 'CENTRAL',
HCI_PERIPHERAL_ROLE: 'PERIPHERAL'
}
# LE PHY Types
HCI_LE_1M_PHY = 1
HCI_LE_2M_PHY = 2
HCI_LE_CODED_PHY = 3
HCI_LE_PHY_NAMES = {
HCI_LE_1M_PHY: 'LE 1M',
HCI_LE_2M_PHY: 'LE 2M',
HCI_LE_CODED_PHY: 'LE Coded'
HCI_LE_PHY_NAMES: dict[int,str] = {
Phy.LE_1M: 'LE 1M',
Phy.LE_2M: 'LE 2M',
Phy.LE_CODED: 'LE Coded'
}
HCI_LE_1M_PHY_BIT = 0
@@ -732,19 +726,13 @@ HCI_LE_CODED_PHY_BIT = 2
HCI_LE_PHY_BIT_NAMES = ['LE_1M_PHY', 'LE_2M_PHY', 'LE_CODED_PHY']
HCI_LE_PHY_TYPE_TO_BIT = {
HCI_LE_1M_PHY: HCI_LE_1M_PHY_BIT,
HCI_LE_2M_PHY: HCI_LE_2M_PHY_BIT,
HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_BIT
HCI_LE_PHY_TYPE_TO_BIT: dict[Phy, int] = {
Phy.LE_1M: HCI_LE_1M_PHY_BIT,
Phy.LE_2M: HCI_LE_2M_PHY_BIT,
Phy.LE_CODED: HCI_LE_CODED_PHY_BIT,
}
class Phy(enum.IntEnum):
LE_1M = HCI_LE_1M_PHY
LE_2M = HCI_LE_2M_PHY
LE_CODED = HCI_LE_CODED_PHY
class PhyBit(enum.IntFlag):
LE_1M = 1 << HCI_LE_1M_PHY_BIT
LE_2M = 1 << HCI_LE_2M_PHY_BIT
@@ -791,6 +779,40 @@ class CsSnr(OpenIntEnum):
NOT_APPLIED = 0xFF
class CsDoneStatus(OpenIntEnum):
ALL_RESULTS_COMPLETED = 0x00
PARTIAL = 0x01
ABORTED = 0x0F
class CsProcedureAbortReason(OpenIntEnum):
NO_ABORT = 0x00
LOCAL_HOST_OR_REMOTE_REQUEST = 0x01
CHANNEL_MAP_UPDATE_INSTANT_PASSED = 0x02
UNSPECIFIED = 0x0F
class CsSubeventAbortReason(OpenIntEnum):
NO_ABORT = 0x00
LOCAL_HOST_OR_REMOTE_REQUEST = 0x01
NO_CS_SYNC_RECEIVED = 0x02
SCHEDULING_CONFLICT_OR_LIMITED_RESOURCES = 0x03
UNSPECIFIED = 0x0F
class Role(enum.IntEnum):
CENTRAL = 0
PERIPHERAL = 1
# For Backward Compatibility.
HCI_CENTRAL_ROLE = Role.CENTRAL
HCI_PERIPHERAL_ROLE = Role.PERIPHERAL
HCI_LE_1M_PHY = Phy.LE_1M
HCI_LE_2M_PHY = Phy.LE_2M
HCI_LE_CODED_PHY = Phy.LE_CODED
# Connection Parameters
HCI_CONNECTION_INTERVAL_MS_PER_UNIT = 1.25
HCI_CONNECTION_LATENCY_MS_PER_UNIT = 1.25
@@ -868,10 +890,15 @@ HCI_LINK_TYPE_NAMES = {
}
# Address types
HCI_PUBLIC_DEVICE_ADDRESS_TYPE = 0x00
HCI_RANDOM_DEVICE_ADDRESS_TYPE = 0x01
HCI_PUBLIC_IDENTITY_ADDRESS_TYPE = 0x02
HCI_RANDOM_IDENTITY_ADDRESS_TYPE = 0x03
class AddressType(OpenIntEnum):
PUBLIC_DEVICE = 0x00
RANDOM_DEVICE = 0x01
PUBLIC_IDENTITY = 0x02
RANDOM_IDENTITY = 0x03
# (Directed Only) Address is RPA, but controller cannot resolve.
UNABLE_TO_RESOLVE = 0xFE
# (Extended Only) No address.
ANONYMOUS = 0xFF
# Supported Commands Masks
# See Bluetooth spec @ 6.27 SUPPORTED COMMANDS
@@ -1561,8 +1588,8 @@ class HCI_Constant:
return HCI_ERROR_NAMES.get(status, f'0x{status:02X}')
@staticmethod
def role_name(role):
return HCI_ROLE_NAMES.get(role, str(role))
def role_name(role: int) -> str:
return Role(role).name
@staticmethod
def le_phy_name(phy):
@@ -1928,17 +1955,10 @@ class Address:
address[0] is the LSB of the address, address[5] is the MSB.
'''
PUBLIC_DEVICE_ADDRESS = 0x00
RANDOM_DEVICE_ADDRESS = 0x01
PUBLIC_IDENTITY_ADDRESS = 0x02
RANDOM_IDENTITY_ADDRESS = 0x03
ADDRESS_TYPE_NAMES = {
PUBLIC_DEVICE_ADDRESS: 'PUBLIC_DEVICE_ADDRESS',
RANDOM_DEVICE_ADDRESS: 'RANDOM_DEVICE_ADDRESS',
PUBLIC_IDENTITY_ADDRESS: 'PUBLIC_IDENTITY_ADDRESS',
RANDOM_IDENTITY_ADDRESS: 'RANDOM_IDENTITY_ADDRESS',
}
PUBLIC_DEVICE_ADDRESS = AddressType.PUBLIC_DEVICE
RANDOM_DEVICE_ADDRESS = AddressType.RANDOM_DEVICE
PUBLIC_IDENTITY_ADDRESS = AddressType.PUBLIC_IDENTITY
RANDOM_IDENTITY_ADDRESS = AddressType.RANDOM_IDENTITY
# Type declarations
NIL: Address
@@ -1948,40 +1968,44 @@ class Address:
# pylint: disable-next=unnecessary-lambda
ADDRESS_TYPE_SPEC = {'size': 1, 'mapper': lambda x: Address.address_type_name(x)}
@staticmethod
def address_type_name(address_type):
return name_or_number(Address.ADDRESS_TYPE_NAMES, address_type)
@classmethod
def address_type_name(cls: type[Self], address_type: int) -> str:
return AddressType(address_type).name
@staticmethod
def from_string_for_transport(string, transport):
@classmethod
def from_string_for_transport(
cls: type[Self], string: str, transport: PhysicalTransport
) -> Self:
if transport == BT_BR_EDR_TRANSPORT:
address_type = Address.PUBLIC_DEVICE_ADDRESS
else:
address_type = Address.RANDOM_DEVICE_ADDRESS
return Address(string, address_type)
return cls(string, address_type)
@staticmethod
def parse_address(data, offset):
@classmethod
def parse_address(cls: type[Self], data: bytes, offset: int) -> tuple[int, Self]:
# Fix the type to a default value. This is used for parsing type-less Classic
# addresses
return Address.parse_address_with_type(
data, offset, Address.PUBLIC_DEVICE_ADDRESS
)
return cls.parse_address_with_type(data, offset, Address.PUBLIC_DEVICE_ADDRESS)
@staticmethod
def parse_random_address(data, offset):
return Address.parse_address_with_type(
data, offset, Address.RANDOM_DEVICE_ADDRESS
)
@classmethod
def parse_random_address(
cls: type[Self], data: bytes, offset: int
) -> tuple[int, Self]:
return cls.parse_address_with_type(data, offset, Address.RANDOM_DEVICE_ADDRESS)
@staticmethod
def parse_address_with_type(data, offset, address_type):
return offset + 6, Address(data[offset : offset + 6], address_type)
@classmethod
def parse_address_with_type(
cls: type[Self], data: bytes, offset: int, address_type: AddressType
) -> tuple[int, Self]:
return offset + 6, cls(data[offset : offset + 6], address_type)
@staticmethod
def parse_address_preceded_by_type(data, offset):
address_type = data[offset - 1]
return Address.parse_address_with_type(data, offset, address_type)
@classmethod
def parse_address_preceded_by_type(
cls: type[Self], data: bytes, offset: int
) -> tuple[int, Self]:
address_type = AddressType(data[offset - 1])
return cls.parse_address_with_type(data, offset, address_type)
@classmethod
def generate_static_address(cls) -> Address:
@@ -2021,8 +2045,10 @@ class Address:
)
def __init__(
self, address: Union[bytes, str], address_type: int = RANDOM_DEVICE_ADDRESS
):
self,
address: Union[bytes, str],
address_type: AddressType = RANDOM_DEVICE_ADDRESS,
) -> None:
'''
Initialize an instance. `address` may be a byte array in little-endian
format, or a hex string in big-endian format (with optional ':'
@@ -4857,6 +4883,62 @@ class HCI_LE_Periodic_Advertising_Sync_Transfer_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('connection_handle', 2),
('mode', 1),
('skip', 2),
('sync_timeout', 2),
(
'cte_type',
{
'size': 1,
'mapper': lambda x: HCI_LE_Periodic_Advertising_Report_Event.CteType(
x
).name,
},
),
],
return_parameters_fields=[
('status', STATUS_SPEC),
('connection_handle', 2),
],
)
class HCI_LE_Set_Periodic_Advertising_Sync_Transfer_Parameters_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.91 LE Set Periodic Advertising Sync Transfer Parameters command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('mode', 1),
('skip', 2),
('sync_timeout', 2),
(
'cte_type',
{
'size': 1,
'mapper': lambda x: HCI_LE_Periodic_Advertising_Report_Event.CteType(
x
).name,
},
),
],
return_parameters_fields=[
('status', STATUS_SPEC),
],
)
class HCI_LE_Set_Default_Periodic_Advertising_Sync_Transfer_Parameters_Command(
HCI_Command
):
'''
See Bluetooth spec @ 7.8.92 LE Set Default Periodic Advertising Sync Transfer Parameters command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
@@ -6210,6 +6292,31 @@ class HCI_LE_Periodic_Advertising_Sync_Transfer_Received_Event(HCI_LE_Meta_Event
'''
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
('status', STATUS_SPEC),
('connection_handle', 2),
('service_data', 2),
('sync_handle', 2),
('advertising_sid', 1),
('advertiser_address_type', Address.ADDRESS_TYPE_SPEC),
('advertiser_address', Address.parse_address_preceded_by_type),
('advertiser_phy', 1),
('periodic_advertising_interval', 2),
('advertiser_clock_accuracy', 1),
('num_subevents', 1),
('subevent_interval', 1),
('response_slot_delay', 1),
('response_slot_spacing', 1),
]
)
class HCI_LE_Periodic_Advertising_Sync_Transfer_Received_V2_Event(HCI_LE_Meta_Event):
'''
See Bluetooth spec @ 7.7.65.24 LE Periodic Advertising Sync Transfer Received Event
'''
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event(
[
@@ -6506,7 +6613,7 @@ class HCI_LE_CS_Config_Complete_Event(HCI_LE_Meta_Event):
('config_id', 1),
('state', 1),
('tone_antenna_config_selection', 1),
('selected_tx_power', 1),
('selected_tx_power', -1),
('subevent_len', 3),
('subevents_per_event', 1),
('subevent_interval', 2),
@@ -6548,7 +6655,7 @@ class HCI_LE_CS_Procedure_Enable_Complete_Event(HCI_LE_Meta_Event):
('start_acl_conn_event_counter', 2),
('procedure_counter', 2),
('frequency_compensation', 2),
('reference_power_level', 1),
('reference_power_level', -1),
('procedure_done_status', 1),
('subevent_done_status', 1),
('abort_reason', 1),
@@ -6565,7 +6672,7 @@ class HCI_LE_CS_Subevent_Result_Event(HCI_LE_Meta_Event):
See Bluetooth spec @ 7.7.65.44 LE CS Subevent Result event
'''
status: int
connection_handle: int
config_id: int
start_acl_conn_event_counter: int
procedure_counter: int
@@ -6601,7 +6708,7 @@ class HCI_LE_CS_Subevent_Result_Continue_Event(HCI_LE_Meta_Event):
See Bluetooth spec @ 7.7.65.45 LE CS Subevent Result Continue event
'''
status: int
connection_handle: int
config_id: int
procedure_done_status: int
subevent_done_status: int
+69 -8
View File
@@ -44,6 +44,7 @@ from bumble import hci
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
PhysicalTransport,
ConnectionPHY,
ConnectionParameters,
)
@@ -186,7 +187,11 @@ class DataPacketQueue(pyee.EventEmitter):
# -----------------------------------------------------------------------------
class Connection:
def __init__(
self, host: Host, handle: int, peer_address: hci.Address, transport: int
self,
host: Host,
handle: int,
peer_address: hci.Address,
transport: PhysicalTransport,
):
self.host = host
self.handle = handle
@@ -235,7 +240,7 @@ class Host(AbortableEventEmitter):
cis_links: Dict[int, IsoLink]
bis_links: Dict[int, IsoLink]
sco_links: Dict[int, ScoLink]
bigs: dict[int, set[int]] = {} # BIG Handle to BIS Handles
bigs: dict[int, set[int]]
acl_packet_queue: Optional[DataPacketQueue] = None
le_acl_packet_queue: Optional[DataPacketQueue] = None
iso_packet_queue: Optional[DataPacketQueue] = None
@@ -259,6 +264,7 @@ class Host(AbortableEventEmitter):
self.cis_links = {} # CIS links, by connection handle
self.bis_links = {} # BIS links, by connection handle
self.sco_links = {} # SCO links, by connection handle
self.bigs = {} # BIG Handle to BIS Handles
self.pending_command = None
self.pending_response: Optional[asyncio.Future[Any]] = None
self.number_of_supported_advertising_sets = 0
@@ -476,6 +482,12 @@ class Host(AbortableEventEmitter):
hci.HCI_LE_TRANSMIT_POWER_REPORTING_EVENT,
hci.HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_SUBRATE_CHANGE_EVENT,
hci.HCI_LE_CS_READ_REMOTE_SUPPORTED_CAPABILITIES_COMPLETE_EVENT,
hci.HCI_LE_CS_PROCEDURE_ENABLE_COMPLETE_EVENT,
hci.HCI_LE_CS_SECURITY_ENABLE_COMPLETE_EVENT,
hci.HCI_LE_CS_CONFIG_COMPLETE_EVENT,
hci.HCI_LE_CS_SUBEVENT_RESULT_EVENT,
hci.HCI_LE_CS_SUBEVENT_RESULT_CONTINUE_EVENT,
]
)
@@ -972,7 +984,7 @@ class Host(AbortableEventEmitter):
event.peer_address,
getattr(event, 'local_resolvable_private_address', None),
getattr(event, 'peer_resolvable_private_address', None),
event.role,
hci.Role(event.role),
connection_parameters,
)
else:
@@ -1055,8 +1067,10 @@ class Host(AbortableEventEmitter):
)
# Flush the data queues
self.acl_packet_queue.flush(handle)
self.le_acl_packet_queue.flush(handle)
if self.acl_packet_queue:
self.acl_packet_queue.flush(handle)
if self.le_acl_packet_queue:
self.le_acl_packet_queue.flush(handle)
if self.iso_packet_queue:
self.iso_packet_queue.flush(handle)
else:
@@ -1092,8 +1106,11 @@ class Host(AbortableEventEmitter):
# Notify the client
if event.status == hci.HCI_SUCCESS:
connection_phy = ConnectionPHY(event.tx_phy, event.rx_phy)
self.emit('connection_phy_update', connection.handle, connection_phy)
self.emit(
'connection_phy_update',
connection.handle,
ConnectionPHY(event.tx_phy, event.rx_phy),
)
else:
self.emit('connection_phy_update_failure', connection.handle, event.status)
@@ -1198,6 +1215,32 @@ class Host(AbortableEventEmitter):
self.remove_big(event.big_handle)
self.emit('big_termination', event.reason, event.big_handle)
def on_hci_le_periodic_advertising_sync_transfer_received_event(self, event):
self.emit(
'periodic_advertising_sync_transfer',
event.status,
event.connection_handle,
event.sync_handle,
event.advertising_sid,
event.advertiser_address,
event.advertiser_phy,
event.periodic_advertising_interval,
event.advertiser_clock_accuracy,
)
def on_hci_le_periodic_advertising_sync_transfer_received_v2_event(self, event):
self.emit(
'periodic_advertising_sync_transfer',
event.status,
event.connection_handle,
event.sync_handle,
event.advertising_sid,
event.advertiser_address,
event.advertiser_phy,
event.periodic_advertising_interval,
event.advertiser_clock_accuracy,
)
def on_hci_le_cis_established_event(self, event):
# The remaining parameters are unused for now.
if event.status == hci.HCI_SUCCESS:
@@ -1299,7 +1342,7 @@ class Host(AbortableEventEmitter):
f'role change for {event.bd_addr}: '
f'{hci.HCI_Constant.role_name(event.new_role)}'
)
self.emit('role_change', event.bd_addr, event.new_role)
self.emit('role_change', event.bd_addr, hci.Role(event.new_role))
else:
logger.debug(
f'role change for {event.bd_addr} failed: '
@@ -1495,5 +1538,23 @@ class Host(AbortableEventEmitter):
int.from_bytes(event.le_features, 'little'),
)
def on_hci_le_cs_read_remote_supported_capabilities_complete_event(self, event):
self.emit('cs_remote_supported_capabilities', event)
def on_hci_le_cs_security_enable_complete_event(self, event):
self.emit('cs_security', event)
def on_hci_le_cs_config_complete_event(self, event):
self.emit('cs_config', event)
def on_hci_le_cs_procedure_enable_complete_event(self, event):
self.emit('cs_procedure', event)
def on_hci_le_cs_subevent_result_event(self, event):
self.emit('cs_subevent_result', event)
def on_hci_le_cs_subevent_result_continue_event(self, event):
self.emit('cs_subevent_result_continue', event)
def on_hci_vendor_event(self, event):
self.emit('vendor_event', event)
+2 -2
View File
@@ -42,7 +42,6 @@ from typing import (
from .utils import deprecated
from .colors import color
from .core import (
BT_CENTRAL_ROLE,
InvalidStateError,
InvalidArgumentError,
InvalidPacketError,
@@ -52,6 +51,7 @@ from .core import (
from .hci import (
HCI_LE_Connection_Update_Command,
HCI_Object,
Role,
key_with_value,
name_or_number,
)
@@ -1908,7 +1908,7 @@ class ChannelManager:
def on_l2cap_connection_parameter_update_request(
self, connection: Connection, cid: int, request
):
if connection.role == BT_CENTRAL_ROLE:
if connection.role == Role.CENTRAL:
self.send_control_frame(
connection,
cid,
+2 -2
View File
@@ -20,7 +20,6 @@ import asyncio
from functools import partial
from bumble.core import (
BT_PERIPHERAL_ROLE,
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
InvalidStateError,
@@ -28,6 +27,7 @@ from bumble.core import (
from bumble.colors import color
from bumble.hci import (
Address,
Role,
HCI_SUCCESS,
HCI_CONNECTION_ACCEPT_TIMEOUT_ERROR,
HCI_CONNECTION_TIMEOUT_ERROR,
@@ -292,7 +292,7 @@ class LocalLink:
return
async def task():
if responder_role != BT_PERIPHERAL_ROLE:
if responder_role != Role.PERIPHERAL:
initiator_controller.on_classic_role_change(
responder_controller.public_address, int(not (responder_role))
)
+5 -5
View File
@@ -1,4 +1,4 @@
# Copyright 2021-2023 Google LLC
# Copyright 2021-2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -76,18 +76,18 @@ class OobData:
return instance
def to_ad(self) -> AdvertisingData:
ad_structures = []
ad_structures: list[tuple[int, bytes]] = []
if self.address is not None:
ad_structures.append(
(AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(self.address))
(AdvertisingData.Type.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(self.address))
)
if self.role is not None:
ad_structures.append((AdvertisingData.LE_ROLE, bytes([self.role])))
ad_structures.append((AdvertisingData.Type.LE_ROLE, bytes([self.role])))
if self.shared_data is not None:
ad_structures.extend(self.shared_data.to_ad().ad_structures)
if self.legacy_context is not None:
ad_structures.append(
(AdvertisingData.SECURITY_MANAGER_TK_VALUE, self.legacy_context.tk)
(AdvertisingData.Type.SECURITY_MANAGER_TK_VALUE, self.legacy_context.tk)
)
return AdvertisingData(ad_structures)
+19 -23
View File
@@ -25,7 +25,6 @@ from .config import Config
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE,
UUID,
AdvertisingData,
Appearance,
@@ -47,6 +46,8 @@ from bumble.hci import (
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
Address,
Phy,
Role,
OwnAddressType,
)
from google.protobuf import any_pb2 # pytype: disable=pyi-error
from google.protobuf import empty_pb2 # pytype: disable=pyi-error
@@ -114,11 +115,11 @@ SECONDARY_PHY_TO_BUMBLE_PHY_MAP: Dict[SecondaryPhy, Phy] = {
SECONDARY_CODED: Phy.LE_CODED,
}
OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, bumble.hci.OwnAddressType] = {
host_pb2.PUBLIC: bumble.hci.OwnAddressType.PUBLIC,
host_pb2.RANDOM: bumble.hci.OwnAddressType.RANDOM,
host_pb2.RESOLVABLE_OR_PUBLIC: bumble.hci.OwnAddressType.RESOLVABLE_OR_PUBLIC,
host_pb2.RESOLVABLE_OR_RANDOM: bumble.hci.OwnAddressType.RESOLVABLE_OR_RANDOM,
OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, OwnAddressType] = {
host_pb2.PUBLIC: OwnAddressType.PUBLIC,
host_pb2.RANDOM: OwnAddressType.RANDOM,
host_pb2.RESOLVABLE_OR_PUBLIC: OwnAddressType.RESOLVABLE_OR_PUBLIC,
host_pb2.RESOLVABLE_OR_RANDOM: OwnAddressType.RESOLVABLE_OR_RANDOM,
}
@@ -250,7 +251,7 @@ class HostService(HostServicer):
connection = await self.device.connect(
address,
transport=BT_LE_TRANSPORT,
own_address_type=request.own_address_type,
own_address_type=OwnAddressType(request.own_address_type),
)
except ConnectionError as e:
if e.error_code == HCI_PAGE_TIMEOUT_ERROR:
@@ -371,18 +372,16 @@ class HostService(HostServicer):
scan_response_data=scan_response_data,
)
pending_connection: asyncio.Future[bumble.device.Connection] = (
asyncio.get_running_loop().create_future()
)
connections: asyncio.Queue[bumble.device.Connection] = asyncio.Queue()
if request.connectable:
def on_connection(connection: bumble.device.Connection) -> None:
if (
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
and connection.role == Role.PERIPHERAL
):
pending_connection.set_result(connection)
connections.put_nowait(connection)
self.device.on('connection', on_connection)
@@ -397,8 +396,7 @@ class HostService(HostServicer):
await asyncio.sleep(1)
continue
connection = await pending_connection
pending_connection = asyncio.get_running_loop().create_future()
connection = await connections.get()
cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big'))
yield AdvertiseResponse(connection=Connection(cookie=cookie))
@@ -492,14 +490,16 @@ class HostService(HostServicer):
target = Address(target_bytes, Address.RANDOM_DEVICE_ADDRESS)
advertising_type = AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY
connections: asyncio.Queue[bumble.device.Connection] = asyncio.Queue()
if request.connectable:
def on_connection(connection: bumble.device.Connection) -> None:
if (
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
and connection.role == Role.PERIPHERAL
):
pending_connection.set_result(connection)
connections.put_nowait(connection)
self.device.on('connection', on_connection)
@@ -510,19 +510,15 @@ class HostService(HostServicer):
await self.device.start_advertising(
target=target,
advertising_type=advertising_type,
own_address_type=request.own_address_type,
own_address_type=OwnAddressType(request.own_address_type),
)
if not request.connectable:
await asyncio.sleep(1)
continue
pending_connection: asyncio.Future[bumble.device.Connection] = (
asyncio.get_running_loop().create_future()
)
self.log.debug('Wait for LE connection...')
connection = await pending_connection
connection = await connections.get()
self.log.debug(
f"Advertise: Connected to {connection.peer_address} (handle={connection.handle})"
@@ -563,7 +559,7 @@ class HostService(HostServicer):
await self.device.start_scanning(
legacy=request.legacy,
active=not request.passive,
own_address_type=request.own_address_type,
own_address_type=OwnAddressType(request.own_address_type),
scan_interval=(
int(request.interval)
if request.interval
+2 -3
View File
@@ -24,11 +24,10 @@ from bumble import hci
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE,
ProtocolError,
)
from bumble.device import Connection as BumbleConnection, Device
from bumble.hci import HCI_Error
from bumble.hci import HCI_Error, Role
from bumble.utils import EventWatcher
from bumble.pairing import PairingConfig, PairingDelegate as BasePairingDelegate
from google.protobuf import any_pb2 # pytype: disable=pyi-error
@@ -318,7 +317,7 @@ class SecurityService(SecurityServicer):
if (
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
and connection.role == Role.PERIPHERAL
):
connection.request_pairing()
else:
+2 -2
View File
@@ -20,11 +20,11 @@ import inspect
import logging
from bumble.device import Device
from bumble.hci import Address
from bumble.hci import Address, AddressType
from google.protobuf.message import Message # pytype: disable=pyi-error
from typing import Any, Dict, Generator, MutableMapping, Optional, Tuple
ADDRESS_TYPES: Dict[str, int] = {
ADDRESS_TYPES: Dict[str, AddressType] = {
"public": Address.PUBLIC_DEVICE_ADDRESS,
"random": Address.RANDOM_DEVICE_ADDRESS,
"public_identity": Address.PUBLIC_IDENTITY_ADDRESS,
+33 -23
View File
@@ -24,16 +24,13 @@ import struct
from dataclasses import dataclass
from typing import Optional
from bumble import gatt
from bumble.device import Connection
from bumble.att import ATT_Error
from bumble.gatt import (
Attribute,
Characteristic,
SerializableCharacteristicAdapter,
PackedCharacteristicAdapter,
TemplateService,
CharacteristicValue,
UTF8CharacteristicAdapter,
GATT_AUDIO_INPUT_CONTROL_SERVICE,
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC,
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC,
@@ -42,6 +39,14 @@ from bumble.gatt import (
GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC,
GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC,
)
from bumble.gatt_adapters import (
CharacteristicProxy,
PackedCharacteristicProxyAdapter,
SerializableCharacteristicAdapter,
SerializableCharacteristicProxyAdapter,
UTF8CharacteristicAdapter,
UTF8CharacteristicProxyAdapter,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from bumble.utils import OpenIntEnum
@@ -124,7 +129,7 @@ class AudioInputState:
mute: Mute = Mute.NOT_MUTED
gain_mode: GainMode = GainMode.MANUAL
change_counter: int = 0
attribute_value: Optional[CharacteristicValue] = None
attribute: Optional[Attribute] = None
def __bytes__(self) -> bytes:
return bytes(
@@ -151,10 +156,8 @@ class AudioInputState:
self.change_counter = (self.change_counter + 1) % (CHANGE_COUNTER_MAX_VALUE + 1)
async def notify_subscribers_via_connection(self, connection: Connection) -> None:
assert self.attribute_value is not None
await connection.device.notify_subscribers(
attribute=self.attribute_value, value=bytes(self)
)
assert self.attribute is not None
await connection.device.notify_subscribers(attribute=self.attribute)
@dataclass
@@ -315,24 +318,28 @@ class AudioInputDescription:
'''
audio_input_description: str = "Bluetooth"
attribute_value: Optional[CharacteristicValue] = None
attribute: Optional[Attribute] = None
def on_read(self, _connection: Optional[Connection]) -> str:
return self.audio_input_description
async def on_write(self, connection: Optional[Connection], value: str) -> None:
assert connection
assert self.attribute_value
assert self.attribute
self.audio_input_description = value
await connection.device.notify_subscribers(
attribute=self.attribute_value, value=value
)
await connection.device.notify_subscribers(attribute=self.attribute)
class AICSService(TemplateService):
UUID = GATT_AUDIO_INPUT_CONTROL_SERVICE
audio_input_state_characteristic: Characteristic[AudioInputState]
audio_input_type_characteristic: Characteristic[bytes]
audio_input_status_characteristic: Characteristic[bytes]
audio_input_control_point_characteristic: Characteristic[bytes]
gain_settings_properties_characteristic: Characteristic[GainSettingsProperties]
def __init__(
self,
audio_input_state: Optional[AudioInputState] = None,
@@ -374,9 +381,7 @@ class AICSService(TemplateService):
),
AudioInputState,
)
self.audio_input_state.attribute_value = (
self.audio_input_state_characteristic.value
)
self.audio_input_state.attribute = self.audio_input_state_characteristic
self.gain_settings_properties_characteristic = (
SerializableCharacteristicAdapter(
@@ -425,8 +430,8 @@ class AICSService(TemplateService):
),
)
)
self.audio_input_description.attribute_value = (
self.audio_input_control_point_characteristic.value
self.audio_input_description.attribute = (
self.audio_input_control_point_characteristic
)
super().__init__(
@@ -448,24 +453,29 @@ class AICSService(TemplateService):
class AICSServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = AICSService
audio_input_state: CharacteristicProxy[AudioInputState]
gain_settings_properties: CharacteristicProxy[GainSettingsProperties]
audio_input_status: CharacteristicProxy[int]
audio_input_control_point: CharacteristicProxy[bytes]
def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy
self.audio_input_state = SerializableCharacteristicAdapter(
self.audio_input_state = SerializableCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC
),
AudioInputState,
)
self.gain_settings_properties = SerializableCharacteristicAdapter(
self.gain_settings_properties = SerializableCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC
),
GainSettingsProperties,
)
self.audio_input_status = PackedCharacteristicAdapter(
self.audio_input_status = PackedCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC
),
@@ -478,7 +488,7 @@ class AICSServiceProxy(ProfileServiceProxy):
)
)
self.audio_input_description = UTF8CharacteristicAdapter(
self.audio_input_description = UTF8CharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC
)
+2 -1
View File
@@ -301,7 +301,7 @@ class AseStateMachine(gatt.Characteristic):
presentation_delay = 0
# Additional parameters in ENABLING, STREAMING, DISABLING State
metadata = le_audio.Metadata()
metadata: le_audio.Metadata
def __init__(
self,
@@ -313,6 +313,7 @@ class AseStateMachine(gatt.Characteristic):
self.ase_id = ase_id
self._state = AseStateMachine.State.IDLE
self.role = role
self.metadata = le_audio.Metadata()
uuid = (
gatt.GATT_SINK_ASE_CHARACTERISTIC
+11 -9
View File
@@ -134,12 +134,14 @@ class AshaService(gatt.TemplateService):
),
)
self.audio_control_point_characteristic = gatt.Characteristic(
gatt.GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(write=self._on_audio_control_point_write),
self.audio_control_point_characteristic: gatt.Characteristic[bytes] = (
gatt.Characteristic(
gatt.GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(write=self._on_audio_control_point_write),
)
)
self.audio_status_characteristic = gatt.Characteristic(
gatt.GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
@@ -147,7 +149,7 @@ class AshaService(gatt.TemplateService):
gatt.Characteristic.READABLE,
bytes([AudioStatus.OK]),
)
self.volume_characteristic = gatt.Characteristic(
self.volume_characteristic: gatt.Characteristic[bytes] = gatt.Characteristic(
gatt.GATT_ASHA_VOLUME_CHARACTERISTIC,
gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
gatt.Characteristic.WRITEABLE,
@@ -166,13 +168,13 @@ class AshaService(gatt.TemplateService):
struct.pack('<H', self.psm),
)
characteristics = [
characteristics = (
self.read_only_properties_characteristic,
self.audio_control_point_characteristic,
self.audio_status_characteristic,
self.volume_characteristic,
self.le_psm_out_characteristic,
]
)
super().__init__(characteristics)
+8 -5
View File
@@ -20,11 +20,12 @@ from __future__ import annotations
import dataclasses
import logging
import struct
from typing import ClassVar, List, Optional, Sequence
from typing import ClassVar, Optional, Sequence
from bumble import core
from bumble import device
from bumble import gatt
from bumble import gatt_adapters
from bumble import gatt_client
from bumble import hci
from bumble import utils
@@ -52,7 +53,7 @@ def encode_subgroups(subgroups: Sequence[SubgroupInfo]) -> bytes:
)
def decode_subgroups(data: bytes) -> List[SubgroupInfo]:
def decode_subgroups(data: bytes) -> list[SubgroupInfo]:
num_subgroups = data[0]
offset = 1
subgroups = []
@@ -273,7 +274,7 @@ class BroadcastReceiveState:
pa_sync_state: PeriodicAdvertisingSyncState
big_encryption: BigEncryption
bad_code: bytes
subgroups: List[SubgroupInfo]
subgroups: list[SubgroupInfo]
@classmethod
def from_bytes(cls, data: bytes) -> BroadcastReceiveState:
@@ -354,7 +355,9 @@ class BroadcastAudioScanServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = BroadcastAudioScanService
broadcast_audio_scan_control_point: gatt_client.CharacteristicProxy
broadcast_receive_states: List[gatt.DelegatedCharacteristicAdapter]
broadcast_receive_states: list[
gatt_client.CharacteristicProxy[Optional[BroadcastReceiveState]]
]
def __init__(self, service_proxy: gatt_client.ServiceProxy):
self.service_proxy = service_proxy
@@ -366,7 +369,7 @@ class BroadcastAudioScanServiceProxy(gatt_client.ProfileServiceProxy):
)
self.broadcast_receive_states = [
gatt.DelegatedCharacteristicAdapter(
gatt_adapters.DelegatedCharacteristicProxyAdapter(
characteristic,
decode=lambda x: BroadcastReceiveState.from_bytes(x) if x else None,
)
+13 -3
View File
@@ -16,14 +16,20 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from ..gatt_client import ProfileServiceProxy
from ..gatt import (
from typing import Optional
from bumble.gatt_client import ProfileServiceProxy
from bumble.gatt import (
GATT_BATTERY_SERVICE,
GATT_BATTERY_LEVEL_CHARACTERISTIC,
TemplateService,
Characteristic,
CharacteristicValue,
)
from bumble.gatt_client import CharacteristicProxy
from bumble.gatt_adapters import (
PackedCharacteristicAdapter,
PackedCharacteristicProxyAdapter,
)
@@ -32,6 +38,8 @@ class BatteryService(TemplateService):
UUID = GATT_BATTERY_SERVICE
BATTERY_LEVEL_FORMAT = 'B'
battery_level_characteristic: Characteristic[int]
def __init__(self, read_battery_level):
self.battery_level_characteristic = PackedCharacteristicAdapter(
Characteristic(
@@ -49,13 +57,15 @@ class BatteryService(TemplateService):
class BatteryServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = BatteryService
battery_level: Optional[CharacteristicProxy[int]]
def __init__(self, service_proxy):
self.service_proxy = service_proxy
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_BATTERY_LEVEL_CHARACTERISTIC
):
self.battery_level = PackedCharacteristicAdapter(
self.battery_level = PackedCharacteristicProxyAdapter(
characteristics[0], pack_format=BatteryService.BATTERY_LEVEL_FORMAT
)
else:
+16 -14
View File
@@ -19,7 +19,6 @@
import struct
from typing import Optional, Tuple
from bumble.gatt_client import ServiceProxy, ProfileServiceProxy, CharacteristicProxy
from bumble.gatt import (
GATT_DEVICE_INFORMATION_SERVICE,
GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC,
@@ -32,9 +31,12 @@ from bumble.gatt import (
GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC,
TemplateService,
Characteristic,
DelegatedCharacteristicAdapter,
UTF8CharacteristicAdapter,
)
from bumble.gatt_adapters import (
DelegatedCharacteristicProxyAdapter,
UTF8CharacteristicProxyAdapter,
)
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy, ServiceProxy
# -----------------------------------------------------------------------------
@@ -62,7 +64,7 @@ class DeviceInformationService(TemplateService):
ieee_regulatory_certification_data_list: Optional[bytes] = None,
# TODO: pnp_id
):
characteristics = [
characteristics: list[Characteristic[bytes]] = [
Characteristic(
uuid,
Characteristic.Properties.READ,
@@ -107,14 +109,14 @@ class DeviceInformationService(TemplateService):
class DeviceInformationServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = DeviceInformationService
manufacturer_name: Optional[UTF8CharacteristicAdapter]
model_number: Optional[UTF8CharacteristicAdapter]
serial_number: Optional[UTF8CharacteristicAdapter]
hardware_revision: Optional[UTF8CharacteristicAdapter]
firmware_revision: Optional[UTF8CharacteristicAdapter]
software_revision: Optional[UTF8CharacteristicAdapter]
system_id: Optional[DelegatedCharacteristicAdapter]
ieee_regulatory_certification_data_list: Optional[CharacteristicProxy]
manufacturer_name: Optional[CharacteristicProxy[str]]
model_number: Optional[CharacteristicProxy[str]]
serial_number: Optional[CharacteristicProxy[str]]
hardware_revision: Optional[CharacteristicProxy[str]]
firmware_revision: Optional[CharacteristicProxy[str]]
software_revision: Optional[CharacteristicProxy[str]]
system_id: Optional[CharacteristicProxy[tuple[int, int]]]
ieee_regulatory_certification_data_list: Optional[CharacteristicProxy[bytes]]
def __init__(self, service_proxy: ServiceProxy):
self.service_proxy = service_proxy
@@ -128,7 +130,7 @@ class DeviceInformationServiceProxy(ProfileServiceProxy):
('software_revision', GATT_SOFTWARE_REVISION_STRING_CHARACTERISTIC),
):
if characteristics := service_proxy.get_characteristics_by_uuid(uuid):
characteristic = UTF8CharacteristicAdapter(characteristics[0])
characteristic = UTF8CharacteristicProxyAdapter(characteristics[0])
else:
characteristic = None
self.__setattr__(field, characteristic)
@@ -136,7 +138,7 @@ class DeviceInformationServiceProxy(ProfileServiceProxy):
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_SYSTEM_ID_CHARACTERISTIC
):
self.system_id = DelegatedCharacteristicAdapter(
self.system_id = DelegatedCharacteristicProxyAdapter(
characteristics[0],
encode=lambda v: DeviceInformationService.pack_system_id(*v),
decode=DeviceInformationService.unpack_system_id,
+12 -8
View File
@@ -25,14 +25,15 @@ from bumble.core import Appearance
from bumble.gatt import (
TemplateService,
Characteristic,
CharacteristicAdapter,
DelegatedCharacteristicAdapter,
UTF8CharacteristicAdapter,
GATT_GENERIC_ACCESS_SERVICE,
GATT_DEVICE_NAME_CHARACTERISTIC,
GATT_APPEARANCE_CHARACTERISTIC,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from bumble.gatt_adapters import (
DelegatedCharacteristicProxyAdapter,
UTF8CharacteristicProxyAdapter,
)
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy, ServiceProxy
# -----------------------------------------------------------------------------
# Logging
@@ -49,6 +50,9 @@ logger = logging.getLogger(__name__)
class GenericAccessService(TemplateService):
UUID = GATT_GENERIC_ACCESS_SERVICE
device_name_characteristic: Characteristic[bytes]
appearance_characteristic: Characteristic[bytes]
def __init__(
self, device_name: str, appearance: Union[Appearance, Tuple[int, int], int] = 0
):
@@ -84,8 +88,8 @@ class GenericAccessService(TemplateService):
class GenericAccessServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = GenericAccessService
device_name: Optional[CharacteristicAdapter]
appearance: Optional[DelegatedCharacteristicAdapter]
device_name: Optional[CharacteristicProxy[str]]
appearance: Optional[CharacteristicProxy[Appearance]]
def __init__(self, service_proxy: ServiceProxy):
self.service_proxy = service_proxy
@@ -93,14 +97,14 @@ class GenericAccessServiceProxy(ProfileServiceProxy):
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_DEVICE_NAME_CHARACTERISTIC
):
self.device_name = UTF8CharacteristicAdapter(characteristics[0])
self.device_name = UTF8CharacteristicProxyAdapter(characteristics[0])
else:
self.device_name = None
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_APPEARANCE_CHARACTERISTIC
):
self.appearance = DelegatedCharacteristicAdapter(
self.appearance = DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=lambda value: Appearance.from_int(
struct.unpack_from('<H', value, 0)[0],
+167
View File
@@ -0,0 +1,167 @@
# Copyright 2021-2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import struct
from typing import TYPE_CHECKING
from bumble import att
from bumble import gatt
from bumble import gatt_client
from bumble import crypto
if TYPE_CHECKING:
from bumble import device
# -----------------------------------------------------------------------------
class GenericAttributeProfileService(gatt.TemplateService):
'''See Vol 3, Part G - 7 - DEFINED GENERIC ATTRIBUTE PROFILE SERVICE.'''
UUID = gatt.GATT_GENERIC_ATTRIBUTE_SERVICE
client_supported_features_characteristic: gatt.Characteristic | None = None
server_supported_features_characteristic: gatt.Characteristic | None = None
database_hash_characteristic: gatt.Characteristic | None = None
service_changed_characteristic: gatt.Characteristic | None = None
def __init__(
self,
server_supported_features: gatt.ServerSupportedFeatures | None = None,
database_hash_enabled: bool = True,
service_change_enabled: bool = True,
) -> None:
if server_supported_features is not None:
self.server_supported_features_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SERVER_SUPPORTED_FEATURES_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=bytes([server_supported_features]),
)
if database_hash_enabled:
self.database_hash_characteristic = gatt.Characteristic(
uuid=gatt.GATT_DATABASE_HASH_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=gatt.CharacteristicValue(read=self.get_database_hash),
)
if service_change_enabled:
self.service_changed_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SERVICE_CHANGED_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.INDICATE,
permissions=gatt.Characteristic.Permissions(0),
value=b'',
)
if (database_hash_enabled and service_change_enabled) or (
server_supported_features
and (
server_supported_features & gatt.ServerSupportedFeatures.EATT_SUPPORTED
)
): # TODO: Support Multiple Handle Value Notifications
self.client_supported_features_characteristic = gatt.Characteristic(
uuid=gatt.GATT_CLIENT_SUPPORTED_FEATURES_CHARACTERISTIC,
properties=(
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
),
permissions=(
gatt.Characteristic.Permissions.READABLE
| gatt.Characteristic.Permissions.WRITEABLE
),
value=bytes(1),
)
super().__init__(
characteristics=[
c
for c in (
self.service_changed_characteristic,
self.client_supported_features_characteristic,
self.database_hash_characteristic,
self.server_supported_features_characteristic,
)
if c is not None
],
primary=True,
)
@classmethod
def get_attribute_data(cls, attribute: att.Attribute) -> bytes:
if attribute.type in (
gatt.GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
gatt.GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
gatt.GATT_INCLUDE_ATTRIBUTE_TYPE,
gatt.GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
gatt.GATT_CHARACTERISTIC_EXTENDED_PROPERTIES_DESCRIPTOR,
):
assert isinstance(attribute.value, bytes)
return (
struct.pack("<H", attribute.handle)
+ attribute.type.to_bytes()
+ attribute.value
)
elif attribute.type in (
gatt.GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
gatt.GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
gatt.GATT_SERVER_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
gatt.GATT_CHARACTERISTIC_PRESENTATION_FORMAT_DESCRIPTOR,
gatt.GATT_CHARACTERISTIC_AGGREGATE_FORMAT_DESCRIPTOR,
):
return struct.pack("<H", attribute.handle) + attribute.type.to_bytes()
return b''
def get_database_hash(self, connection: device.Connection | None) -> bytes:
assert connection
m = b''.join(
[
self.get_attribute_data(attribute)
for attribute in connection.device.gatt_server.attributes
]
)
return crypto.aes_cmac(m=m, k=bytes(16))
class GenericAttributeProfileServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = GenericAttributeProfileService
client_supported_features_characteristic: gatt_client.CharacteristicProxy | None = (
None
)
server_supported_features_characteristic: gatt_client.CharacteristicProxy | None = (
None
)
database_hash_characteristic: gatt_client.CharacteristicProxy | None = None
service_changed_characteristic: gatt_client.CharacteristicProxy | None = None
_CHARACTERISTICS = {
gatt.GATT_CLIENT_SUPPORTED_FEATURES_CHARACTERISTIC: 'client_supported_features_characteristic',
gatt.GATT_SERVER_SUPPORTED_FEATURES_CHARACTERISTIC: 'server_supported_features_characteristic',
gatt.GATT_DATABASE_HASH_CHARACTERISTIC: 'database_hash_characteristic',
gatt.GATT_SERVICE_CHANGED_CHARACTERISTIC: 'service_changed_characteristic',
}
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
for uuid, attribute_name in self._CHARACTERISTICS.items():
if characteristics := self.service_proxy.get_characteristics_by_uuid(uuid):
setattr(self, attribute_name, characteristics[0])
+16 -11
View File
@@ -22,7 +22,6 @@ from typing import Optional
from bumble.gatt import (
TemplateService,
DelegatedCharacteristicAdapter,
Characteristic,
GATT_GAMING_AUDIO_SERVICE,
GATT_GMAP_ROLE_CHARACTERISTIC,
@@ -31,7 +30,8 @@ from bumble.gatt import (
GATT_BGS_FEATURES_CHARACTERISTIC,
GATT_BGR_FEATURES_CHARACTERISTIC,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from bumble.gatt_adapters import DelegatedCharacteristicProxyAdapter
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy, ServiceProxy
from enum import IntFlag
@@ -150,10 +150,15 @@ class GamingAudioService(TemplateService):
class GamingAudioServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = GamingAudioService
ugg_features: Optional[CharacteristicProxy[UggFeatures]] = None
ugt_features: Optional[CharacteristicProxy[UgtFeatures]] = None
bgs_features: Optional[CharacteristicProxy[BgsFeatures]] = None
bgr_features: Optional[CharacteristicProxy[BgrFeatures]] = None
def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy
self.gmap_role = DelegatedCharacteristicAdapter(
self.gmap_role = DelegatedCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_GMAP_ROLE_CHARACTERISTIC
),
@@ -163,31 +168,31 @@ class GamingAudioServiceProxy(ProfileServiceProxy):
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_UGG_FEATURES_CHARACTERISTIC
):
self.ugg_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
self.ugg_features = DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=lambda value: UggFeatures(value[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_UGT_FEATURES_CHARACTERISTIC
):
self.ugt_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
self.ugt_features = DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=lambda value: UgtFeatures(value[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_BGS_FEATURES_CHARACTERISTIC
):
self.bgs_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
self.bgs_features = DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=lambda value: BgsFeatures(value[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_BGR_FEATURES_CHARACTERISTIC
):
self.bgr_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
self.bgr_features = DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=lambda value: BgrFeatures(value[0]),
)
+8 -6
View File
@@ -18,14 +18,15 @@
from __future__ import annotations
import asyncio
import functools
from bumble import att, gatt, gatt_client
from dataclasses import dataclass, field
import logging
from typing import Any, Dict, List, Optional, Set, Union
from bumble import att, gatt, gatt_adapters, gatt_client
from bumble.core import InvalidArgumentError, InvalidStateError
from bumble.device import Device, Connection
from bumble.utils import AsyncRunner, OpenIntEnum
from bumble.hci import Address
from dataclasses import dataclass, field
import logging
from typing import Any, Dict, List, Optional, Set, Union
# -----------------------------------------------------------------------------
@@ -631,11 +632,12 @@ class HearingAccessServiceProxy(gatt_client.ProfileServiceProxy):
hearing_aid_preset_control_point: gatt_client.CharacteristicProxy
preset_control_point_indications: asyncio.Queue
active_preset_index_notification: asyncio.Queue
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
self.server_features = gatt.PackedCharacteristicAdapter(
self.server_features = gatt_adapters.PackedCharacteristicProxyAdapter(
service_proxy.get_characteristics_by_uuid(
gatt.GATT_HEARING_AID_FEATURES_CHARACTERISTIC
)[0],
@@ -648,7 +650,7 @@ class HearingAccessServiceProxy(gatt_client.ProfileServiceProxy):
)[0]
)
self.active_preset_index = gatt.PackedCharacteristicAdapter(
self.active_preset_index = gatt_adapters.PackedCharacteristicProxyAdapter(
service_proxy.get_characteristics_by_uuid(
gatt.GATT_ACTIVE_PRESET_INDEX_CHARACTERISTIC
)[0],
+20 -4
View File
@@ -16,13 +16,14 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
from enum import IntEnum
import struct
from typing import Optional
from bumble import core
from ..gatt_client import ProfileServiceProxy
from ..att import ATT_Error
from ..gatt import (
from bumble.att import ATT_Error
from bumble.gatt import (
GATT_HEART_RATE_SERVICE,
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
@@ -30,10 +31,13 @@ from ..gatt import (
TemplateService,
Characteristic,
CharacteristicValue,
SerializableCharacteristicAdapter,
)
from bumble.gatt_adapters import (
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter,
SerializableCharacteristicAdapter,
)
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy
# -----------------------------------------------------------------------------
@@ -43,6 +47,10 @@ class HeartRateService(TemplateService):
CONTROL_POINT_NOT_SUPPORTED = 0x80
RESET_ENERGY_EXPENDED = 0x01
heart_rate_measurement_characteristic: Characteristic[HeartRateMeasurement]
body_sensor_location_characteristic: Characteristic[BodySensorLocation]
heart_rate_control_point_characteristic: Characteristic[int]
class BodySensorLocation(IntEnum):
OTHER = 0
CHEST = 1
@@ -198,6 +206,14 @@ class HeartRateService(TemplateService):
class HeartRateServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = HeartRateService
heart_rate_measurement: Optional[
CharacteristicProxy[HeartRateService.HeartRateMeasurement]
]
body_sensor_location: Optional[
CharacteristicProxy[HeartRateService.BodySensorLocation]
]
heart_rate_control_point: Optional[CharacteristicProxy[int]]
def __init__(self, service_proxy):
self.service_proxy = service_proxy
+11 -9
View File
@@ -208,7 +208,7 @@ class MediaControlService(gatt.TemplateService):
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=media_player_name or 'Bumble Player',
value=(media_player_name or 'Bumble Player').encode(),
)
self.track_changed_characteristic = gatt.Characteristic(
uuid=gatt.GATT_TRACK_CHANGED_CHARACTERISTIC,
@@ -247,14 +247,16 @@ class MediaControlService(gatt.TemplateService):
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=b'',
)
self.media_control_point_characteristic = gatt.Characteristic(
uuid=gatt.GATT_MEDIA_CONTROL_POINT_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
value=gatt.CharacteristicValue(write=self.on_media_control_point),
self.media_control_point_characteristic: gatt.Characteristic[bytes] = (
gatt.Characteristic(
uuid=gatt.GATT_MEDIA_CONTROL_POINT_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
value=gatt.CharacteristicValue(write=self.on_media_control_point),
)
)
self.media_control_point_opcodes_supported_characteristic = gatt.Characteristic(
uuid=gatt.GATT_MEDIA_CONTROL_POINT_OPCODES_SUPPORTED_CHARACTERISTIC,
+37 -24
View File
@@ -25,6 +25,7 @@ from typing import Optional, Sequence, Union
from bumble.profiles.bap import AudioLocation, CodecSpecificCapabilities, ContextType
from bumble.profiles import le_audio
from bumble import gatt
from bumble import gatt_adapters
from bumble import gatt_client
from bumble import hci
@@ -185,34 +186,42 @@ class PublishedAudioCapabilitiesService(gatt.TemplateService):
class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = PublishedAudioCapabilitiesService
sink_pac: Optional[gatt.DelegatedCharacteristicAdapter] = None
sink_audio_locations: Optional[gatt.DelegatedCharacteristicAdapter] = None
source_pac: Optional[gatt.DelegatedCharacteristicAdapter] = None
source_audio_locations: Optional[gatt.DelegatedCharacteristicAdapter] = None
available_audio_contexts: gatt.DelegatedCharacteristicAdapter
supported_audio_contexts: gatt.DelegatedCharacteristicAdapter
sink_pac: Optional[gatt_client.CharacteristicProxy[list[PacRecord]]] = None
sink_audio_locations: Optional[gatt_client.CharacteristicProxy[AudioLocation]] = (
None
)
source_pac: Optional[gatt_client.CharacteristicProxy[list[PacRecord]]] = None
source_audio_locations: Optional[gatt_client.CharacteristicProxy[AudioLocation]] = (
None
)
available_audio_contexts: gatt_client.CharacteristicProxy[tuple[ContextType, ...]]
supported_audio_contexts: gatt_client.CharacteristicProxy[tuple[ContextType, ...]]
def __init__(self, service_proxy: gatt_client.ServiceProxy):
self.service_proxy = service_proxy
self.available_audio_contexts = gatt.DelegatedCharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC
),
decode=lambda x: tuple(map(ContextType, struct.unpack('<HH', x))),
self.available_audio_contexts = (
gatt_adapters.DelegatedCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC
),
decode=lambda x: tuple(map(ContextType, struct.unpack('<HH', x))),
)
)
self.supported_audio_contexts = gatt.DelegatedCharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC
),
decode=lambda x: tuple(map(ContextType, struct.unpack('<HH', x))),
self.supported_audio_contexts = (
gatt_adapters.DelegatedCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC
),
decode=lambda x: tuple(map(ContextType, struct.unpack('<HH', x))),
)
)
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_PAC_CHARACTERISTIC
):
self.sink_pac = gatt.DelegatedCharacteristicAdapter(
self.sink_pac = gatt_adapters.DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=PacRecord.list_from_bytes,
)
@@ -220,7 +229,7 @@ class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_PAC_CHARACTERISTIC
):
self.source_pac = gatt.DelegatedCharacteristicAdapter(
self.source_pac = gatt_adapters.DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=PacRecord.list_from_bytes,
)
@@ -228,15 +237,19 @@ class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC
):
self.sink_audio_locations = gatt.DelegatedCharacteristicAdapter(
characteristics[0],
decode=lambda x: AudioLocation(struct.unpack('<I', x)[0]),
self.sink_audio_locations = (
gatt_adapters.DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=lambda x: AudioLocation(struct.unpack('<I', x)[0]),
)
)
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC
):
self.source_audio_locations = gatt.DelegatedCharacteristicAdapter(
characteristics[0],
decode=lambda x: AudioLocation(struct.unpack('<I', x)[0]),
self.source_audio_locations = (
gatt_adapters.DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=lambda x: AudioLocation(struct.unpack('<I', x)[0]),
)
)
+1 -1
View File
@@ -40,7 +40,7 @@ class PublicBroadcastAnnouncement:
def from_bytes(cls, data: bytes) -> Self:
features = cls.Features(data[0])
metadata_length = data[1]
metadata_ltv = data[1 : 1 + metadata_length]
metadata_ltv = data[2 : 2 + metadata_length]
return cls(
features=features, metadata=le_audio.Metadata.from_bytes(metadata_ltv)
)
+6 -4
View File
@@ -24,11 +24,11 @@ import struct
from bumble.gatt import (
TemplateService,
Characteristic,
DelegatedCharacteristicAdapter,
GATT_TELEPHONY_AND_MEDIA_AUDIO_SERVICE,
GATT_TMAP_ROLE_CHARACTERISTIC,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from bumble.gatt_adapters import DelegatedCharacteristicProxyAdapter
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy, ServiceProxy
# -----------------------------------------------------------------------------
@@ -53,6 +53,8 @@ class Role(enum.IntFlag):
class TelephonyAndMediaAudioService(TemplateService):
UUID = GATT_TELEPHONY_AND_MEDIA_AUDIO_SERVICE
role_characteristic: Characteristic[bytes]
def __init__(self, role: Role):
self.role_characteristic = Characteristic(
GATT_TMAP_ROLE_CHARACTERISTIC,
@@ -68,12 +70,12 @@ class TelephonyAndMediaAudioService(TemplateService):
class TelephonyAndMediaAudioServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = TelephonyAndMediaAudioService
role: DelegatedCharacteristicAdapter
role: CharacteristicProxy[Role]
def __init__(self, service_proxy: ServiceProxy):
self.service_proxy = service_proxy
self.role = DelegatedCharacteristicAdapter(
self.role = DelegatedCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_TMAP_ROLE_CHARACTERISTIC
),
+6 -5
View File
@@ -25,6 +25,7 @@ from typing import Optional, Sequence
from bumble import att
from bumble import device
from bumble import gatt
from bumble import gatt_adapters
from bumble import gatt_client
@@ -209,14 +210,14 @@ class VolumeControlService(gatt.TemplateService):
class VolumeControlServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = VolumeControlService
volume_control_point: gatt_client.CharacteristicProxy
volume_state: gatt.SerializableCharacteristicAdapter
volume_flags: gatt.DelegatedCharacteristicAdapter
volume_control_point: gatt_client.CharacteristicProxy[bytes]
volume_state: gatt_client.CharacteristicProxy[VolumeState]
volume_flags: gatt_client.CharacteristicProxy[VolumeFlags]
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
self.volume_state = gatt.SerializableCharacteristicAdapter(
self.volume_state = gatt_adapters.SerializableCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_VOLUME_STATE_CHARACTERISTIC
),
@@ -227,7 +228,7 @@ class VolumeControlServiceProxy(gatt_client.ProfileServiceProxy):
gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC
)
self.volume_flags = gatt.DelegatedCharacteristicAdapter(
self.volume_flags = gatt_adapters.DelegatedCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC
),
+48 -40
View File
@@ -24,17 +24,19 @@ from bumble.device import Connection
from bumble.att import ATT_Error
from bumble.gatt import (
Characteristic,
DelegatedCharacteristicAdapter,
TemplateService,
CharacteristicValue,
SerializableCharacteristicAdapter,
UTF8CharacteristicAdapter,
GATT_VOLUME_OFFSET_CONTROL_SERVICE,
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC,
GATT_AUDIO_LOCATION_CHARACTERISTIC,
GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC,
GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC,
)
from bumble.gatt_adapters import (
DelegatedCharacteristicProxyAdapter,
SerializableCharacteristicProxyAdapter,
UTF8CharacteristicProxyAdapter,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from bumble.utils import OpenIntEnum
from bumble.profiles.bap import AudioLocation
@@ -67,7 +69,7 @@ class ErrorCode(OpenIntEnum):
class VolumeOffsetState:
volume_offset: int = 0
change_counter: int = 0
attribute_value: Optional[CharacteristicValue] = None
attribute: Optional[Characteristic] = None
def __bytes__(self) -> bytes:
return struct.pack('<hB', self.volume_offset, self.change_counter)
@@ -81,8 +83,8 @@ class VolumeOffsetState:
self.change_counter = (self.change_counter + 1) % (CHANGE_COUNTER_MAX_VALUE + 1)
async def notify_subscribers_via_connection(self, connection: Connection) -> None:
assert self.attribute_value is not None
await connection.device.notify_subscribers(attribute=self.attribute_value)
assert self.attribute is not None
await connection.device.notify_subscribers(attribute=self.attribute)
def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self)
@@ -91,7 +93,7 @@ class VolumeOffsetState:
@dataclass
class VocsAudioLocation:
audio_location: AudioLocation = AudioLocation.NOT_ALLOWED
attribute_value: Optional[CharacteristicValue] = None
attribute: Optional[Characteristic] = None
def __bytes__(self) -> bytes:
return struct.pack('<I', self.audio_location)
@@ -106,10 +108,10 @@ class VocsAudioLocation:
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
assert connection
assert self.attribute_value
assert self.attribute
self.audio_location = AudioLocation(int.from_bytes(value, 'little'))
await connection.device.notify_subscribers(attribute=self.attribute_value)
await connection.device.notify_subscribers(attribute=self.attribute)
@dataclass
@@ -148,7 +150,7 @@ class VolumeOffsetControlPoint:
@dataclass
class AudioOutputDescription:
audio_output_description: str = ''
attribute_value: Optional[CharacteristicValue] = None
attribute: Optional[Characteristic] = None
@classmethod
def from_bytes(cls, data: bytes):
@@ -162,10 +164,10 @@ class AudioOutputDescription:
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
assert connection
assert self.attribute_value
assert self.attribute
self.audio_output_description = value.decode('utf-8')
await connection.device.notify_subscribers(attribute=self.attribute_value)
await connection.device.notify_subscribers(attribute=self.attribute)
# -----------------------------------------------------------------------------
@@ -197,7 +199,7 @@ class VolumeOffsetControlService(TemplateService):
VolumeOffsetControlPoint(self.volume_offset_state)
)
self.volume_offset_state_characteristic = Characteristic(
self.volume_offset_state_characteristic: Characteristic[bytes] = Characteristic(
uuid=GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY
@@ -206,7 +208,7 @@ class VolumeOffsetControlService(TemplateService):
value=CharacteristicValue(read=self.volume_offset_state.on_read),
)
self.audio_location_characteristic = Characteristic(
self.audio_location_characteristic: Characteristic[bytes] = Characteristic(
uuid=GATT_AUDIO_LOCATION_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ
@@ -222,33 +224,39 @@ class VolumeOffsetControlService(TemplateService):
write=self.audio_location.on_write,
),
)
self.audio_location.attribute_value = self.audio_location_characteristic.value
self.audio_location.attribute = self.audio_location_characteristic
self.volume_offset_control_point_characteristic = Characteristic(
uuid=GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC,
properties=Characteristic.Properties.WRITE,
permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
value=CharacteristicValue(write=self.volume_offset_control_point.on_write),
self.volume_offset_control_point_characteristic: Characteristic[bytes] = (
Characteristic(
uuid=GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC,
properties=Characteristic.Properties.WRITE,
permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
value=CharacteristicValue(
write=self.volume_offset_control_point.on_write
),
)
)
self.audio_output_description_characteristic = Characteristic(
uuid=GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE
),
permissions=(
Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION
),
value=CharacteristicValue(
read=self.audio_output_description.on_read,
write=self.audio_output_description.on_write,
),
self.audio_output_description_characteristic: Characteristic[bytes] = (
Characteristic(
uuid=GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE
),
permissions=(
Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION
),
value=CharacteristicValue(
read=self.audio_output_description.on_read,
write=self.audio_output_description.on_write,
),
)
)
self.audio_output_description.attribute_value = (
self.audio_output_description_characteristic.value
self.audio_output_description.attribute = (
self.audio_output_description_characteristic
)
super().__init__(
@@ -271,14 +279,14 @@ class VolumeOffsetControlServiceProxy(ProfileServiceProxy):
def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy
self.volume_offset_state = SerializableCharacteristicAdapter(
self.volume_offset_state = SerializableCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC
),
VolumeOffsetState,
)
self.audio_location = DelegatedCharacteristicAdapter(
self.audio_location = DelegatedCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_LOCATION_CHARACTERISTIC
),
@@ -292,7 +300,7 @@ class VolumeOffsetControlServiceProxy(ProfileServiceProxy):
)
)
self.audio_output_description = UTF8CharacteristicAdapter(
self.audio_output_description = UTF8CharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC
)
+1 -1
View File
@@ -130,7 +130,7 @@ SDP_ATTRIBUTE_ID_NAMES = {
SDP_PUBLIC_BROWSE_ROOT = core.UUID.from_16_bits(0x1002, 'PublicBrowseRoot')
# To be used in searches where an attribute ID list allows a range to be specified
SDP_ALL_ATTRIBUTES_RANGE = (0x0000FFFF, 4) # Express this as tuple so we can convey the desired encoding size
SDP_ALL_ATTRIBUTES_RANGE = (0x0000, 0xFFFF)
# fmt: on
# pylint: enable=line-too-long
+4 -4
View File
@@ -46,13 +46,13 @@ from pyee import EventEmitter
from .colors import color
from .hci import (
Address,
Role,
HCI_LE_Enable_Encryption_Command,
HCI_Object,
key_with_value,
)
from .core import (
BT_BR_EDR_TRANSPORT,
BT_CENTRAL_ROLE,
BT_LE_TRANSPORT,
AdvertisingData,
InvalidArgumentError,
@@ -1326,7 +1326,7 @@ class Session:
self.connection.abort_on('disconnection', self.on_pairing())
def on_connection_encryption_change(self) -> None:
if self.connection.is_encrypted:
if self.connection.is_encrypted and not self.completed:
if self.is_responder:
# The responder distributes its keys first, the initiator later
self.distribute_keys()
@@ -1975,7 +1975,7 @@ class Manager(EventEmitter):
# Look for a session with this connection, and create one if none exists
if not (session := self.sessions.get(connection.handle)):
if connection.role == BT_CENTRAL_ROLE:
if connection.role == Role.CENTRAL:
logger.warning('Remote starts pairing as Peripheral!')
pairing_config = self.pairing_config_factory(connection)
session = self.session_proxy(
@@ -1995,7 +1995,7 @@ class Manager(EventEmitter):
async def pair(self, connection: Connection) -> None:
# TODO: check if there's already a session for this connection
if connection.role != BT_CENTRAL_ROLE:
if connection.role != Role.CENTRAL:
logger.warning('Start pairing as Peripheral!')
pairing_config = self.pairing_config_factory(connection)
session = self.session_proxy(
+5 -1
View File
@@ -139,6 +139,7 @@ class PacketParser:
packet_type
) or self.extended_packet_info.get(packet_type)
if self.packet_info is None:
self.reset()
raise core.InvalidPacketError(
f'invalid packet type {packet_type}'
)
@@ -302,7 +303,10 @@ class ParserSource(BaseSource):
# -----------------------------------------------------------------------------
class StreamPacketSource(asyncio.Protocol, ParserSource):
def data_received(self, data: bytes) -> None:
self.parser.feed_data(data)
try:
self.parser.feed_data(data)
except core.InvalidPacketError:
logger.warning("invalid packet, ignoring data")
# -----------------------------------------------------------------------------
+1 -3
View File
@@ -115,9 +115,7 @@ async def open_usb_transport(spec: str) -> Transport:
self.acl_out = acl_out
self.acl_out_transfer = device.getTransfer()
self.acl_out_transfer_ready = asyncio.Semaphore(1)
self.packets: asyncio.Queue[bytes] = (
asyncio.Queue()
) # Queue of packets waiting to be sent
self.packets = asyncio.Queue[bytes]() # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop()
self.queue_task = None
self.cancel_done = self.loop.create_future()
+12 -2
View File
@@ -447,7 +447,7 @@ def deprecated(msg: str):
def wrapper(function):
@functools.wraps(function)
def inner(*args, **kwargs):
warnings.warn(msg, DeprecationWarning)
warnings.warn(msg, DeprecationWarning, stacklevel=2)
return function(*args, **kwargs)
return inner
@@ -464,7 +464,7 @@ def experimental(msg: str):
def wrapper(function):
@functools.wraps(function)
def inner(*args, **kwargs):
warnings.warn(msg, FutureWarning)
warnings.warn(msg, FutureWarning, stacklevel=2)
return function(*args, **kwargs)
return inner
@@ -502,3 +502,13 @@ class ByteSerializable(Protocol):
def from_bytes(cls, data: bytes) -> Self: ...
def __bytes__(self) -> bytes: ...
# -----------------------------------------------------------------------------
class IntConvertible(Protocol):
"""
Type protocol for classes that can be instantiated from int and converted to int.
"""
def __init__(self, value: int) -> None: ...
def __int__(self) -> int: ...
+202
View File
@@ -0,0 +1,202 @@
AURACAST TOOL
=============
The "auracast" tool implements commands that implement broadcasting, receiving
and controlling LE Audio broadcasts.
=== "Running as an installed package"
```
$ bumble-auracast
```
=== "Running from source"
```
$ python3 apps/auracast.py <args>
```
# Python Dependencies
Try installing the optional `[auracast]` dependencies:
=== "From source"
```bash
$ python3 -m pip install ".[auracast]"
```
=== "From PyPI"
```bash
$ python3 -m pip install "bumble[auracast]"
```
## LC3
The `auracast` app depends on the `lc3` python module, which is available
either as PyPI module (currently only available for Linux x86_64).
When installing Bumble with the optional `auracast` dependency, the `lc3`
module will be installed from the `lc3py` PyPI package if available.
If not, you will need to install it separately. This can be done with:
```bash
$ python3 -m pip install "git+https://github.com/google/liblc3.git"
```
## SoundDevice
The `sounddevice` module is required for audio output to the host's sound
output device(s) and/or input from the host's input device(s).
If not installed, the `auracast` app is still functional, but will be limited
to non-device inputs and output (files, external processes, ...)
On macOS and Windows, the `sounddevice` module gets installed with the
native PortAudio libraries included.
For Linux, however, PortAudio must be installed separately.
This is typically done with a command like:
```bash
$ sudo apt install libportaudio2
```
Visit the [sounddevice documentation](https://python-sounddevice.readthedocs.io/)
for details.
# General Usage
```
Usage: bumble-auracast [OPTIONS] COMMAND [ARGS]...
Options:
--help Show this message and exit.
Commands:
assist Scan for broadcasts on behalf of an audio server
pair Pair with an audio server
receive Receive a broadcast source
scan Scan for public broadcasts
transmit Transmit a broadcast source
```
Use `bumble-auracast <command> --help` to get more detailed usage information
for a specific `<command>`.
## `assist`
Act as a broadcast assistant.
Use `bumble-auracast assist --help` for details on the commands and options.
The assistant commands are:
### `monitor-state`
Subscribe to the state characteristic and monitor changes.
### `add-source`
Add a broadcast source. This will instruct the device to start
receiving a broadcast.
### `modify-source`
Modify a broadcast source.
### `remove-source`
Remote a broadcast source.
## `pair`
Pair with a device.
## `receive`
Receive a broadcast source.
The `--output` option specifies where to send the decoded audio samples.
The following outputs are supported:
### Sound Device
The `--output` argument is either `device`, to send the audio to the hosts's default sound device, or `device:<DEVICE_ID>` where `<DEVICE_ID>`
is the integer ID of one of the available sound devices.
When invoked with `--output "device:?"`, a list of available devices and
their IDs is printed out.
### Standard Output
With `--output stdout`, the decoded audio samples are written to the
standard output (currently always as float32 PCM samples)
### FFPlay
With `--output ffplay`, the decoded audio samples are piped to `ffplay`
in a child process. This option is only available if `ffplay` is a command that is available on the host.
### File
With `--output <filename>` or `--output file:<filename>`, the decoded audio
samples are written to a file (currently always as float32 PCM)
## `transmit`
Broadcast an audio source as a transmitter.
The `--input` and `--input-format` options specify what audio input
source to transmit.
The following inputs are supported:
### Sound Device
The `--input` argument is either `device`, to use the host's default sound
device (typically a builtin microphone), or `device:<DEVICE_ID>` where
`<DEVICE_ID>` is the integer ID of one of the available sound devices.
When invoked with `--input "device:?"`, a list of available devices and their
IDs is printed out.
### Standard Input
With `--input stdout`, the audio samples are read from the standard input.
(currently always as int16 PCM).
### File
With `--input <filename>` or `--input file:<filename>`, the audio samples
are read from a .wav or raw PCM file.
Use the `--input-format <FORMAT>` option to specify the format of the audio
samples in raw PCM files. `<FORMAT>` is expressed as:
`<sample-type>,<sample-rate>,<channels>`
(the only supported <sample-type> currently is 'int16le' for 16 bit signed integers with little-endian byte order)
## `scan`
Scan for public broadcasts.
A live display of the available broadcasts is displayed continuously.
# Compatibility With Some Products
The `auracast` app has been tested for compatibility with a few products.
The list is still very limited. Please let us know if there are products
that are not working well, or if there are specific instructions that should
be shared to allow better compatibiity with certain products.
## Transmitters
The `receive` command has been tested to successfully receive broadcasts from
the following transmitters:
* JBL GO 4
* Flairmesh FlooGoo FMA120
* Eppfun AK3040Pro Max
* HIGHGAZE BA-25T
* Nexum Audio VOCE and USB dongle
## Receivers
### Pixel Buds Pro 2
The Pixel Buds Pro 2 can be used as a broadcast receiver, controlled by the
`auracast assist` command, instructing the buds to receive a broadcast.
Use the `assist --command add-source` command to tell the buds to receive a
broadcast.
Use the `assist --command monitor-state` command to monitor the current sync/receive
state of the buds.
### JBL
The JBL GO 4 and other JBL products that support the Auracast feature can be used
as transmitters or receivers.
When running in receiver mode (pressing the Auracast button while not already playing),
the JBL speaker will scan for broadcast advertisements with a specific manufacturer data.
Use the `--manufacturer-data` option of the `transmit` command in order to include data
that will let the speaker recognize the broadcast as a compatible source.
The manufacturer ID for JBL is 87.
Using an option like `--manufacturer-data 87:00000000000000000000000000000000dffd` should work (tested on the
JBL GO 4. The `dffd` value at the end of the payload may be different on other models?).
### Others
* Nexum Audio VOCE and USB dongle
+9
View File
@@ -0,0 +1,9 @@
{
"name": "Bumble CS Initiator",
"address": "F0:F1:F2:F3:F4:F5",
"advertising_interval": 100,
"keystore": "JsonKeyStore",
"irk": "865F81FF5A8B486EAAE29A27AD9F77DC",
"identity_address_type": 1,
"channel_sounding_enabled": true
}
+9
View File
@@ -0,0 +1,9 @@
{
"name": "Bumble CS Reflector",
"address": "F0:F1:F2:F3:F4:F6",
"advertising_interval": 100,
"keystore": "JsonKeyStore",
"irk": "0c7d74db03a1c98e7be691f76141d53d",
"identity_address_type": 1,
"channel_sounding_enabled": true
}
+4 -1
View File
@@ -102,7 +102,6 @@ async def main() -> None:
)
# Notify subscribers of the current value as soon as they subscribe
@heart_rate_service.heart_rate_measurement_characteristic.on('subscription')
def on_subscription(connection, notify_enabled, indicate_enabled):
if notify_enabled or indicate_enabled:
AsyncRunner.spawn(
@@ -112,6 +111,10 @@ async def main() -> None:
)
)
heart_rate_service.heart_rate_measurement_characteristic.on(
'subscription', on_subscription
)
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
+24 -2
View File
@@ -28,7 +28,7 @@ class OneDeviceBenchTest(base_test.BaseTestClass):
def test_l2cap_client_ping(self):
runner = self.dut.bench.runL2capClient(
"ping", "4B:2A:67:76:2B:E3", 128, True, 100, 970, 100
"ping", "4B:2A:67:76:2B:E3", 128, True, 100, 970, 100, "HIGH"
)
print("### Initial status:", runner)
final_status = self.dut.bench.waitForRunnerCompletion(runner["id"])
@@ -36,12 +36,34 @@ class OneDeviceBenchTest(base_test.BaseTestClass):
def test_l2cap_client_send(self):
runner = self.dut.bench.runL2capClient(
"send", "7E:90:D0:F2:7A:11", 131, True, 100, 970, 0
"send",
"F1:F1:F1:F1:F1:F1",
128,
True,
100,
970,
0,
"HIGH",
10000,
)
print("### Initial status:", runner)
final_status = self.dut.bench.waitForRunnerCompletion(runner["id"])
print("### Final status:", final_status)
def test_gatt_client_send(self):
runner = self.dut.bench.runGattClient(
"send", "F1:F1:F1:F1:F1:F1", 128, True, 100, 970, 100, "HIGH"
)
print("### Initial status:", runner)
final_status = self.dut.bench.waitForRunnerCompletion(runner["id"])
print("### Final status:", final_status)
def test_gatt_server_receive(self):
runner = self.dut.bench.runGattServer("receive")
print("### Initial status:", runner)
final_status = self.dut.bench.waitForRunnerCompletion(runner["id"])
print("### Final status:", final_status)
if __name__ == "__main__":
test_runner.main()
+3 -3
View File
@@ -2,8 +2,8 @@ TestBeds:
- Name: BenchTestBed
Controllers:
AndroidDevice:
- serial: 37211FDJG000DJ
- serial: emulator-5554
local_bt_address: 94:45:60:5E:03:B0
- serial: 23071FDEE001F7
local_bt_address: DC:E5:5B:E5:51:2C
#- serial: 23071FDEE001F7
# local_bt_address: DC:E5:5B:E5:51:2C
+154
View File
@@ -0,0 +1,154 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import sys
import os
import functools
from bumble import core
from bumble import hci
from bumble.device import Connection, Device, ChannelSoundingCapabilities
from bumble.transport import open_transport_or_link
# From https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/system/gd/hci/distance_measurement_manager.cc.
CS_TONE_ANTENNA_CONFIG_MAPPING_TABLE = [
[0, 4, 5, 6],
[1, 7, 7, 7],
[2, 7, 7, 7],
[3, 7, 7, 7],
]
CS_PREFERRED_PEER_ANTENNA_MAPPING_TABLE = [1, 1, 1, 1, 3, 7, 15, 3]
CS_ANTENNA_PERMUTATION_ARRAY = [
[1, 2, 3, 4],
[2, 1, 3, 4],
[1, 3, 2, 4],
[3, 1, 2, 4],
[3, 2, 1, 4],
[2, 3, 1, 4],
[1, 2, 4, 3],
[2, 1, 4, 3],
[1, 4, 2, 3],
[4, 1, 2, 3],
[4, 2, 1, 3],
[2, 4, 1, 3],
[1, 4, 3, 2],
[4, 1, 3, 2],
[1, 3, 4, 2],
[3, 1, 4, 2],
[3, 4, 1, 2],
[4, 3, 1, 2],
[4, 2, 3, 1],
[2, 4, 3, 1],
[4, 3, 2, 1],
[3, 4, 2, 1],
[3, 2, 4, 1],
[2, 3, 4, 1],
]
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_channel_sounding.py <config-file> <transport-spec-for-device>'
'[target_address](If missing, run as reflector)'
)
print('example: run_channel_sounding.py cs_reflector.json usb:0')
print(
'example: run_channel_sounding.py cs_initiator.json usb:0 F0:F1:F2:F3:F4:F5'
)
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
assert (local_cs_capabilities := device.cs_capabilities)
if len(sys.argv) == 3:
print('<<< Start Advertising')
await device.start_advertising(
own_address_type=hci.OwnAddressType.RANDOM, auto_restart=True
)
def on_cs_capabilities(
connection: Connection, capabilities: ChannelSoundingCapabilities
):
del capabilities
print('<<< Set CS Settings')
asyncio.create_task(device.set_default_cs_settings(connection))
device.on(
'connection',
lambda connection: connection.on(
'channel_sounding_capabilities',
functools.partial(on_cs_capabilities, connection),
),
)
else:
target_address = hci.Address(sys.argv[3])
print(f'<<< Connecting to {target_address}')
connection = await device.connect(
target_address, transport=core.BT_LE_TRANSPORT
)
print('<<< ACL Connected')
if not (await device.get_long_term_key(connection.handle, b'', 0)):
print('<<< No bond, start pairing')
await connection.pair()
print('<<< Pairing complete')
print('<<< Encrypting Connection')
await connection.encrypt()
print('<<< Getting remote CS Capabilities...')
remote_capabilities = await device.get_remote_cs_capabilities(connection)
print('<<< Set CS Settings...')
await device.set_default_cs_settings(connection)
print('<<< Set CS Config...')
config = await device.create_cs_config(connection)
print('<<< Enable CS Security...')
await device.enable_cs_security(connection)
tone_antenna_config_selection = CS_TONE_ANTENNA_CONFIG_MAPPING_TABLE[
local_cs_capabilities.num_antennas_supported - 1
][remote_capabilities.num_antennas_supported - 1]
print('<<< Set CS Procedure Parameters...')
await device.set_cs_procedure_parameters(
connection=connection,
config=config,
tone_antenna_config_selection=tone_antenna_config_selection,
preferred_peer_antenna=CS_PREFERRED_PEER_ANTENNA_MAPPING_TABLE[
tone_antenna_config_selection
],
)
print('<<< Enable CS Procedure...')
await device.enable_cs_procedure(connection=connection, config=config)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
+3 -3
View File
@@ -70,13 +70,13 @@ async def main() -> None:
descriptor = Descriptor(
GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
Descriptor.READABLE,
'My Description',
'My Description'.encode(),
)
manufacturer_name_characteristic = Characteristic(
manufacturer_name_characteristic = Characteristic[bytes](
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.Properties.READ,
Characteristic.READABLE,
"Fitbit",
"Fitbit".encode(),
[descriptor],
)
device_info_service = Service(
+2 -2
View File
@@ -94,13 +94,13 @@ async def main() -> None:
descriptor = Descriptor(
GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
Descriptor.READABLE,
'My Description',
'My Description'.encode(),
)
manufacturer_name_characteristic = Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.Properties.READ,
Characteristic.READABLE,
'Fitbit',
'Fitbit'.encode(),
[descriptor],
)
device_info_service = Service(
+243 -126
View File
@@ -1,4 +1,4 @@
# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,6 +18,8 @@
from __future__ import annotations
import asyncio
import dataclasses
import functools
import enum
import logging
import os
import random
@@ -28,6 +30,8 @@ from typing import Any, List, Union
from bumble.device import Device, Peer
from bumble import transport
from bumble import gatt
from bumble import gatt_adapters
from bumble import gatt_client
from bumble import hci
from bumble import core
@@ -36,6 +40,9 @@ from bumble import core
SERVICE_UUID = core.UUID("50DB505C-8AC4-4738-8448-3B1D9CC09CC5")
CHARACTERISTIC_UUID_BASE = "D901B45B-4916-412E-ACCA-0000000000"
DEFAULT_CLIENT_ADDRESS = "F0:F1:F2:F3:F4:F5"
DEFAULT_SERVER_ADDRESS = "F1:F2:F3:F4:F5:F6"
# -----------------------------------------------------------------------------
@dataclasses.dataclass
@@ -65,6 +72,12 @@ class CustomClass:
return struct.pack(">II", self.a, self.b)
# -----------------------------------------------------------------------------
class CustomEnum(enum.IntEnum):
FOO = 1234
BAR = 5678
# -----------------------------------------------------------------------------
async def client(device: Device, address: hci.Address) -> None:
print(f'=== Connecting to {address}...')
@@ -78,8 +91,8 @@ async def client(device: Device, address: hci.Address) -> None:
print("*** Discovery complete")
service = peer.get_services_by_uuid(SERVICE_UUID)[0]
characteristics = []
for index in range(1, 9):
characteristics: list[gatt_client.CharacteristicProxy] = []
for index in range(1, 10):
characteristics.append(
service.get_characteristics_by_uuid(
core.UUID(CHARACTERISTIC_UUID_BASE + f"{index:02X}")
@@ -91,59 +104,92 @@ async def client(device: Device, address: hci.Address) -> None:
value = await characteristic.read_value()
print(f"### {characteristic} = {value!r} ({value.hex()})")
# Subscribe to all characteristics as a raw bytes listener.
def on_raw_characteristic_update(characteristic, value):
print(f"^^^ Update[RAW] {characteristic.uuid} value = {value.hex()}")
for characteristic in characteristics:
await characteristic.subscribe(
functools.partial(on_raw_characteristic_update, characteristic)
)
# Function to subscribe to adapted characteristics
def on_adapted_characteristic_update(characteristic, value):
print(
f"^^^ Update[ADAPTED] {characteristic.uuid} value = {value!r}, "
f"type={type(value)}"
)
# Static characteristic with a bytes value.
c1 = characteristics[0]
c1_value = await c1.read_value()
print(f"@@@ C1 {c1} value = {c1_value!r} (type={type(c1_value)})")
await c1.write_value("happy π day".encode("utf-8"))
await c1.subscribe(functools.partial(on_adapted_characteristic_update, c1))
# Static characteristic with a string value.
c2 = gatt.UTF8CharacteristicAdapter(characteristics[1])
c2 = gatt_adapters.UTF8CharacteristicProxyAdapter(characteristics[1])
c2_value = await c2.read_value()
print(f"@@@ C2 {c2} value = {c2_value} (type={type(c2_value)})")
await c2.write_value("happy π day")
await c2.subscribe(functools.partial(on_adapted_characteristic_update, c2))
# Static characteristic with a tuple value.
c3 = gatt.PackedCharacteristicAdapter(characteristics[2], ">III")
c3 = gatt_adapters.PackedCharacteristicProxyAdapter(characteristics[2], ">III")
c3_value = await c3.read_value()
print(f"@@@ C3 {c3} value = {c3_value} (type={type(c3_value)})")
await c3.write_value((2001, 2002, 2003))
await c3.subscribe(functools.partial(on_adapted_characteristic_update, c3))
# Static characteristic with a named tuple value.
c4 = gatt.MappedCharacteristicAdapter(
c4 = gatt_adapters.MappedCharacteristicProxyAdapter(
characteristics[3], ">III", ["f1", "f2", "f3"]
)
c4_value = await c4.read_value()
print(f"@@@ C4 {c4} value = {c4_value} (type={type(c4_value)})")
await c4.write_value({"f1": 4001, "f2": 4002, "f3": 4003})
await c4.subscribe(functools.partial(on_adapted_characteristic_update, c4))
# Static characteristic with a serializable value.
c5 = gatt.SerializableCharacteristicAdapter(
c5 = gatt_adapters.SerializableCharacteristicProxyAdapter(
characteristics[4], CustomSerializableClass
)
c5_value = await c5.read_value()
print(f"@@@ C5 {c5} value = {c5_value} (type={type(c5_value)})")
await c5.write_value(CustomSerializableClass(56, 57))
await c5.subscribe(functools.partial(on_adapted_characteristic_update, c5))
# Static characteristic with a delegated value.
c6 = gatt.DelegatedCharacteristicAdapter(
c6 = gatt_adapters.DelegatedCharacteristicProxyAdapter(
characteristics[5], encode=CustomClass.encode, decode=CustomClass.decode
)
c6_value = await c6.read_value()
print(f"@@@ C6 {c6} value = {c6_value} (type={type(c6_value)})")
await c6.write_value(CustomClass(6, 7))
await c6.subscribe(functools.partial(on_adapted_characteristic_update, c6))
# Dynamic characteristic with a bytes value.
c7 = characteristics[6]
c7_value = await c7.read_value()
print(f"@@@ C7 {c7} value = {c7_value!r} (type={type(c7_value)})")
await c7.write_value(bytes.fromhex("01020304"))
await c7.subscribe(functools.partial(on_adapted_characteristic_update, c7))
# Dynamic characteristic with a string value.
c8 = gatt.UTF8CharacteristicAdapter(characteristics[7])
c8 = gatt_adapters.UTF8CharacteristicProxyAdapter(characteristics[7])
c8_value = await c8.read_value()
print(f"@@@ C8 {c8} value = {c8_value} (type={type(c8_value)})")
await c8.write_value("howdy")
await c8.subscribe(functools.partial(on_adapted_characteristic_update, c8))
# Static characteristic with an enum value
c9 = gatt_adapters.EnumCharacteristicProxyAdapter(
characteristics[8], CustomEnum, 3, 'big'
)
c9_value = await c9.read_value()
print(f"@@@ C9 {c9} value = {c9_value.name} (type={type(c9_value)})")
await c9.write_value(CustomEnum.BAR)
await c9.subscribe(functools.partial(on_adapted_characteristic_update, c9))
# -----------------------------------------------------------------------------
@@ -175,142 +221,213 @@ def on_characteristic_write(characteristic: gatt.Characteristic, value: Any) ->
print(f"<<< WRITE: {characteristic} <- {value} ({type(value)})")
# -----------------------------------------------------------------------------
async def server(device: Device) -> None:
# Static characteristic with a bytes value.
c1 = gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "01",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
b'hello',
)
# Static characteristic with a string value.
c2 = gatt_adapters.UTF8CharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "02",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
'hello',
)
)
# Static characteristic with a tuple value.
c3 = gatt_adapters.PackedCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "03",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
(1007, 1008, 1009),
),
">III",
)
# Static characteristic with a named tuple value.
c4 = gatt_adapters.MappedCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "04",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
{"f1": 3007, "f2": 3008, "f3": 3009},
),
">III",
["f1", "f2", "f3"],
)
# Static characteristic with a serializable value.
c5 = gatt_adapters.SerializableCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "05",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
CustomSerializableClass(11, 12),
),
CustomSerializableClass,
)
# Static characteristic with a delegated value.
c6 = gatt_adapters.DelegatedCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "06",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
CustomClass(1, 2),
),
encode=CustomClass.encode,
decode=CustomClass.decode,
)
# Dynamic characteristic with a bytes value.
c7 = gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "07",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(
read=lambda connection: dynamic_read("bytes"),
write=lambda connection, value: dynamic_write("bytes", value),
),
)
# Dynamic characteristic with a string value.
c8 = gatt_adapters.UTF8CharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "08",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(
read=lambda connection: dynamic_read("string"),
write=lambda connection, value: dynamic_write("string", value),
),
)
)
# Static characteristic with an enum value
c9 = gatt_adapters.EnumCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "09",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
CustomEnum.FOO,
),
cls=CustomEnum,
length=3,
byteorder='big',
)
characteristics: List[gatt.Characteristic] = [
c1,
c2,
c3,
c4,
c5,
c6,
c7,
c8,
c9,
]
# Listen for read and write events.
for characteristic in characteristics:
characteristic.on(
"read",
lambda _, value, c=characteristic: on_characteristic_read(c, value),
)
characteristic.on(
"write",
lambda _, value, c=characteristic: on_characteristic_write(c, value),
)
device.add_service(gatt.Service(SERVICE_UUID, characteristics))
# Notify every 3 seconds
i = 0
while True:
await asyncio.sleep(3)
# Notifying can be done with the characteristic's current value, or
# by explicitly passing a value to notify with. Both variants are used
# here: for c1..c4 we set the value and then notify, for c4..c9 we notify
# with an explicit value.
c1.value = f'hello c1 {i}'.encode()
await device.notify_subscribers(c1)
c2.value = f'hello c2 {i}'
await device.notify_subscribers(c2)
c3.value = (1000 + i, 2000 + i, 3000 + i)
await device.notify_subscribers(c3)
c4.value = {"f1": 4000 + i, "f2": 5000 + i, "f3": 6000 + i}
await device.notify_subscribers(c4)
await device.notify_subscribers(c5, CustomSerializableClass(1000 + i, 2000 + i))
await device.notify_subscribers(c6, CustomClass(3000 + i, 4000 + i))
await device.notify_subscribers(c7, bytes([1, 2, 3, i % 256]))
await device.notify_subscribers(c8, f'hello c8 {i}')
await device.notify_subscribers(
c9, CustomEnum.FOO if i % 2 == 0 else CustomEnum.BAR
)
i += 1
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 2:
print("Usage: run_gatt_with_adapters.py <transport-spec> [<bluetooth-address>]")
print("example: run_gatt_with_adapters.py usb:0 E1:CA:72:48:C4:E8")
print("Usage: run_gatt_with_adapters.py <transport-spec> client|server")
print("example: run_gatt_with_adapters.py usb:0 F0:F1:F2:F3:F4:F5")
return
async with await transport.open_transport(sys.argv[1]) as hci_transport:
is_client = sys.argv[2] == "client"
# Create a device to manage the host
device = Device.with_hci(
"Bumble",
hci.Address("F0:F1:F2:F3:F4:F5"),
hci.Address(
DEFAULT_CLIENT_ADDRESS if is_client else DEFAULT_SERVER_ADDRESS
),
hci_transport.source,
hci_transport.sink,
)
# Static characteristic with a bytes value.
c1 = gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "01",
gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
b'hello',
)
# Static characteristic with a string value.
c2 = gatt.UTF8CharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "02",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
'hello',
)
)
# Static characteristic with a tuple value.
c3 = gatt.PackedCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "03",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
(1007, 1008, 1009),
),
">III",
)
# Static characteristic with a named tuple value.
c4 = gatt.MappedCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "04",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
{"f1": 3007, "f2": 3008, "f3": 3009},
),
">III",
["f1", "f2", "f3"],
)
# Static characteristic with a serializable value.
c5 = gatt.SerializableCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "05",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
CustomSerializableClass(11, 12),
),
CustomSerializableClass,
)
# Static characteristic with a delegated value.
c6 = gatt.DelegatedCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "06",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
CustomClass(1, 2),
),
encode=CustomClass.encode,
decode=CustomClass.decode,
)
# Dynamic characteristic with a bytes value.
c7 = gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "07",
gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(
read=lambda connection: dynamic_read("bytes"),
write=lambda connection, value: dynamic_write("bytes", value),
),
)
# Dynamic characteristic with a string value.
c8 = gatt.UTF8CharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "08",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(
read=lambda connection: dynamic_read("string"),
write=lambda connection, value: dynamic_write("string", value),
),
)
)
characteristics: List[
Union[gatt.Characteristic, gatt.CharacteristicAdapter]
] = [c1, c2, c3, c4, c5, c6, c7, c8]
# Listen for read and write events.
for characteristic in characteristics:
characteristic.on(
"read",
lambda _, value, c=characteristic: on_characteristic_read(c, value),
)
characteristic.on(
"write",
lambda _, value, c=characteristic: on_characteristic_write(c, value),
)
device.add_service(gatt.Service(SERVICE_UUID, characteristics)) # type: ignore
# Get things going
await device.power_on()
# Connect to a peer
if len(sys.argv) > 2:
await client(device, hci.Address(sys.argv[2]))
if is_client:
# Connect a client to a peer
await client(device, hci.Address(DEFAULT_SERVER_ADDRESS))
else:
# Advertise so a peer can connect
await device.start_advertising(auto_restart=True)
# Setup a server
await server(device)
await hci_transport.source.wait_for_termination()
+1 -1
View File
@@ -10,7 +10,7 @@ android {
defaultConfig {
applicationId = "com.github.google.bumble.btbench"
minSdk = 30
minSdk = 33
targetSdk = 34
versionCode = 1
versionName = "1.0"
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
package="com.github.google.bumble.btbench">
<uses-sdk android:minSdkVersion="30" android:targetSdkVersion="34" />
<uses-sdk android:minSdkVersion="33" android:targetSdkVersion="34" />
<!-- Request legacy Bluetooth permissions on older devices. -->
<uses-permission android:name="android.permission.BLUETOOTH" android:maxSdkVersion="30" />
<uses-permission android:name="android.permission.BLUETOOTH_ADMIN" android:maxSdkVersion="30" />
@@ -9,6 +9,8 @@
<uses-permission android:name="android.permission.BLUETOOTH_SCAN" android:usesPermissionFlags="neverForLocation"/>
<uses-permission android:name="android.permission.BLUETOOTH_ADVERTISE" />
<uses-permission android:name="android.permission.BLUETOOTH_CONNECT" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.INTERNET" />
<uses-feature android:name="android.hardware.bluetooth" android:required="true"/>
<uses-feature android:name="android.hardware.bluetooth_le" android:required="true"/>
@@ -0,0 +1,39 @@
package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import android.bluetooth.le.AdvertiseCallback
import android.bluetooth.le.AdvertiseData
import android.bluetooth.le.AdvertiseSettings
import android.bluetooth.le.AdvertiseSettings.ADVERTISE_MODE_LOW_LATENCY
import android.os.Build
import java.util.logging.Logger
private val Log = Logger.getLogger("btbench.advertiser")
class Advertiser(private val bluetoothAdapter: BluetoothAdapter) : AdvertiseCallback() {
@SuppressLint("MissingPermission")
fun start() {
val advertiseSettingsBuilder = AdvertiseSettings.Builder()
.setAdvertiseMode(ADVERTISE_MODE_LOW_LATENCY)
.setConnectable(true)
advertiseSettingsBuilder.setDiscoverable(true)
val advertiseSettings = advertiseSettingsBuilder.build()
val advertiseData = AdvertiseData.Builder().build()
val scanData = AdvertiseData.Builder().setIncludeDeviceName(true).build()
bluetoothAdapter.bluetoothLeAdvertiser.startAdvertising(advertiseSettings, advertiseData, scanData, this)
}
@SuppressLint("MissingPermission")
fun stop() {
bluetoothAdapter.bluetoothLeAdvertiser.stopAdvertising(this)
}
override fun onStartFailure(errorCode: Int) {
Log.warning("failed to start advertising: $errorCode")
}
override fun onStartSuccess(settingsInEffect: AdvertiseSettings) {
Log.info("advertising started: $settingsInEffect")
}
}
@@ -22,11 +22,13 @@ import androidx.test.core.app.ApplicationProvider;
import com.google.android.mobly.snippet.Snippet;
import com.google.android.mobly.snippet.rpc.Rpc;
import com.google.android.mobly.snippet.rpc.RpcOptional;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import java.io.IOException;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.UUID;
@@ -71,12 +73,15 @@ public class AutomationSnippet implements Snippet {
private final Context mContext;
private final ArrayList<Runner> mRunners = new ArrayList<>();
public AutomationSnippet() {
public AutomationSnippet() throws IOException {
mContext = ApplicationProvider.getApplicationContext();
BluetoothManager bluetoothManager = mContext.getSystemService(BluetoothManager.class);
mBluetoothAdapter = bluetoothManager.getAdapter();
if (mBluetoothAdapter == null) {
throw new RuntimeException("bluetooth not supported");
throw new IOException("bluetooth not supported");
}
if (!mBluetoothAdapter.isEnabled()) {
throw new IOException("bluetooth not enabled");
}
}
@@ -85,32 +90,46 @@ public class AutomationSnippet implements Snippet {
switch (mode) {
case "rfcomm-client":
runnable = new RfcommClient(model, mBluetoothAdapter,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
case "rfcomm-server":
runnable = new RfcommServer(model, mBluetoothAdapter,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
case "l2cap-client":
runnable = new L2capClient(model, mBluetoothAdapter, mContext,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
case "l2cap-server":
runnable = new L2capServer(model, mBluetoothAdapter,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
case "gatt-client":
runnable = new GattClient(model, mBluetoothAdapter, mContext,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
case "gatt-server":
runnable = new GattServer(model, mBluetoothAdapter, mContext,
(PacketIO packetIO) -> createIoClient(model, scenario,
packetIO));
break;
default:
return null;
}
model.setMode(mode);
model.setScenario(scenario);
runnable.run();
Runner runner = new Runner(runnable, mode, scenario, model);
mRunners.add(runner);
@@ -140,7 +159,21 @@ public class AutomationSnippet implements Snippet {
JSONObject result = new JSONObject();
result.put("status", model.getStatus());
result.put("running", model.getRunning());
result.put("peer_bluetooth_address", model.getPeerBluetoothAddress());
result.put("mode", model.getMode());
result.put("scenario", model.getScenario());
result.put("sender_packet_size", model.getSenderPacketSize());
result.put("sender_packet_count", model.getSenderPacketCount());
result.put("sender_packet_interval", model.getSenderPacketInterval());
result.put("packets_sent", model.getPacketsSent());
result.put("packets_received", model.getPacketsReceived());
result.put("l2cap_psm", model.getL2capPsm());
result.put("use_2m_phy", model.getUse2mPhy());
result.put("connection_priority", model.getConnectionPriority());
result.put("mtu", model.getMtu());
result.put("rx_phy", model.getRxPhy());
result.put("tx_phy", model.getTxPhy());
result.put("startup_delay", model.getStartupDelay());
if (model.getStatus().equals("OK")) {
JSONObject stats = new JSONObject();
result.put("stats", stats);
@@ -167,12 +200,12 @@ public class AutomationSnippet implements Snippet {
@Rpc(description = "Run a scenario in RFComm Client mode")
public JSONObject runRfcommClient(String scenario, String peerBluetoothAddress, int packetCount,
int packetSize, int packetInterval) throws JSONException {
assert (mBluetoothAdapter != null);
int packetSize, int packetInterval,
@RpcOptional Integer startupDelay) throws JSONException {
// We only support "send" and "ping" for this mode for now
if (!(scenario.equals("send") || scenario.equals("ping"))) {
throw new InvalidParameterException("only 'send' and 'ping' are supported for this mode");
throw new InvalidParameterException(
"only 'send' and 'ping' are supported for this mode");
}
AppViewModel model = new AppViewModel();
@@ -180,6 +213,9 @@ public class AutomationSnippet implements Snippet {
model.setSenderPacketCount(packetCount);
model.setSenderPacketSize(packetSize);
model.setSenderPacketInterval(packetInterval);
if (startupDelay != null) {
model.setStartupDelay(startupDelay);
}
Runner runner = runScenario(model, "rfcomm-client", scenario);
assert runner != null;
@@ -187,15 +223,18 @@ public class AutomationSnippet implements Snippet {
}
@Rpc(description = "Run a scenario in RFComm Server mode")
public JSONObject runRfcommServer(String scenario) throws JSONException {
assert (mBluetoothAdapter != null);
public JSONObject runRfcommServer(String scenario,
@RpcOptional Integer startupDelay) throws JSONException {
// We only support "receive" and "pong" for this mode for now
if (!(scenario.equals("receive") || scenario.equals("pong"))) {
throw new InvalidParameterException("only 'receive' and 'pong' are supported for this mode");
throw new InvalidParameterException(
"only 'receive' and 'pong' are supported for this mode");
}
AppViewModel model = new AppViewModel();
if (startupDelay != null) {
model.setStartupDelay(startupDelay);
}
Runner runner = runScenario(model, "rfcomm-server", scenario);
assert runner != null;
@@ -205,12 +244,12 @@ public class AutomationSnippet implements Snippet {
@Rpc(description = "Run a scenario in L2CAP Client mode")
public JSONObject runL2capClient(String scenario, String peerBluetoothAddress, int psm,
boolean use_2m_phy, int packetCount, int packetSize,
int packetInterval) throws JSONException {
assert (mBluetoothAdapter != null);
int packetInterval, @RpcOptional String connectionPriority,
@RpcOptional Integer startupDelay) throws JSONException {
// We only support "send" and "ping" for this mode for now
if (!(scenario.equals("send") || scenario.equals("ping"))) {
throw new InvalidParameterException("only 'send' and 'ping' are supported for this mode");
throw new InvalidParameterException(
"only 'send' and 'ping' are supported for this mode");
}
AppViewModel model = new AppViewModel();
@@ -220,28 +259,83 @@ public class AutomationSnippet implements Snippet {
model.setSenderPacketCount(packetCount);
model.setSenderPacketSize(packetSize);
model.setSenderPacketInterval(packetInterval);
if (connectionPriority != null) {
model.setConnectionPriority(connectionPriority);
}
if (startupDelay != null) {
model.setStartupDelay(startupDelay);
}
Runner runner = runScenario(model, "l2cap-client", scenario);
assert runner != null;
return runner.toJson();
}
@Rpc(description = "Run a scenario in L2CAP Server mode")
public JSONObject runL2capServer(String scenario) throws JSONException {
assert (mBluetoothAdapter != null);
public JSONObject runL2capServer(String scenario,
@RpcOptional Integer startupDelay) throws JSONException {
// We only support "receive" and "pong" for this mode for now
if (!(scenario.equals("receive") || scenario.equals("pong"))) {
throw new InvalidParameterException("only 'receive' and 'pong' are supported for this mode");
throw new InvalidParameterException(
"only 'receive' and 'pong' are supported for this mode");
}
AppViewModel model = new AppViewModel();
if (startupDelay != null) {
model.setStartupDelay(startupDelay);
}
Runner runner = runScenario(model, "l2cap-server", scenario);
assert runner != null;
return runner.toJson();
}
@Rpc(description = "Run a scenario in GATT Client mode")
public JSONObject runGattClient(String scenario, String peerBluetoothAddress,
boolean use_2m_phy, int packetCount, int packetSize,
int packetInterval, @RpcOptional String connectionPriority,
@RpcOptional Integer startupDelay) throws JSONException {
// We only support "send" and "ping" for this mode for now
if (!(scenario.equals("send") || scenario.equals("ping"))) {
throw new InvalidParameterException(
"only 'send' and 'ping' are supported for this mode");
}
AppViewModel model = new AppViewModel();
model.setPeerBluetoothAddress(peerBluetoothAddress);
model.setUse2mPhy(use_2m_phy);
model.setSenderPacketCount(packetCount);
model.setSenderPacketSize(packetSize);
model.setSenderPacketInterval(packetInterval);
if (connectionPriority != null) {
model.setConnectionPriority(connectionPriority);
}
if (startupDelay != null) {
model.setStartupDelay(startupDelay);
}
Runner runner = runScenario(model, "gatt-client", scenario);
assert runner != null;
return runner.toJson();
}
@Rpc(description = "Run a scenario in GATT Server mode")
public JSONObject runGattServer(String scenario,
@RpcOptional Integer startupDelay) throws JSONException {
// We only support "receive" and "pong" for this mode for now
if (!(scenario.equals("receive") || scenario.equals("pong"))) {
throw new InvalidParameterException(
"only 'receive' and 'pong' are supported for this mode");
}
AppViewModel model = new AppViewModel();
if (startupDelay != null) {
model.setStartupDelay(startupDelay);
}
Runner runner = runScenario(model, "gatt-server", scenario);
assert runner != null;
return runner.toJson();
}
@Rpc(description = "Stop a Runner")
public JSONObject stopRunner(String runnerId) throws JSONException {
Runner runner = findRunner(runnerId);
@@ -276,7 +370,7 @@ public class AutomationSnippet implements Snippet {
JSONObject result = new JSONObject();
JSONArray runners = new JSONArray();
result.put("runners", runners);
for (Runner runner: mRunners) {
for (Runner runner : mRunners) {
runners.put(runner.toJson());
}
@@ -0,0 +1,109 @@
package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import android.bluetooth.BluetoothDevice
import android.bluetooth.BluetoothGatt
import android.bluetooth.BluetoothGattCallback
import android.bluetooth.BluetoothManager
import android.bluetooth.BluetoothProfile
import android.content.Context
import android.os.Build
import androidx.core.content.ContextCompat
import java.util.logging.Logger
private val Log = Logger.getLogger("btbench.connection")
open class Connection(
private val viewModel: AppViewModel,
private val bluetoothAdapter: BluetoothAdapter,
private val context: Context
) : BluetoothGattCallback() {
var remoteDevice: BluetoothDevice? = null
var gatt: BluetoothGatt? = null
@SuppressLint("MissingPermission")
open fun connect() {
val addressIsPublic = viewModel.peerBluetoothAddress.endsWith("/P")
val address = viewModel.peerBluetoothAddress.take(17)
remoteDevice = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) {
bluetoothAdapter.getRemoteLeDevice(
address,
if (addressIsPublic) {
BluetoothDevice.ADDRESS_TYPE_PUBLIC
} else {
BluetoothDevice.ADDRESS_TYPE_RANDOM
}
)
} else {
bluetoothAdapter.getRemoteDevice(address)
}
gatt = remoteDevice?.connectGatt(
context,
false,
this,
BluetoothDevice.TRANSPORT_LE,
if (viewModel.use2mPhy) BluetoothDevice.PHY_LE_2M_MASK else BluetoothDevice.PHY_LE_1M_MASK
)
}
@SuppressLint("MissingPermission")
open fun disconnect() {
gatt?.disconnect()
}
override fun onMtuChanged(gatt: BluetoothGatt, mtu: Int, status: Int) {
Log.info("MTU update: mtu=$mtu status=$status")
viewModel.mtu = mtu
}
override fun onPhyUpdate(gatt: BluetoothGatt, txPhy: Int, rxPhy: Int, status: Int) {
Log.info("PHY update: tx=$txPhy, rx=$rxPhy, status=$status")
viewModel.txPhy = txPhy
viewModel.rxPhy = rxPhy
}
override fun onPhyRead(gatt: BluetoothGatt, txPhy: Int, rxPhy: Int, status: Int) {
Log.info("PHY: tx=$txPhy, rx=$rxPhy, status=$status")
viewModel.txPhy = txPhy
viewModel.rxPhy = rxPhy
}
@SuppressLint("MissingPermission")
override fun onConnectionStateChange(
gatt: BluetoothGatt?, status: Int, newState: Int
) {
if (status != BluetoothGatt.GATT_SUCCESS) {
Log.warning("onConnectionStateChange status=$status")
}
if (gatt != null && newState == BluetoothProfile.STATE_CONNECTED) {
if (viewModel.use2mPhy) {
Log.info("requesting 2M PHY")
gatt.setPreferredPhy(
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_OPTION_NO_PREFERRED
)
}
gatt.readPhy()
// Request an MTU update, even though we don't use GATT, because Android
// won't request a larger link layer maximum data length otherwise.
gatt.requestMtu(517)
// Request a specific connection priority
val connectionPriority = when (viewModel.connectionPriority) {
"BALANCED" -> BluetoothGatt.CONNECTION_PRIORITY_BALANCED
"LOW_POWER" -> BluetoothGatt.CONNECTION_PRIORITY_LOW_POWER
"HIGH" -> BluetoothGatt.CONNECTION_PRIORITY_HIGH
"DCK" -> BluetoothGatt.CONNECTION_PRIORITY_DCK
else -> 0
}
if (!gatt.requestConnectionPriority(connectionPriority)) {
Log.warning("requestConnectionPriority failed")
}
}
}
}
@@ -0,0 +1,23 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import java.util.UUID
var CCCD_UUID = UUID.fromString("00002902-0000-1000-8000-00805F9B34FB")
val BENCH_SERVICE_UUID = UUID.fromString("50DB505C-8AC4-4738-8448-3B1D9CC09CC5")
val BENCH_TX_UUID = UUID.fromString("E789C754-41A1-45F4-A948-A0A1A90DBA53")
val BENCH_RX_UUID = UUID.fromString("016A2CC7-E14B-4819-935F-1F56EAE4098D")
@@ -0,0 +1,224 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import android.bluetooth.BluetoothGatt
import android.bluetooth.BluetoothGattCharacteristic
import android.bluetooth.BluetoothGattDescriptor
import android.bluetooth.BluetoothProfile
import android.content.Context
import java.io.IOException
import java.util.UUID
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Semaphore
import java.util.logging.Logger
import kotlin.concurrent.thread
private val Log = Logger.getLogger("btbench.gatt-client")
class GattClientConnection(
viewModel: AppViewModel,
bluetoothAdapter: BluetoothAdapter,
context: Context
) : Connection(viewModel, bluetoothAdapter, context), PacketIO {
override var packetSink: PacketSink? = null
private val discoveryDone: CountDownLatch = CountDownLatch(1)
private val writeSemaphore: Semaphore = Semaphore(1)
var rxCharacteristic: BluetoothGattCharacteristic? = null
var txCharacteristic: BluetoothGattCharacteristic? = null
override fun connect() {
super.connect()
// Check if we're already connected and have discovered the services
if (gatt?.getService(BENCH_SERVICE_UUID) != null) {
Log.fine("already connected")
onServicesDiscovered(gatt, BluetoothGatt.GATT_SUCCESS)
}
}
@SuppressLint("MissingPermission")
override fun onConnectionStateChange(
gatt: BluetoothGatt?, status: Int, newState: Int
) {
super.onConnectionStateChange(gatt, status, newState)
if (status != BluetoothGatt.GATT_SUCCESS) {
Log.warning("onConnectionStateChange status=$status")
discoveryDone.countDown()
return
}
if (gatt != null && newState == BluetoothProfile.STATE_CONNECTED) {
if (!gatt.discoverServices()) {
Log.warning("discoverServices could not start")
discoveryDone.countDown()
}
}
}
@SuppressLint("MissingPermission")
override fun onServicesDiscovered(gatt: BluetoothGatt?, status: Int) {
Log.fine("onServicesDiscovered")
if (status != BluetoothGatt.GATT_SUCCESS) {
Log.warning("failed to discover services: ${status}")
discoveryDone.countDown()
return
}
// Find the service
val service = gatt!!.getService(BENCH_SERVICE_UUID)
if (service == null) {
Log.warning("GATT Service not found")
discoveryDone.countDown()
return
}
// Find the RX and TX characteristics
rxCharacteristic = service.getCharacteristic(BENCH_RX_UUID)
if (rxCharacteristic == null) {
Log.warning("GATT RX Characteristics not found")
discoveryDone.countDown()
return
}
txCharacteristic = service.getCharacteristic(BENCH_TX_UUID)
if (txCharacteristic == null) {
Log.warning("GATT TX Characteristics not found")
discoveryDone.countDown()
return
}
// Subscribe to the RX characteristic
Log.fine("subscribing to RX")
gatt.setCharacteristicNotification(rxCharacteristic, true)
val cccdDescriptor = rxCharacteristic!!.getDescriptor(CCCD_UUID)
gatt.writeDescriptor(cccdDescriptor, BluetoothGattDescriptor.ENABLE_NOTIFICATION_VALUE);
Log.info("GATT discovery complete")
discoveryDone.countDown()
}
override fun onCharacteristicWrite(
gatt: BluetoothGatt?,
characteristic: BluetoothGattCharacteristic?,
status: Int
) {
// Now we can write again
writeSemaphore.release()
if (status != BluetoothGatt.GATT_SUCCESS) {
Log.warning("onCharacteristicWrite failed: $status")
return
}
}
override fun onCharacteristicChanged(
gatt: BluetoothGatt,
characteristic: BluetoothGattCharacteristic,
value: ByteArray
) {
if (characteristic.uuid == BENCH_RX_UUID && packetSink != null) {
val packet = Packet.from(value)
packetSink!!.onPacket(packet)
}
}
@SuppressLint("MissingPermission")
override fun sendPacket(packet: Packet) {
if (txCharacteristic == null) {
Log.warning("No TX characteristic, dropping")
return
}
// Wait until we can write
writeSemaphore.acquire()
// Write the data
val data = packet.toBytes()
val clampedData = if (data.size > 512) {
// Clamp the data to the maximum allowed characteristic data size
data.copyOf(512)
} else {
data
}
gatt?.writeCharacteristic(
txCharacteristic!!,
clampedData,
BluetoothGattCharacteristic.WRITE_TYPE_NO_RESPONSE
)
}
override
fun disconnect() {
super.disconnect()
discoveryDone.countDown()
}
fun waitForDiscoveryCompletion() {
discoveryDone.await()
}
}
class GattClient(
private val viewModel: AppViewModel,
bluetoothAdapter: BluetoothAdapter,
context: Context,
private val createIoClient: (packetIo: PacketIO) -> IoClient
) : Mode {
private var connection: GattClientConnection =
GattClientConnection(viewModel, bluetoothAdapter, context)
private var clientThread: Thread? = null
@SuppressLint("MissingPermission")
override fun run() {
viewModel.running = true
clientThread = thread(name = "GattClient") {
connection.connect()
viewModel.aborter = {
connection.disconnect()
}
// Discover the rx and tx characteristics
connection.waitForDiscoveryCompletion()
if (connection.rxCharacteristic == null || connection.txCharacteristic == null) {
connection.disconnect()
viewModel.running = false
return@thread
}
val ioClient = createIoClient(connection)
try {
ioClient.run()
viewModel.status = "OK"
} catch (error: IOException) {
Log.info("run ended abruptly")
viewModel.status = "ABORTED"
viewModel.lastError = "IO_ERROR"
} finally {
connection.disconnect()
viewModel.running = false
}
}
}
override fun waitForCompletion() {
clientThread?.join()
}
}
@@ -0,0 +1,243 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import android.bluetooth.BluetoothDevice
import android.bluetooth.BluetoothGatt
import android.bluetooth.BluetoothGattCharacteristic
import android.bluetooth.BluetoothGattDescriptor
import android.bluetooth.BluetoothGattServer
import android.bluetooth.BluetoothGattServerCallback
import android.bluetooth.BluetoothGattService
import android.bluetooth.BluetoothManager
import android.bluetooth.BluetoothStatusCodes
import android.content.Context
import androidx.core.content.ContextCompat
import java.io.IOException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.logging.Logger
import kotlin.concurrent.thread
import kotlin.experimental.and
private val Log = Logger.getLogger("btbench.gatt-server")
@SuppressLint("MissingPermission")
class GattServer(
private val viewModel: AppViewModel,
private val bluetoothAdapter: BluetoothAdapter,
context: Context,
private val createIoClient: (packetIo: PacketIO) -> IoClient
) : Mode, PacketIO, BluetoothGattServerCallback() {
override var packetSink: PacketSink? = null
private val gattServer: BluetoothGattServer
private val rxCharacteristic: BluetoothGattCharacteristic?
private val txCharacteristic: BluetoothGattCharacteristic?
private val notifySemaphore: Semaphore = Semaphore(1)
private val ready: CountDownLatch = CountDownLatch(1)
private var peerDevice: BluetoothDevice? = null
private var clientThread: Thread? = null
private var sinkQueue: LinkedBlockingQueue<Packet>? = null
init {
val bluetoothManager = ContextCompat.getSystemService(context, BluetoothManager::class.java)
gattServer = bluetoothManager!!.openGattServer(context, this)
val benchService = gattServer.getService(BENCH_SERVICE_UUID)
if (benchService == null) {
rxCharacteristic = BluetoothGattCharacteristic(
BENCH_RX_UUID,
BluetoothGattCharacteristic.PROPERTY_NOTIFY,
0
)
txCharacteristic = BluetoothGattCharacteristic(
BENCH_TX_UUID,
BluetoothGattCharacteristic.PROPERTY_WRITE_NO_RESPONSE,
BluetoothGattCharacteristic.PERMISSION_WRITE
)
val rxCCCD = BluetoothGattDescriptor(
CCCD_UUID,
BluetoothGattDescriptor.PERMISSION_READ or BluetoothGattDescriptor.PERMISSION_WRITE
)
rxCharacteristic.addDescriptor(rxCCCD)
val service =
BluetoothGattService(BENCH_SERVICE_UUID, BluetoothGattService.SERVICE_TYPE_PRIMARY)
service.addCharacteristic(rxCharacteristic)
service.addCharacteristic(txCharacteristic)
gattServer.addService(service)
} else {
rxCharacteristic = benchService.getCharacteristic(BENCH_RX_UUID)
txCharacteristic = benchService.getCharacteristic(BENCH_TX_UUID)
}
}
override fun onCharacteristicWriteRequest(
device: BluetoothDevice?,
requestId: Int,
characteristic: BluetoothGattCharacteristic?,
preparedWrite: Boolean,
responseNeeded: Boolean,
offset: Int,
value: ByteArray?
) {
Log.info("onCharacteristicWriteRequest")
if (characteristic != null && characteristic.uuid == BENCH_TX_UUID) {
if (packetSink == null) {
Log.warning("no sink, dropping")
} else if (offset != 0) {
Log.warning("offset != 0")
} else if (value == null) {
Log.warning("no value")
} else {
// Deliver the packet in a separate thread so that we don't block this
// callback.
sinkQueue?.put(Packet.from(value))
}
}
if (responseNeeded) {
gattServer.sendResponse(device, requestId, BluetoothGatt.GATT_SUCCESS, offset, value)
}
}
override fun onNotificationSent(device: BluetoothDevice?, status: Int) {
if (status == BluetoothGatt.GATT_SUCCESS) {
notifySemaphore.release()
}
}
override fun onDescriptorWriteRequest(
device: BluetoothDevice?,
requestId: Int,
descriptor: BluetoothGattDescriptor?,
preparedWrite: Boolean,
responseNeeded: Boolean,
offset: Int,
value: ByteArray?
) {
if (descriptor?.uuid == CCCD_UUID && descriptor?.characteristic?.uuid == BENCH_RX_UUID) {
if (offset == 0 && value?.size == 2) {
if (value[0].and(1).toInt() != 0) {
// Subscription
Log.fine("peer subscribed to RX")
peerDevice = device
ready.countDown()
}
}
}
if (responseNeeded) {
gattServer.sendResponse(device, requestId, BluetoothGatt.GATT_SUCCESS, offset, value)
}
}
@SuppressLint("MissingPermission")
override fun sendPacket(packet: Packet) {
if (peerDevice == null) {
Log.warning("no peer device, cannot send")
return
}
if (rxCharacteristic == null) {
Log.warning("no RX characteristic, cannot send")
return
}
// Wait until we can notify
notifySemaphore.acquire()
// Send the packet via a notification
val result = gattServer.notifyCharacteristicChanged(
peerDevice!!,
rxCharacteristic,
false,
packet.toBytes()
)
if (result != BluetoothStatusCodes.SUCCESS) {
Log.warning("notifyCharacteristicChanged failed: $result")
notifySemaphore.release()
}
}
override fun run() {
viewModel.running = true
// Start advertising
Log.fine("starting advertiser")
val advertiser = Advertiser(bluetoothAdapter)
advertiser.start()
clientThread = thread(name = "GattServer") {
// Wait for a subscriber
Log.info("waiting for RX subscriber")
viewModel.aborter = {
ready.countDown()
}
ready.await()
if (peerDevice == null) {
Log.warning("server interrupted")
viewModel.running = false
gattServer.close()
return@thread
}
Log.info("RX subscriber accepted")
// Stop advertising
Log.info("stopping advertiser")
advertiser.stop()
sinkQueue = LinkedBlockingQueue()
val sinkWriterThread = thread(name = "SinkWriter") {
while (true) {
try {
val packet = sinkQueue!!.take()
if (packetSink == null) {
Log.warning("no sink, dropping packet")
continue
}
packetSink!!.onPacket(packet)
} catch (error: InterruptedException) {
Log.warning("sink writer interrupted")
break
}
}
}
val ioClient = createIoClient(this)
try {
ioClient.run()
viewModel.status = "OK"
} catch (error: IOException) {
Log.info("run ended abruptly")
viewModel.status = "ABORTED"
viewModel.lastError = "IO_ERROR"
} finally {
sinkWriterThread.interrupt()
sinkWriterThread.join()
gattServer.close()
viewModel.running = false
}
}
}
override fun waitForCompletion() {
clientThread?.join()
Log.info("server thread completed")
}
}
@@ -16,89 +16,25 @@ package com.github.google.bumble.btbench
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import android.bluetooth.BluetoothDevice
import android.bluetooth.BluetoothGatt
import android.bluetooth.BluetoothGattCallback
import android.bluetooth.BluetoothProfile
import android.content.Context
import android.os.Build
import java.util.logging.Logger
private val Log = Logger.getLogger("btbench.l2cap-client")
class L2capClient(
private val viewModel: AppViewModel,
private val bluetoothAdapter: BluetoothAdapter,
private val context: Context,
bluetoothAdapter: BluetoothAdapter,
context: Context,
private val createIoClient: (packetIo: PacketIO) -> IoClient
) : Mode {
private var connection: Connection = Connection(viewModel, bluetoothAdapter, context)
private var socketClient: SocketClient? = null
@SuppressLint("MissingPermission")
override fun run() {
viewModel.running = true
val addressIsPublic = viewModel.peerBluetoothAddress.endsWith("/P")
val address = viewModel.peerBluetoothAddress.take(17)
val remoteDevice = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) {
bluetoothAdapter.getRemoteLeDevice(
address,
if (addressIsPublic) {
BluetoothDevice.ADDRESS_TYPE_PUBLIC
} else {
BluetoothDevice.ADDRESS_TYPE_RANDOM
}
)
} else {
bluetoothAdapter.getRemoteDevice(address)
}
val gatt = remoteDevice.connectGatt(
context,
false,
object : BluetoothGattCallback() {
override fun onMtuChanged(gatt: BluetoothGatt, mtu: Int, status: Int) {
Log.info("MTU update: mtu=$mtu status=$status")
viewModel.mtu = mtu
}
override fun onPhyUpdate(gatt: BluetoothGatt, txPhy: Int, rxPhy: Int, status: Int) {
Log.info("PHY update: tx=$txPhy, rx=$rxPhy, status=$status")
viewModel.txPhy = txPhy
viewModel.rxPhy = rxPhy
}
override fun onPhyRead(gatt: BluetoothGatt, txPhy: Int, rxPhy: Int, status: Int) {
Log.info("PHY: tx=$txPhy, rx=$rxPhy, status=$status")
viewModel.txPhy = txPhy
viewModel.rxPhy = rxPhy
}
override fun onConnectionStateChange(
gatt: BluetoothGatt?, status: Int, newState: Int
) {
if (gatt != null && newState == BluetoothProfile.STATE_CONNECTED) {
if (viewModel.use2mPhy) {
Log.info("requesting 2M PHY")
gatt.setPreferredPhy(
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_LE_2M_MASK,
BluetoothDevice.PHY_OPTION_NO_PREFERRED
)
}
gatt.readPhy()
// Request an MTU update, even though we don't use GATT, because Android
// won't request a larger link layer maximum data length otherwise.
gatt.requestMtu(517)
}
}
},
BluetoothDevice.TRANSPORT_LE,
if (viewModel.use2mPhy) BluetoothDevice.PHY_LE_2M_MASK else BluetoothDevice.PHY_LE_1M_MASK
)
val socket = remoteDevice.createInsecureL2capChannel(viewModel.l2capPsm)
connection.connect()
val socket = connection.remoteDevice!!.createInsecureL2capChannel(viewModel.l2capPsm)
socketClient = SocketClient(viewModel, socket, createIoClient)
socketClient!!.run()
}
@@ -37,34 +37,15 @@ class L2capServer(
@SuppressLint("MissingPermission")
override fun run() {
// Advertise so that the peer can find us and connect.
val callback = object : AdvertiseCallback() {
override fun onStartFailure(errorCode: Int) {
Log.warning("failed to start advertising: $errorCode")
}
override fun onStartSuccess(settingsInEffect: AdvertiseSettings) {
Log.info("advertising started: $settingsInEffect")
}
}
val advertiseSettingsBuilder = AdvertiseSettings.Builder()
.setAdvertiseMode(ADVERTISE_MODE_LOW_LATENCY)
.setConnectable(true)
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.UPSIDE_DOWN_CAKE) {
advertiseSettingsBuilder.setDiscoverable(true)
}
val advertiseSettings = advertiseSettingsBuilder.build()
val advertiseData = AdvertiseData.Builder().build()
val scanData = AdvertiseData.Builder().setIncludeDeviceName(true).build()
val advertiser = bluetoothAdapter.bluetoothLeAdvertiser
val advertiser = Advertiser(bluetoothAdapter)
val serverSocket = bluetoothAdapter.listenUsingInsecureL2capChannel()
viewModel.l2capPsm = serverSocket.psm
Log.info("psm = $serverSocket.psm")
socketServer = SocketServer(viewModel, serverSocket, createIoClient)
socketServer!!.run(
{ advertiser.stopAdvertising(callback) },
{ advertiser.startAdvertising(advertiseSettings, advertiseData, scanData, callback) }
{ advertiser.stop() },
{ advertiser.start() }
)
}
@@ -17,9 +17,12 @@ package com.github.google.bumble.btbench
import android.Manifest
import android.annotation.SuppressLint
import android.bluetooth.BluetoothAdapter
import android.bluetooth.BluetoothDevice
import android.bluetooth.BluetoothManager
import android.content.BroadcastReceiver
import android.content.Context
import android.content.Intent
import android.content.IntentFilter
import android.content.pm.PackageManager
import android.os.Build
import android.os.Bundle
@@ -66,6 +69,7 @@ import androidx.compose.ui.unit.dp
import androidx.compose.ui.unit.sp
import androidx.core.content.ContextCompat
import com.github.google.bumble.btbench.ui.theme.BTBenchTheme
import java.io.IOException
import java.util.logging.Logger
private val Log = Logger.getLogger("bumble.main-activity")
@@ -76,6 +80,7 @@ const val SENDER_PACKET_SIZE_PREF_KEY = "sender_packet_size"
const val SENDER_PACKET_INTERVAL_PREF_KEY = "sender_packet_interval"
const val SCENARIO_PREF_KEY = "scenario"
const val MODE_PREF_KEY = "mode"
const val CONNECTION_PRIORITY_PREF_KEY = "connection_priority"
class MainActivity : ComponentActivity() {
private val appViewModel = AppViewModel()
@@ -84,6 +89,47 @@ class MainActivity : ComponentActivity() {
super.onCreate(savedInstanceState)
appViewModel.loadPreferences(getPreferences(Context.MODE_PRIVATE))
checkPermissions()
registerReceivers()
}
private fun registerReceivers() {
val pairingRequestIntentFilter = IntentFilter(BluetoothDevice.ACTION_PAIRING_REQUEST)
registerReceiver(object: BroadcastReceiver() {
@SuppressLint("MissingPermission")
override fun onReceive(context: Context, intent: Intent) {
Log.info("ACTION_PAIRING_REQUEST")
val extras = intent.extras
if (extras != null) {
for (key in extras.keySet()) {
Log.info("$key: ${extras.get(key)}")
}
}
val device: BluetoothDevice? = intent.getParcelableExtra(BluetoothDevice.EXTRA_DEVICE)
if (device != null) {
if (checkSelfPermission(Manifest.permission.BLUETOOTH_PRIVILEGED) == PackageManager.PERMISSION_GRANTED) {
Log.info("confirming pairing")
device.setPairingConfirmation(true)
} else {
Log.info("we don't have BLUETOOTH_PRIVILEGED, not confirming")
}
}
}
}, pairingRequestIntentFilter)
val bondStateChangedIntentFilter = IntentFilter(BluetoothDevice.ACTION_BOND_STATE_CHANGED)
registerReceiver(object: BroadcastReceiver() {
@SuppressLint("MissingPermission")
override fun onReceive(context: Context, intent: Intent) {
Log.info("ACTION_BOND_STATE_CHANGED")
val extras = intent.extras
if (extras != null) {
for (key in extras.keySet()) {
Log.info("$key: ${extras.get(key)}")
}
}
}
}, bondStateChangedIntentFilter)
}
private fun checkPermissions() {
@@ -144,9 +190,7 @@ class MainActivity : ComponentActivity() {
initBluetooth()
setContent {
MainView(
appViewModel,
::becomeDiscoverable,
::runScenario
appViewModel, ::becomeDiscoverable, ::runScenario
)
}
@@ -182,6 +226,8 @@ class MainActivity : ComponentActivity() {
"rfcomm-server" -> appViewModel.mode = RFCOMM_SERVER_MODE
"l2cap-client" -> appViewModel.mode = L2CAP_CLIENT_MODE
"l2cap-server" -> appViewModel.mode = L2CAP_SERVER_MODE
"gatt-client" -> appViewModel.mode = GATT_CLIENT_MODE
"gatt-server" -> appViewModel.mode = GATT_SERVER_MODE
}
}
intent.getStringExtra("autostart")?.let {
@@ -195,19 +241,24 @@ class MainActivity : ComponentActivity() {
private fun runScenario() {
if (bluetoothAdapter == null) {
return
throw IOException("bluetooth not enabled")
}
val runner = when (appViewModel.mode) {
RFCOMM_CLIENT_MODE -> RfcommClient(appViewModel, bluetoothAdapter!!, ::createIoClient)
RFCOMM_SERVER_MODE -> RfcommServer(appViewModel, bluetoothAdapter!!, ::createIoClient)
L2CAP_CLIENT_MODE -> L2capClient(
appViewModel,
bluetoothAdapter!!,
baseContext,
::createIoClient
appViewModel, bluetoothAdapter!!, baseContext, ::createIoClient
)
L2CAP_SERVER_MODE -> L2capServer(appViewModel, bluetoothAdapter!!, ::createIoClient)
GATT_CLIENT_MODE -> GattClient(
appViewModel, bluetoothAdapter!!, baseContext, ::createIoClient
)
GATT_SERVER_MODE -> GattServer(
appViewModel, bluetoothAdapter!!, baseContext, ::createIoClient
)
else -> throw IllegalStateException()
}
runner.run()
@@ -281,7 +332,7 @@ fun MainView(
keyboardController?.hide()
focusManager.clearFocus()
}),
enabled = (appViewModel.mode == RFCOMM_CLIENT_MODE) or (appViewModel.mode == L2CAP_CLIENT_MODE)
enabled = (appViewModel.mode == RFCOMM_CLIENT_MODE || appViewModel.mode == L2CAP_CLIENT_MODE || appViewModel.mode == GATT_CLIENT_MODE)
)
Divider()
TextField(
@@ -349,24 +400,45 @@ fun MainView(
keyboardController?.hide()
focusManager.clearFocus()
}),
enabled = (appViewModel.scenario == PING_SCENARIO)
enabled = (appViewModel.scenario == PING_SCENARIO || appViewModel.scenario == SEND_SCENARIO)
)
Divider()
ActionButton(
text = "Become Discoverable", onClick = becomeDiscoverable, true
)
Row(
horizontalArrangement = Arrangement.SpaceBetween,
verticalAlignment = Alignment.CenterVertically
) {
Text(text = "2M PHY")
Spacer(modifier = Modifier.padding(start = 8.dp))
Switch(
enabled = (appViewModel.mode == L2CAP_CLIENT_MODE || appViewModel.mode == L2CAP_SERVER_MODE),
Switch(enabled = (appViewModel.mode == L2CAP_CLIENT_MODE || appViewModel.mode == L2CAP_SERVER_MODE || appViewModel.mode == GATT_CLIENT_MODE || appViewModel.mode == GATT_SERVER_MODE),
checked = appViewModel.use2mPhy,
onCheckedChange = { appViewModel.use2mPhy = it }
)
onCheckedChange = { appViewModel.use2mPhy = it })
Column(Modifier.selectableGroup()) {
listOf(
"BALANCED", "LOW", "HIGH", "DCK"
).forEach { text ->
Row(
Modifier
.selectable(
selected = (text == appViewModel.connectionPriority),
onClick = { appViewModel.updateConnectionPriority(text) },
role = Role.RadioButton,
)
.padding(horizontal = 16.dp),
verticalAlignment = Alignment.CenterVertically
) {
RadioButton(
selected = (text == appViewModel.connectionPriority),
onClick = null,
enabled = (appViewModel.mode == L2CAP_CLIENT_MODE || appViewModel.mode == L2CAP_SERVER_MODE || appViewModel.mode == GATT_CLIENT_MODE || appViewModel.mode == GATT_SERVER_MODE)
)
Text(
text = text,
style = MaterialTheme.typography.bodyLarge,
modifier = Modifier.padding(start = 16.dp)
)
}
}
}
}
Row {
Column(Modifier.selectableGroup()) {
@@ -374,7 +446,9 @@ fun MainView(
RFCOMM_CLIENT_MODE,
RFCOMM_SERVER_MODE,
L2CAP_CLIENT_MODE,
L2CAP_SERVER_MODE
L2CAP_SERVER_MODE,
GATT_CLIENT_MODE,
GATT_SERVER_MODE
).forEach { text ->
Row(
Modifier
@@ -387,8 +461,7 @@ fun MainView(
verticalAlignment = Alignment.CenterVertically
) {
RadioButton(
selected = (text == appViewModel.mode),
onClick = null
selected = (text == appViewModel.mode), onClick = null
)
Text(
text = text,
@@ -400,10 +473,7 @@ fun MainView(
}
Column(Modifier.selectableGroup()) {
listOf(
SEND_SCENARIO,
RECEIVE_SCENARIO,
PING_SCENARIO,
PONG_SCENARIO
SEND_SCENARIO, RECEIVE_SCENARIO, PING_SCENARIO, PONG_SCENARIO
).forEach { text ->
Row(
Modifier
@@ -416,8 +486,7 @@ fun MainView(
verticalAlignment = Alignment.CenterVertically
) {
RadioButton(
selected = (text == appViewModel.scenario),
onClick = null
selected = (text == appViewModel.scenario), onClick = null
)
Text(
text = text,
@@ -435,20 +504,29 @@ fun MainView(
ActionButton(
text = "Stop", onClick = appViewModel::abort, enabled = appViewModel.running
)
ActionButton(
text = "Become Discoverable", onClick = becomeDiscoverable, true
)
}
Divider()
Text(
text = if (appViewModel.mtu != 0) "MTU: ${appViewModel.mtu}" else ""
)
Text(
text = if (appViewModel.rxPhy != 0 || appViewModel.txPhy != 0) "PHY: tx=${appViewModel.txPhy}, rx=${appViewModel.rxPhy}" else ""
)
if (appViewModel.mtu != 0) {
Text(
text = "MTU: ${appViewModel.mtu}"
)
}
if (appViewModel.rxPhy != 0) {
Text(
text = "PHY: tx=${appViewModel.txPhy}, rx=${appViewModel.rxPhy}"
)
}
Text(
text = "Status: ${appViewModel.status}"
)
Text(
text = "Last Error: ${appViewModel.lastError}"
)
if (appViewModel.lastError.isNotEmpty()) {
Text(
text = "Last Error: ${appViewModel.lastError}"
)
}
Text(
text = "Packets Sent: ${appViewModel.packetsSent}"
)
@@ -25,6 +25,7 @@ import java.util.UUID
val DEFAULT_RFCOMM_UUID: UUID = UUID.fromString("E6D55659-C8B4-4B85-96BB-B1143AF6D3AE")
const val DEFAULT_PEER_BLUETOOTH_ADDRESS = "AA:BB:CC:DD:EE:FF"
const val DEFAULT_STARTUP_DELAY = 3000
const val DEFAULT_SENDER_PACKET_COUNT = 100
const val DEFAULT_SENDER_PACKET_SIZE = 1024
const val DEFAULT_SENDER_PACKET_INTERVAL = 100
@@ -34,6 +35,8 @@ const val L2CAP_CLIENT_MODE = "L2CAP Client"
const val L2CAP_SERVER_MODE = "L2CAP Server"
const val RFCOMM_CLIENT_MODE = "RFCOMM Client"
const val RFCOMM_SERVER_MODE = "RFCOMM Server"
const val GATT_CLIENT_MODE = "GATT Client"
const val GATT_SERVER_MODE = "GATT Server"
const val SEND_SCENARIO = "Send"
const val RECEIVE_SCENARIO = "Receive"
@@ -47,8 +50,10 @@ class AppViewModel : ViewModel() {
var mode by mutableStateOf(RFCOMM_SERVER_MODE)
var scenario by mutableStateOf(RECEIVE_SCENARIO)
var peerBluetoothAddress by mutableStateOf(DEFAULT_PEER_BLUETOOTH_ADDRESS)
var startupDelay by mutableIntStateOf(DEFAULT_STARTUP_DELAY)
var l2capPsm by mutableIntStateOf(DEFAULT_PSM)
var use2mPhy by mutableStateOf(true)
var connectionPriority by mutableStateOf("BALANCED")
var mtu by mutableIntStateOf(0)
var rxPhy by mutableIntStateOf(0)
var txPhy by mutableIntStateOf(0)
@@ -98,6 +103,11 @@ class AppViewModel : ViewModel() {
if (savedScenario != null) {
scenario = savedScenario
}
val savedConnectionPriority = preferences.getString(CONNECTION_PRIORITY_PREF_KEY, null)
if (savedConnectionPriority != null) {
connectionPriority = savedConnectionPriority
}
}
fun updatePeerBluetoothAddress(peerBluetoothAddress: String) {
@@ -220,6 +230,14 @@ class AppViewModel : ViewModel() {
}
}
fun updateConnectionPriority(connectionPriority: String) {
this.connectionPriority = connectionPriority
with(preferences!!.edit()) {
putString(CONNECTION_PRIORITY_PREF_KEY, connectionPriority)
apply()
}
}
fun clear() {
status = ""
lastError = ""
@@ -17,6 +17,7 @@ package com.github.google.bumble.btbench
import android.bluetooth.BluetoothSocket
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.logging.Logger
import kotlin.math.min
@@ -37,11 +38,16 @@ abstract class Packet(val type: Int, val payload: ByteArray = ByteArray(0)) {
RESET -> ResetPacket()
SEQUENCE -> SequencePacket(
data[1].toInt(),
ByteBuffer.wrap(data, 2, 4).getInt(),
data.sliceArray(6..<data.size)
ByteBuffer.wrap(data, 2, 4).order(ByteOrder.LITTLE_ENDIAN).getInt(),
ByteBuffer.wrap(data, 6, 4).order(ByteOrder.LITTLE_ENDIAN).getInt(),
data.sliceArray(10..<data.size)
)
ACK -> AckPacket(
data[1].toInt(),
ByteBuffer.wrap(data, 2, 4).order(ByteOrder.LITTLE_ENDIAN).getInt()
)
ACK -> AckPacket(data[1].toInt(), ByteBuffer.wrap(data, 2, 4).getInt())
else -> GenericPacket(data[0].toInt(), data.sliceArray(1..<data.size))
}
}
@@ -57,16 +63,24 @@ class ResetPacket : Packet(RESET)
class AckPacket(val flags: Int, val sequenceNumber: Int) : Packet(ACK) {
override fun toBytes(): ByteArray {
return ByteBuffer.allocate(1 + 1 + 4).put(type.toByte()).put(flags.toByte())
return ByteBuffer.allocate(6).order(
ByteOrder.LITTLE_ENDIAN
).put(type.toByte()).put(flags.toByte())
.putInt(sequenceNumber).array()
}
}
class SequencePacket(val flags: Int, val sequenceNumber: Int, payload: ByteArray) :
class SequencePacket(
val flags: Int,
val sequenceNumber: Int,
val timestamp: Int,
payload: ByteArray
) :
Packet(SEQUENCE, payload) {
override fun toBytes(): ByteArray {
return ByteBuffer.allocate(1 + 1 + 4 + payload.size).put(type.toByte()).put(flags.toByte())
.putInt(sequenceNumber).put(payload).array()
return ByteBuffer.allocate(10 + payload.size).order(ByteOrder.LITTLE_ENDIAN)
.put(type.toByte()).put(flags.toByte())
.putInt(sequenceNumber).putInt(timestamp).put(payload).array()
}
}
@@ -19,8 +19,6 @@ import java.util.logging.Logger
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.TimeSource
private const val DEFAULT_STARTUP_DELAY = 3000
private val Log = Logger.getLogger("btbench.pinger")
class Pinger(private val viewModel: AppViewModel, private val packetIO: PacketIO) : IoClient,
@@ -36,8 +34,8 @@ class Pinger(private val viewModel: AppViewModel, private val packetIO: PacketIO
override fun run() {
viewModel.clear()
Log.info("startup delay: $DEFAULT_STARTUP_DELAY")
Thread.sleep(DEFAULT_STARTUP_DELAY.toLong());
Log.info("startup delay: ${viewModel.startupDelay}")
Thread.sleep(viewModel.startupDelay.toLong());
Log.info("running")
Log.info("sending reset")
@@ -48,19 +46,23 @@ class Pinger(private val viewModel: AppViewModel, private val packetIO: PacketIO
val startTime = TimeSource.Monotonic.markNow()
for (i in 0..<packetCount) {
val now = TimeSource.Monotonic.markNow()
val targetTime = startTime + (i * viewModel.senderPacketInterval).milliseconds
val delay = targetTime - now
if (delay.isPositive()) {
Log.info("sleeping ${delay.inWholeMilliseconds} ms")
Thread.sleep(delay.inWholeMilliseconds)
var now = TimeSource.Monotonic.markNow()
if (viewModel.senderPacketInterval > 0) {
val targetTime = startTime + (i * viewModel.senderPacketInterval).milliseconds
val delay = targetTime - now
if (delay.isPositive()) {
Log.info("sleeping ${delay.inWholeMilliseconds} ms")
Thread.sleep(delay.inWholeMilliseconds)
now = TimeSource.Monotonic.markNow()
}
}
pingTimes.add(TimeSource.Monotonic.markNow())
packetIO.sendPacket(
SequencePacket(
if (i < packetCount - 1) 0 else Packet.LAST_FLAG,
i,
ByteArray(packetSize - 6)
(now - startTime).inWholeMicroseconds.toInt(),
ByteArray(packetSize - 10)
)
)
viewModel.packetsSent = i + 1
@@ -14,6 +14,7 @@
package com.github.google.bumble.btbench
import java.util.concurrent.CountDownLatch
import java.util.logging.Logger
import kotlin.time.TimeSource
@@ -23,6 +24,7 @@ class Ponger(private val viewModel: AppViewModel, private val packetIO: PacketIO
private var startTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var lastPacketTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var expectedSequenceNumber: Int = 0
private val done = CountDownLatch(1)
init {
packetIO.packetSink = this
@@ -30,6 +32,7 @@ class Ponger(private val viewModel: AppViewModel, private val packetIO: PacketIO
override fun run() {
viewModel.clear()
done.await()
}
override fun abort() {}
@@ -58,5 +61,10 @@ class Ponger(private val viewModel: AppViewModel, private val packetIO: PacketIO
packetIO.sendPacket(AckPacket(packet.flags, packet.sequenceNumber))
viewModel.packetsSent += 1
if (packet.flags and Packet.LAST_FLAG != 0) {
Log.info("received last packet")
done.countDown()
}
}
}
@@ -14,6 +14,7 @@
package com.github.google.bumble.btbench
import java.util.concurrent.CountDownLatch
import java.util.logging.Logger
import kotlin.time.DurationUnit
import kotlin.time.TimeSource
@@ -24,6 +25,7 @@ class Receiver(private val viewModel: AppViewModel, private val packetIO: Packet
private var startTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var lastPacketTime: TimeSource.Monotonic.ValueTimeMark = TimeSource.Monotonic.markNow()
private var bytesReceived = 0
private val done = CountDownLatch(1)
init {
packetIO.packetSink = this
@@ -31,6 +33,7 @@ class Receiver(private val viewModel: AppViewModel, private val packetIO: Packet
override fun run() {
viewModel.clear()
done.await()
}
override fun abort() {}
@@ -62,6 +65,7 @@ class Receiver(private val viewModel: AppViewModel, private val packetIO: Packet
Log.info("throughput: $throughput")
viewModel.throughput = throughput
packetIO.sendPacket(AckPacket(packet.flags, packet.sequenceNumber))
done.countDown()
}
}
}
@@ -16,11 +16,10 @@ package com.github.google.bumble.btbench
import java.util.concurrent.Semaphore
import java.util.logging.Logger
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.DurationUnit
import kotlin.time.TimeSource
private const val DEFAULT_STARTUP_DELAY = 3000
private val Log = Logger.getLogger("btbench.sender")
class Sender(private val viewModel: AppViewModel, private val packetIO: PacketIO) : IoClient,
@@ -36,8 +35,8 @@ class Sender(private val viewModel: AppViewModel, private val packetIO: PacketIO
override fun run() {
viewModel.clear()
Log.info("startup delay: $DEFAULT_STARTUP_DELAY")
Thread.sleep(DEFAULT_STARTUP_DELAY.toLong());
Log.info("startup delay: ${viewModel.startupDelay}")
Thread.sleep(viewModel.startupDelay.toLong());
Log.info("running")
Log.info("sending reset")
@@ -47,20 +46,32 @@ class Sender(private val viewModel: AppViewModel, private val packetIO: PacketIO
val packetCount = viewModel.senderPacketCount
val packetSize = viewModel.senderPacketSize
for (i in 0..<packetCount - 1) {
packetIO.sendPacket(SequencePacket(0, i, ByteArray(packetSize - 6)))
for (i in 0..<packetCount) {
var now = TimeSource.Monotonic.markNow()
if (viewModel.senderPacketInterval > 0) {
val targetTime = startTime + (i * viewModel.senderPacketInterval).milliseconds
val delay = targetTime - now
if (delay.isPositive()) {
Log.info("sleeping ${delay.inWholeMilliseconds} ms")
Thread.sleep(delay.inWholeMilliseconds)
}
now = TimeSource.Monotonic.markNow()
}
val flags = when (i) {
packetCount - 1 -> Packet.LAST_FLAG
else -> 0
}
packetIO.sendPacket(
SequencePacket(
flags,
i,
(now - startTime).inWholeMicroseconds.toInt(),
ByteArray(packetSize - 10)
)
)
bytesSent += packetSize
viewModel.packetsSent = i + 1
}
packetIO.sendPacket(
SequencePacket(
Packet.LAST_FLAG,
packetCount - 1,
ByteArray(packetSize - 6)
)
)
bytesSent += packetSize
viewModel.packetsSent = packetCount
// Wait for the ACK
Log.info("waiting for ACK")
+3 -3
View File
@@ -57,7 +57,7 @@ development = [
]
avatar = [
"pandora-avatar == 0.0.10",
"rootcanal == 1.10.0 ; python_version>='3.10'",
"rootcanal == 1.11.1 ; python_version>='3.10'",
]
pandora = ["bt-test-interfaces >= 0.0.6"]
documentation = [
@@ -66,7 +66,7 @@ documentation = [
"mkdocstrings[python] >= 0.27.0",
]
auracast = [
"lc3py ; python_version>='3.10' and platform_system=='Linux' and platform_machine=='x86_64'",
"lc3py >= 1.1.3; python_version>='3.10' and ((platform_system=='Linux' and platform_machine=='x86_64') or (platform_system=='Darwin' and platform_machine=='arm64'))",
"sounddevice >= 0.5.1",
]
@@ -155,7 +155,7 @@ disable = [
]
[tool.pylint.main]
ignore = "pandora" # FIXME: pylint does not support stubs yet:
ignore=["pandora", "mobly"] # FIXME: pylint does not support stubs yet
[tool.pylint.typecheck]
signature-mutators = "AsyncRunner.run_in_task"
+65
View File
@@ -69,3 +69,68 @@ To regenerate the assigned number tables based on the Python codebase:
```
PYTHONPATH=.. cargo run --bin gen-assigned-numbers --features dev-tools
```
## HCI packets
Sending a command packet from a device is composed to of two major steps.
There are more generalized ways of dealing with packets in other scenarios.
### Construct the command
Pick a command from `src/internal/hci/packets.pdl` and construct its associated "builder" struct.
```rust
// The "LE Set Scan Enable" command can be found in the Core Bluetooth Spec.
// It can also be found in `packets.pdl` as `packet LeSetScanEnable : Command`
fn main() {
let device = init_device_as_desired();
let le_set_scan_enable_command_builder = LeSetScanEnableBuilder {
filter_duplicates: Enable::Disabled,
le_scan_enable: Enable::Enabled,
};
}
```
### Send the command and interpret the event response
Send the command from an initialized device, and then receive the response.
```rust
fn main() {
// ...
// `check_result` to false to receive the event response even if the controller returns a failure code
let event = device.send_command(le_set_scan_enable_command_builder.into(), /*check_result*/ false);
// Coerce the event into the expected format. A `Command` should have an associated event response
// "<command name>Complete".
let le_set_scan_enable_complete_event: LeSetScanEnableComplete = event.try_into().unwrap();
}
```
### Generic packet handling
At the very least, you should expect to at least know _which_ kind of base packet you are dealing with. Base packets in
`packets.pdl` can be identified because they do not extend any other packet. They are easily found with the regex:
`^packet [^:]* \{`. For Bluetooth LE (BLE) HCI, one should find some kind of header preceding the packet with the purpose of
packet disambiguation. We do some of that disambiguation for H4 BLE packets using the `WithPacketHeader` trait at `internal/hci/mod.rs`.
Say you've identified a series of bytes that are certainly an `Acl` packet. They can be parsed using the `Acl` struct.
```rust
fn main() {
let bytes = bytes_that_are_certainly_acl();
let acl_packet = Acl::parse(bytes).unwrap();
}
```
Since you don't yet know what kind of `Acl` packet it is, you need to specialize it and then handle the various
potential cases.
```rust
fn main() {
// ...
match acl_packet.specialize() {
Payload(bytes) => do_something(bytes),
None => do_something_else(),
}
}
```
Some packets may yet further embed other packets, in which case you may need to further specialize until no more
specialization is needed.
-1
View File
@@ -25,7 +25,6 @@ use clap::Parser as _;
use pyo3::PyResult;
use rand::Rng;
use std::path;
#[pyo3_asyncio::tokio::main]
async fn main() -> PyResult<()> {
env_logger::builder()
+23 -1
View File
@@ -28,7 +28,7 @@ use bumble::wrapper::{
};
use pyo3::{
exceptions::PyException,
{PyErr, PyResult},
FromPyObject, IntoPy, Python, {PyErr, PyResult},
};
#[pyo3_asyncio::tokio::test]
@@ -78,6 +78,28 @@ async fn test_hci_roundtrip_success_and_failure() -> PyResult<()> {
Ok(())
}
#[pyo3_asyncio::tokio::test]
fn valid_error_code_extraction_succeeds() -> PyResult<()> {
let error_code = Python::with_gil(|py| {
let python_error_code_success = 0x00_u8.into_py(py);
ErrorCode::extract(python_error_code_success.as_ref(py))
})?;
assert_eq!(ErrorCode::Success, error_code);
Ok(())
}
#[pyo3_asyncio::tokio::test]
fn invalid_error_code_extraction_fails() -> PyResult<()> {
let failed_extraction = Python::with_gil(|py| {
let python_invalid_error_code = 0xFE_u8.into_py(py);
ErrorCode::extract(python_invalid_error_code.as_ref(py))
});
assert!(failed_extraction.is_err());
Ok(())
}
async fn create_local_device(address: Address) -> PyResult<Device> {
let link = Link::new_local_link()?;
let controller = Controller::new("C1", None, None, Some(link), Some(address.clone())).await?;
+5 -1
View File
@@ -178,7 +178,11 @@ impl IntoPy<PyObject> for AddressType {
impl<'source> FromPyObject<'source> for ErrorCode {
fn extract(ob: &'source PyAny) -> PyResult<Self> {
ob.extract()
// Bumble represents error codes simply as a single-byte number (in Rust, u8)
let value: u8 = ob.extract()?;
ErrorCode::try_from(value).map_err(|b| {
PyErr::new::<PyException, _>(format!("Failed to map {b} to an error code"))
})
}
}
+44 -27
View File
@@ -24,7 +24,6 @@ import pytest
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
BT_PERIPHERAL_ROLE,
ConnectionParameters,
)
from bumble.device import (
@@ -43,6 +42,7 @@ from bumble.hci import (
HCI_CONNECTION_FAILED_TO_BE_ESTABLISHED_ERROR,
Address,
OwnAddressType,
Role,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_Connection_Complete_Event,
@@ -50,12 +50,7 @@ from bumble.hci import (
HCI_Error,
HCI_Packet,
)
from bumble.gatt import (
GATT_GENERIC_ACCESS_SERVICE,
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
GATT_DEVICE_NAME_CHARACTERISTIC,
GATT_APPEARANCE_CHARACTERISTIC,
)
from bumble import gatt
from .test_utils import TwoDevices, async_barrier
@@ -300,7 +295,7 @@ async def test_legacy_advertising_disconnection(auto_restart):
peer_address,
None,
None,
BT_PERIPHERAL_ROLE,
Role.PERIPHERAL,
ConnectionParameters(0, 0, 0),
)
@@ -358,7 +353,7 @@ async def test_extended_advertising_connection(own_address_type):
peer_address,
None,
None,
BT_PERIPHERAL_ROLE,
Role.PERIPHERAL,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
@@ -402,7 +397,7 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
Address('F0:F1:F2:F3:F4:F5'),
None,
None,
BT_PERIPHERAL_ROLE,
Role.PERIPHERAL,
ConnectionParameters(0, 0, 0),
)
@@ -592,32 +587,54 @@ async def test_power_on_default_static_address_should_not_be_any():
# -----------------------------------------------------------------------------
def test_gatt_services_with_gas():
def test_gatt_services_with_gas_and_gatt():
device = Device(host=Host(None, None))
# there should be one service and two chars, therefore 5 attributes
assert len(device.gatt_server.attributes) == 5
assert device.gatt_server.attributes[0].uuid == GATT_GENERIC_ACCESS_SERVICE
assert device.gatt_server.attributes[1].type == GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
assert device.gatt_server.attributes[2].uuid == GATT_DEVICE_NAME_CHARACTERISTIC
assert device.gatt_server.attributes[3].type == GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
assert device.gatt_server.attributes[4].uuid == GATT_APPEARANCE_CHARACTERISTIC
# there should be 2 service, 5 chars, and 1 descriptors, therefore 13 attributes
assert len(device.gatt_server.attributes) == 13
assert device.gatt_server.attributes[0].uuid == gatt.GATT_GENERIC_ACCESS_SERVICE
assert (
device.gatt_server.attributes[1].type == gatt.GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
)
assert device.gatt_server.attributes[2].uuid == gatt.GATT_DEVICE_NAME_CHARACTERISTIC
assert (
device.gatt_server.attributes[3].type == gatt.GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
)
assert device.gatt_server.attributes[4].uuid == gatt.GATT_APPEARANCE_CHARACTERISTIC
# -----------------------------------------------------------------------------
def test_gatt_services_without_gas():
device = Device(host=Host(None, None), generic_access_service=False)
# there should be no services
assert len(device.gatt_server.attributes) == 0
assert device.gatt_server.attributes[5].uuid == gatt.GATT_GENERIC_ATTRIBUTE_SERVICE
assert (
device.gatt_server.attributes[6].type == gatt.GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
)
assert (
device.gatt_server.attributes[7].uuid
== gatt.GATT_SERVICE_CHANGED_CHARACTERISTIC
)
assert (
device.gatt_server.attributes[8].type
== gatt.GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR
)
assert (
device.gatt_server.attributes[9].type == gatt.GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
)
assert (
device.gatt_server.attributes[10].uuid
== gatt.GATT_CLIENT_SUPPORTED_FEATURES_CHARACTERISTIC
)
assert (
device.gatt_server.attributes[11].type
== gatt.GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
)
assert (
device.gatt_server.attributes[12].uuid == gatt.GATT_DATABASE_HASH_CHARACTERISTIC
)
# -----------------------------------------------------------------------------
async def run_test_device():
await test_device_connect_parallel()
await test_flush()
await test_gatt_services_with_gas()
await test_gatt_services_without_gas()
await test_gatt_services_with_gas_and_gatt()
# -----------------------------------------------------------------------------
+143
View File
@@ -0,0 +1,143 @@
# Copyright 2021-2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import pytest
from . import test_utils
from bumble import device
from bumble import gatt
from bumble.profiles import gatt_service
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_database_hash():
devices = await test_utils.TwoDevices.create_with_connection()
devices[0].gatt_server.services.clear()
devices[0].gatt_server.attributes.clear()
devices[0].gatt_server.attributes_by_handle.clear()
devices[0].add_service(
gatt.Service(
gatt.GATT_GENERIC_ACCESS_SERVICE,
characteristics=[
gatt.Characteristic(
gatt.GATT_DEVICE_NAME_CHARACTERISTIC,
(
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
),
gatt.Characteristic.Permissions.READ_REQUIRES_AUTHENTICATION,
),
gatt.Characteristic(
gatt.GATT_APPEARANCE_CHARACTERISTIC,
gatt.Characteristic.Properties.READ,
gatt.Characteristic.Permissions.READ_REQUIRES_AUTHENTICATION,
),
],
)
)
devices[0].add_service(
gatt_service.GenericAttributeProfileService(
server_supported_features=None,
database_hash_enabled=True,
service_change_enabled=True,
)
)
devices[0].gatt_server.add_attribute(
gatt.Service(gatt.GATT_GLUCOSE_SERVICE, characteristics=[])
)
# There is a special attribute order in the spec, so we need to add attribute one by
# one here.
battery_service = gatt.Service(
gatt.GATT_BATTERY_SERVICE,
characteristics=[
gatt.Characteristic(
gatt.GATT_BATTERY_LEVEL_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_AUTHENTICATION,
)
],
primary=False,
)
battery_service.handle = 0x0014
battery_service.end_group_handle = 0x0016
devices[0].gatt_server.add_attribute(
gatt.IncludedServiceDeclaration(battery_service)
)
c = gatt.Characteristic(
'2A18',
properties=(
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.INDICATE
| gatt.Characteristic.Properties.EXTENDED_PROPERTIES
),
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_AUTHENTICATION,
)
devices[0].gatt_server.add_attribute(
gatt.CharacteristicDeclaration(c, devices[0].gatt_server.next_handle() + 1)
)
devices[0].gatt_server.add_attribute(c)
devices[0].gatt_server.add_attribute(
gatt.Descriptor(
gatt.GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
gatt.Descriptor.Permissions.READ_REQUIRES_AUTHENTICATION,
b'\x02\x00',
),
)
devices[0].gatt_server.add_attribute(
gatt.Descriptor(
gatt.GATT_CHARACTERISTIC_EXTENDED_PROPERTIES_DESCRIPTOR,
gatt.Descriptor.Permissions.READ_REQUIRES_AUTHENTICATION,
b'\x00\x00',
),
)
devices[0].add_service(battery_service)
peer = device.Peer(devices.connections[1])
client = await peer.discover_service_and_create_proxy(
gatt_service.GenericAttributeProfileServiceProxy
)
assert client.database_hash_characteristic
assert await client.database_hash_characteristic.read_value() == bytes.fromhex(
'F1CA2D48ECF58BAC8A8830BBB9FBA990'
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_service_changed():
devices = await test_utils.TwoDevices.create_with_connection()
assert (service := devices[0].gatt_service)
peer = device.Peer(devices.connections[1])
assert (
client := await peer.discover_service_and_create_proxy(
gatt_service.GenericAttributeProfileServiceProxy
)
)
assert client.service_changed_characteristic
indications = []
await client.service_changed_characteristic.subscribe(
indications.append, prefer_notify=False
)
await devices[0].indicate_subscribers(
service.service_changed_characteristic, b'1234'
)
await test_utils.async_barrier()
assert indications[0] == b'1234'
+206 -30
View File
@@ -17,32 +17,43 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import enum
import logging
import os
import struct
import pytest
from typing import Any
from typing_extensions import Self
from unittest.mock import AsyncMock, Mock, ANY
from bumble.controller import Controller
from bumble.gatt_client import CharacteristicProxy
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
from bumble.gatt import (
GATT_BATTERY_LEVEL_CHARACTERISTIC,
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
CharacteristicAdapter,
SerializableCharacteristicAdapter,
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter,
MappedCharacteristicAdapter,
UTF8CharacteristicAdapter,
Service,
Characteristic,
CharacteristicValue,
Descriptor,
)
from bumble.gatt_client import CharacteristicProxy
from bumble.gatt_adapters import (
CharacteristicProxyAdapter,
SerializableCharacteristicAdapter,
SerializableCharacteristicProxyAdapter,
DelegatedCharacteristicAdapter,
DelegatedCharacteristicProxyAdapter,
PackedCharacteristicAdapter,
PackedCharacteristicProxyAdapter,
MappedCharacteristicAdapter,
MappedCharacteristicProxyAdapter,
UTF8CharacteristicAdapter,
UTF8CharacteristicProxyAdapter,
EnumCharacteristicAdapter,
EnumCharacteristicProxyAdapter,
)
from bumble.transport import AsyncPipeSink
from bumble.core import UUID
from bumble.att import (
@@ -199,7 +210,7 @@ async def test_characteristic_encoding():
await async_barrier()
assert characteristic.value == bytes([125])
cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2]))
cd = DelegatedCharacteristicProxyAdapter(c, encode=lambda x: bytes([x // 2]))
await cd.write_value(100, with_response=True)
await async_barrier()
assert characteristic.value == bytes([50])
@@ -207,7 +218,7 @@ async def test_characteristic_encoding():
c2 = peer.get_characteristics_by_uuid(async_characteristic.uuid)
assert len(c2) == 1
c2 = c2[0]
cd2 = PackedCharacteristicAdapter(c2, ">I")
cd2 = PackedCharacteristicProxyAdapter(c2, ">I")
cd2v = await cd2.read_value()
assert cd2v == 0x05060708
@@ -249,7 +260,7 @@ async def test_characteristic_encoding():
await async_barrier()
assert last_change is None
cd = DelegatedCharacteristicAdapter(c, decode=lambda x: x[0])
cd = DelegatedCharacteristicProxyAdapter(c, decode=lambda x: x[0])
await cd.subscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
@@ -314,21 +325,16 @@ async def test_attribute_getters():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_CharacteristicAdapter() -> None:
# Check that the CharacteristicAdapter base class is transparent
v = bytes([1, 2, 3])
c = Characteristic(
c: Characteristic[Any] = Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.Properties.READ,
Characteristic.READABLE,
v,
)
a = CharacteristicAdapter(c)
value = await a.read_value(None)
assert value == v
v = bytes([3, 4, 5])
await a.write_value(None, v)
await c.write_value(None, v)
assert c.value == v
# Simple delegated adapter
@@ -415,11 +421,171 @@ async def test_CharacteristicAdapter() -> None:
class_read_value = await class_c.read_value(None)
assert class_read_value == class_value_bytes
c.value = b''
class_c.value = b''
await class_c.write_value(None, class_value_bytes)
assert isinstance(c.value, BlaBla)
assert c.value.a == 3
assert c.value.b == 4
assert isinstance(class_c.value, BlaBla)
assert class_c.value.a == class_value.a
assert class_c.value.b == class_value.b
# Enum adapter
class MyEnum(enum.IntEnum):
ENUM_1 = 1234
ENUM_2 = 5678
enum_value = MyEnum.ENUM_2
enum_value_bytes = int(enum_value).to_bytes(3, 'big')
c.value = enum_value
enum_c = EnumCharacteristicAdapter(c, MyEnum, 3, 'big')
enum_read_value = await enum_c.read_value(None)
assert enum_read_value == enum_value_bytes
enum_c.value = b''
await enum_c.write_value(None, enum_value_bytes)
assert isinstance(enum_c.value, MyEnum)
assert enum_c.value == enum_value
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_CharacteristicProxyAdapter() -> None:
class Client:
def __init__(self, value):
self.value = value
async def read_value(self, handle, no_long_read=False) -> bytes:
return self.value
async def write_value(self, handle, value, with_response=False):
self.value = value
class TestAttributeProxy(CharacteristicProxy):
def __init__(self, value) -> None:
super().__init__(Client(value), 0, 0, None, 0) # type: ignore
@property
def value(self):
return self.client.value
@value.setter
def value(self, value):
self.client.value = value
v = bytes([1, 2, 3])
c = TestAttributeProxy(v)
a: CharacteristicProxyAdapter = CharacteristicProxyAdapter(c)
value = await a.read_value()
assert value == v
v = bytes([3, 4, 5])
await a.write_value(v)
assert c.value == v
# Simple delegated adapter
delegated = DelegatedCharacteristicProxyAdapter(
c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x))
)
delegated_value = await delegated.read_value()
assert delegated_value == bytes(reversed(v))
delegated_value2 = bytes([3, 4, 5])
await delegated.write_value(delegated_value2)
assert c.value == bytes(reversed(delegated_value2))
# Packed adapter with single element format
packed_value_ref = 1234
packed_value_bytes = struct.pack('>H', packed_value_ref)
c.value = packed_value_bytes
packed = PackedCharacteristicProxyAdapter(c, '>H')
packed_value_read = await packed.read_value()
assert packed_value_read == packed_value_ref
c.value = None
await packed.write_value(packed_value_ref)
assert c.value == packed_value_bytes
# Packed adapter with multi-element format
v1 = 1234
v2 = 5678
packed_multi_value_bytes = struct.pack('>HH', v1, v2)
c.value = packed_multi_value_bytes
packed_multi = PackedCharacteristicProxyAdapter(c, '>HH')
packed_multi_read_value = await packed_multi.read_value()
assert packed_multi_read_value == (v1, v2)
c.value = b''
await packed_multi.write_value((v1, v2))
assert c.value == packed_multi_value_bytes
# Mapped adapter
v1 = 1234
v2 = 5678
packed_mapped_value_bytes = struct.pack('>HH', v1, v2)
mapped = {'v1': v1, 'v2': v2}
c.value = packed_mapped_value_bytes
packed_mapped = MappedCharacteristicProxyAdapter(c, '>HH', ('v1', 'v2'))
packed_mapped_read_value = await packed_mapped.read_value()
assert packed_mapped_read_value == mapped
c.value = b''
await packed_mapped.write_value(mapped)
assert c.value == packed_mapped_value_bytes
# UTF-8 adapter
string_value = 'Hello π'
string_value_bytes = string_value.encode('utf-8')
c.value = string_value_bytes
string_c = UTF8CharacteristicProxyAdapter(c)
string_read_value = await string_c.read_value()
assert string_read_value == string_value
c.value = b''
await string_c.write_value(string_value)
assert c.value == string_value_bytes
# Class adapter
class BlaBla:
def __init__(self, a: int, b: int) -> None:
self.a = a
self.b = b
@classmethod
def from_bytes(cls, data: bytes) -> Self:
a, b = struct.unpack(">II", data)
return cls(a, b)
def __bytes__(self) -> bytes:
return struct.pack(">II", self.a, self.b)
class_value = BlaBla(3, 4)
class_value_bytes = struct.pack(">II", 3, 4)
c.value = class_value_bytes
class_c = SerializableCharacteristicProxyAdapter(c, BlaBla)
class_read_value = await class_c.read_value()
assert isinstance(class_read_value, BlaBla)
assert class_read_value.a == class_value.a
assert class_read_value.b == class_value.b
c.value = b''
await class_c.write_value(class_value)
assert c.value == class_value_bytes
# Enum adapter
class MyEnum(enum.IntEnum):
ENUM_1 = 1234
ENUM_2 = 5678
enum_value = MyEnum.ENUM_1
enum_value_bytes = int(enum_value).to_bytes(3, 'little')
c.value = enum_value_bytes
enum_c = EnumCharacteristicProxyAdapter(c, MyEnum, 3)
enum_read_value = await enum_c.read_value()
assert isinstance(enum_read_value, MyEnum)
assert enum_read_value == enum_value
c.value = b''
await enum_c.write_value(enum_value)
assert c.value == enum_value_bytes
# -----------------------------------------------------------------------------
@@ -601,7 +767,7 @@ async def test_read_write2():
v1 = await c1.read_value()
assert v1 == v
a1 = PackedCharacteristicAdapter(c1, '>I')
a1 = PackedCharacteristicProxyAdapter(c1, '>I')
v1 = await a1.read_value()
assert v1 == struct.unpack('>I', v)[0]
@@ -957,11 +1123,12 @@ async def test_discover_all():
peer = Peer(connection)
await peer.discover_all()
assert len(peer.gatt_client.services) == 3
# service 1800 gets added automatically
assert len(peer.gatt_client.services) == 4
# service 1800 and 1801 get added automatically
assert peer.gatt_client.services[0].uuid == UUID('1800')
assert peer.gatt_client.services[1].uuid == service1.uuid
assert peer.gatt_client.services[2].uuid == service2.uuid
assert peer.gatt_client.services[1].uuid == UUID('1801')
assert peer.gatt_client.services[2].uuid == service1.uuid
assert peer.gatt_client.services[3].uuid == service2.uuid
s = peer.get_services_by_uuid(service1.uuid)
assert len(s) == 1
assert len(s[0].characteristics) == 2
@@ -1084,10 +1251,18 @@ CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), READ)
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), READ)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), READ)
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
Service(handle=0x0006, end=0x000D, uuid=UUID-16:1801 (Generic Attribute))
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=UUID-16:2A05 (Service Changed), INDICATE)
Characteristic(handle=0x0008, end=0x0009, uuid=UUID-16:2A05 (Service Changed), INDICATE)
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)
CharacteristicDeclaration(handle=0x000A, value_handle=0x000B, uuid=UUID-16:2B29 (Client Supported Features), READ|WRITE)
Characteristic(handle=0x000B, end=0x000B, uuid=UUID-16:2B29 (Client Supported Features), READ|WRITE)
CharacteristicDeclaration(handle=0x000C, value_handle=0x000D, uuid=UUID-16:2B2A (Database Hash), READ)
Characteristic(handle=0x000D, end=0x000D, uuid=UUID-16:2B2A (Database Hash), READ)
Service(handle=0x000E, end=0x0011, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
CharacteristicDeclaration(handle=0x000F, value_handle=0x0010, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Characteristic(handle=0x0010, end=0x0011, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY)
Descriptor(handle=0x0011, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
)
@@ -1105,6 +1280,7 @@ async def async_main():
await test_CharacteristicValue()
await test_CharacteristicValue_async()
await test_CharacteristicAdapter()
await test_CharacteristicProxyAdapter()
# -----------------------------------------------------------------------------
+4
View File
@@ -78,7 +78,11 @@ async def test_init_service(gmap_client: GamingAudioServiceProxy):
| GmapRole.BROADCAST_GAME_RECEIVER
| GmapRole.BROADCAST_GAME_SENDER
)
assert gmap_client.ugg_features is not None
assert await gmap_client.ugg_features.read_value() == UggFeatures.UGG_MULTISINK
assert gmap_client.ugt_features is not None
assert await gmap_client.ugt_features.read_value() == UgtFeatures.UGT_SOURCE
assert gmap_client.bgr_features is not None
assert await gmap_client.bgr_features.read_value() == BgrFeatures.BGR_MULTISINK
assert gmap_client.bgs_features is not None
assert await gmap_client.bgs_features.read_value() == BgsFeatures.BGS_96_KBPS
+3 -2
View File
@@ -24,7 +24,7 @@ import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from bumble.controller import Controller
from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
from bumble.core import BT_BR_EDR_TRANSPORT, BT_LE_TRANSPORT
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
@@ -39,6 +39,7 @@ from bumble.smp import (
)
from bumble.core import ProtocolError
from bumble.keys import PairingKeys
from bumble.hci import Role
# -----------------------------------------------------------------------------
@@ -111,7 +112,7 @@ async def test_self_connection():
@pytest.mark.asyncio
@pytest.mark.parametrize(
'responder_role,',
(BT_CENTRAL_ROLE, BT_PERIPHERAL_ROLE),
(Role.CENTRAL, Role.PERIPHERAL),
)
async def test_self_classic_connection(responder_role):
# Create two devices, each with a controller, attached to the same link