Merge pull request #643 from google/gbg/auracast-audio-io

auracast audio io
This commit is contained in:
Gilles Boccon-Gibod
2025-02-08 18:19:24 -05:00
committed by GitHub
27 changed files with 1388 additions and 414 deletions

View File

@@ -18,14 +18,23 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import asyncio.subprocess
import collections
import contextlib import contextlib
import dataclasses import dataclasses
import functools import functools
import logging import logging
import os import os
import wave import struct
import itertools from typing import (
from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple cast,
Any,
AsyncGenerator,
Coroutine,
Deque,
Optional,
Tuple,
)
import click import click
import pyee import pyee
@@ -33,8 +42,11 @@ import pyee
try: try:
import lc3 # type: ignore # pylint: disable=E0401 import lc3 # type: ignore # pylint: disable=E0401
except ImportError as e: except ImportError as e:
raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e raise ImportError(
"Try `python -m pip install \"git+https://github.com/google/liblc3.git\"`."
) from e
from bumble.audio import io as audio_io
from bumble.colors import color from bumble.colors import color
from bumble import company_ids from bumble import company_ids
from bumble import core from bumble import core
@@ -48,7 +60,6 @@ import bumble.device
import bumble.transport import bumble.transport
import bumble.utils import bumble.utils
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Logging # Logging
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -62,6 +73,31 @@ AURACAST_DEFAULT_DEVICE_NAME = 'Bumble Auracast'
AURACAST_DEFAULT_DEVICE_ADDRESS = hci.Address('F0:F1:F2:F3:F4:F5') AURACAST_DEFAULT_DEVICE_ADDRESS = hci.Address('F0:F1:F2:F3:F4:F5')
AURACAST_DEFAULT_SYNC_TIMEOUT = 5.0 AURACAST_DEFAULT_SYNC_TIMEOUT = 5.0
AURACAST_DEFAULT_ATT_MTU = 256 AURACAST_DEFAULT_ATT_MTU = 256
AURACAST_DEFAULT_FRAME_DURATION = 10000
AURACAST_DEFAULT_SAMPLE_RATE = 48000
AURACAST_DEFAULT_TRANSMIT_BITRATE = 80000
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def codec_config_string(
codec_config: bap.CodecSpecificConfiguration, indent: str
) -> str:
lines = []
if codec_config.sampling_frequency is not None:
lines.append(f'Sampling Frequency: {codec_config.sampling_frequency.hz} hz')
if codec_config.frame_duration is not None:
lines.append(f'Frame Duration: {codec_config.frame_duration.us} µs')
if codec_config.octets_per_codec_frame is not None:
lines.append(f'Frame Size: {codec_config.octets_per_codec_frame} bytes')
if codec_config.codec_frames_per_sdu is not None:
lines.append(f'Frames Per SDU: {codec_config.codec_frames_per_sdu}')
if codec_config.audio_channel_allocation is not None:
lines.append(
f'Audio Location: {codec_config.audio_channel_allocation.name}'
)
return '\n'.join(indent + line for line in lines)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -156,18 +192,17 @@ class BroadcastScanner(pyee.EventEmitter):
if self.public_broadcast_announcement: if self.public_broadcast_announcement:
print( print(
f' {color("Features", "cyan")}: ' f' {color("Features", "cyan")}: '
f'{self.public_broadcast_announcement.features}' f'{self.public_broadcast_announcement.features.name}'
)
print(
f' {color("Metadata", "cyan")}: '
f'{self.public_broadcast_announcement.metadata}'
) )
print(f' {color("Metadata", "cyan")}:')
print(self.public_broadcast_announcement.metadata.pretty_print(' '))
if self.basic_audio_announcement: if self.basic_audio_announcement:
print(color(' Audio:', 'cyan')) print(color(' Audio:', 'cyan'))
print( print(
color(' Presentation Delay:', 'magenta'), color(' Presentation Delay:', 'magenta'),
self.basic_audio_announcement.presentation_delay, self.basic_audio_announcement.presentation_delay,
"µs",
) )
for subgroup in self.basic_audio_announcement.subgroups: for subgroup in self.basic_audio_announcement.subgroups:
print(color(' Subgroup:', 'magenta')) print(color(' Subgroup:', 'magenta'))
@@ -184,17 +219,22 @@ class BroadcastScanner(pyee.EventEmitter):
color(' Vendor Specific Codec ID:', 'green'), color(' Vendor Specific Codec ID:', 'green'),
subgroup.codec_id.vendor_specific_codec_id, subgroup.codec_id.vendor_specific_codec_id,
) )
print(color(' Codec Config:', 'yellow'))
print( print(
color(' Codec Config:', 'yellow'), codec_config_string(
subgroup.codec_specific_configuration, subgroup.codec_specific_configuration, ' '
),
) )
print(color(' Metadata: ', 'yellow'), subgroup.metadata) print(color(' Metadata: ', 'yellow'))
print(subgroup.metadata.pretty_print(' '))
for bis in subgroup.bis: for bis in subgroup.bis:
print(color(f' BIS [{bis.index}]:', 'yellow')) print(color(f' BIS [{bis.index}]:', 'yellow'))
print(color(' Codec Config:', 'green'))
print( print(
color(' Codec Config:', 'green'), codec_config_string(
bis.codec_specific_configuration, bis.codec_specific_configuration, ' '
),
) )
if self.biginfo: if self.biginfo:
@@ -494,7 +534,7 @@ async def run_assist(
except core.ProtocolError as error: except core.ProtocolError as error:
print( print(
color( color(
f'!!! Failed to subscribe to Broadcast Receive State characteristic:', '!!! Failed to subscribe to Broadcast Receive State characteristic',
'red', 'red',
), ),
error, error,
@@ -625,11 +665,20 @@ async def run_pair(transport: str, address: str) -> None:
async def run_receive( async def run_receive(
transport: str, transport: str,
broadcast_id: int, broadcast_id: Optional[int],
output: str,
broadcast_code: str | None, broadcast_code: str | None,
sync_timeout: float, sync_timeout: float,
subgroup_index: int, subgroup_index: int,
) -> None: ) -> None:
# Run a pre-flight check for the output.
try:
if not audio_io.check_audio_output(output):
return
except ValueError as error:
print(error)
return
async with create_device(transport) as device: async with create_device(transport) as device:
if not device.supports_le_periodic_advertising: if not device.supports_le_periodic_advertising:
print(color('Periodic advertising not supported', 'red')) print(color('Periodic advertising not supported', 'red'))
@@ -643,7 +692,7 @@ async def run_receive(
def on_new_broadcast(broadcast: BroadcastScanner.Broadcast) -> None: def on_new_broadcast(broadcast: BroadcastScanner.Broadcast) -> None:
if scan_result.done(): if scan_result.done():
return return
if broadcast.broadcast_id == broadcast_id: if broadcast_id is None or broadcast.broadcast_id == broadcast_id:
scan_result.set_result(broadcast) scan_result.set_result(broadcast)
scanner.on('new_broadcast', on_new_broadcast) scanner.on('new_broadcast', on_new_broadcast)
@@ -694,57 +743,95 @@ async def run_receive(
sample_rate_hz=sampling_frequency.hz, sample_rate_hz=sampling_frequency.hz,
num_channels=num_bis, num_channels=num_bis,
) )
sdus = [b''] * num_bis lc3_queues: list[Deque[bytes]] = [collections.deque() for i in range(num_bis)]
subprocess = await asyncio.create_subprocess_shell( packet_stats = [0, 0]
f'stdbuf -i0 ffplay -ar {sampling_frequency.hz} -ac {num_bis} -f f32le pipe:0',
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
for i, bis_link in enumerate(big_sync.bis_links):
print(f'Setup ISO for BIS {bis_link.handle}')
def sink(index: int, packet: hci.HCI_IsoDataPacket): audio_output = await audio_io.create_audio_output(output)
nonlocal sdus # This try should be replaced with contextlib.aclosing() when python 3.9 is no
sdus[index] = packet.iso_sdu_fragment # longer needed.
if all(sdus) and subprocess.stdin: try:
subprocess.stdin.write(decoder.decode(b''.join(sdus)).tobytes()) await audio_output.open(
sdus = [b''] * num_bis audio_io.PcmFormat(
audio_io.PcmFormat.Endianness.LITTLE,
bis_link.sink = functools.partial(sink, i) audio_io.PcmFormat.SampleType.FLOAT32,
await bis_link.setup_data_path( sampling_frequency.hz,
direction=bis_link.Direction.CONTROLLER_TO_HOST num_bis,
)
) )
terminated = asyncio.Event() def sink(queue: Deque[bytes], packet: hci.HCI_IsoDataPacket):
big_sync.on(big_sync.Event.TERMINATION, lambda _: terminated.set()) # TODO: re-assemble fragments and detect errors
await terminated.wait() queue.append(packet.iso_sdu_fragment)
while all(lc3_queues):
# This assumes SDUs contain one LC3 frame each, which may not
# be correct for all cases. TODO: revisit this assumption.
frame = b''.join([lc3_queue.popleft() for lc3_queue in lc3_queues])
if not frame:
print(color('!!! received empty frame', 'red'))
continue
packet_stats[0] += len(frame)
packet_stats[1] += 1
print(
f'\rRECEIVED: {packet_stats[0]} bytes in '
f'{packet_stats[1]} packets',
end='',
)
try:
pcm = decoder.decode(frame).tobytes()
except lc3.BaseError as error:
print(color(f'!!! LC3 decoding error: {error}'))
continue
audio_output.write(pcm)
for i, bis_link in enumerate(big_sync.bis_links):
print(f'Setup ISO for BIS {bis_link.handle}')
bis_link.sink = functools.partial(sink, lc3_queues[i])
await device.send_command(
hci.HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=bis_link.handle,
data_path_direction=hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.CONTROLLER_TO_HOST,
data_path_id=0,
codec_id=hci.CodingFormat(codec_id=hci.CodecID.TRANSPARENT),
controller_delay=0,
codec_configuration=b'',
),
check_result=True,
)
terminated = asyncio.Event()
big_sync.on(big_sync.Event.TERMINATION, lambda _: terminated.set())
await terminated.wait()
finally:
await audio_output.aclose()
async def run_broadcast( async def run_transmit(
transport: str, broadcast_id: int, broadcast_code: str | None, wav_file_path: str transport: str,
broadcast_id: int,
broadcast_code: str | None,
broadcast_name: str,
bitrate: int,
manufacturer_data: tuple[int, bytes] | None,
input: str,
input_format: str,
) -> None: ) -> None:
# Run a pre-flight check for the input.
try:
if not audio_io.check_audio_input(input):
return
except ValueError as error:
print(error)
return
async with create_device(transport) as device: async with create_device(transport) as device:
if not device.supports_le_periodic_advertising: if not device.supports_le_periodic_advertising:
print(color('Periodic advertising not supported', 'red')) print(color('Periodic advertising not supported', 'red'))
return return
with wave.open(wav_file_path, 'rb') as wav:
print('Encoding wav file into lc3...')
encoder = lc3.Encoder(
frame_duration_us=10000,
sample_rate_hz=48000,
num_channels=2,
input_sample_rate_hz=wav.getframerate(),
)
frames = list[bytes]()
while pcm := wav.readframes(encoder.get_frame_samples()):
frames.append(
encoder.encode(pcm, num_bytes=200, bit_depth=wav.getsampwidth() * 8)
)
del encoder
print('Encoding complete.')
basic_audio_announcement = bap.BasicAudioAnnouncement( basic_audio_announcement = bap.BasicAudioAnnouncement(
presentation_delay=40000, presentation_delay=40000,
subgroups=[ subgroups=[
@@ -783,7 +870,23 @@ async def run_broadcast(
], ],
) )
broadcast_audio_announcement = bap.BroadcastAudioAnnouncement(broadcast_id) broadcast_audio_announcement = bap.BroadcastAudioAnnouncement(broadcast_id)
print('Start Advertising')
advertising_manufacturer_data = (
b''
if manufacturer_data is None
else bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA,
struct.pack('<H', manufacturer_data[0])
+ manufacturer_data[1],
)
]
)
)
)
advertising_set = await device.create_advertising_set( advertising_set = await device.create_advertising_set(
advertising_parameters=bumble.device.AdvertisingParameters( advertising_parameters=bumble.device.AdvertisingParameters(
advertising_event_properties=bumble.device.AdvertisingEventProperties( advertising_event_properties=bumble.device.AdvertisingEventProperties(
@@ -796,9 +899,10 @@ async def run_broadcast(
broadcast_audio_announcement.get_advertising_data() broadcast_audio_announcement.get_advertising_data()
+ bytes( + bytes(
core.AdvertisingData( core.AdvertisingData(
[(core.AdvertisingData.BROADCAST_NAME, b'Bumble Auracast')] [(core.AdvertisingData.BROADCAST_NAME, broadcast_name.encode())]
) )
) )
+ advertising_manufacturer_data
), ),
periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters( periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters(
periodic_advertising_interval_min=80, periodic_advertising_interval_min=80,
@@ -808,47 +912,83 @@ async def run_broadcast(
auto_restart=True, auto_restart=True,
auto_start=True, auto_start=True,
) )
print('Start Periodic Advertising') print('Start Periodic Advertising')
await advertising_set.start_periodic() await advertising_set.start_periodic()
print('Setup BIG')
big = await device.create_big(
advertising_set,
parameters=bumble.device.BigParameters(
num_bis=2,
sdu_interval=10000,
max_sdu=100,
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
),
),
)
print('Setup ISO Data Path')
def on_flow(packet_queue): audio_input = await audio_io.create_audio_input(input, input_format)
pcm_format = await audio_input.open()
# This try should be replaced with contextlib.aclosing() when python 3.9 is no
# longer needed.
try:
if pcm_format.channels != 2:
print("Only 2 channels PCM configurations are supported")
return
if pcm_format.sample_type == audio_io.PcmFormat.SampleType.INT16:
pcm_bit_depth = 16
elif pcm_format.sample_type == audio_io.PcmFormat.SampleType.FLOAT32:
pcm_bit_depth = None
else:
print("Only INT16 and FLOAT32 sample types are supported")
return
encoder = lc3.Encoder(
frame_duration_us=AURACAST_DEFAULT_FRAME_DURATION,
sample_rate_hz=AURACAST_DEFAULT_SAMPLE_RATE,
num_channels=pcm_format.channels,
input_sample_rate_hz=pcm_format.sample_rate,
)
lc3_frame_samples = encoder.get_frame_samples()
lc3_frame_size = encoder.get_frame_bytes(bitrate)
print( print(
f'\rPACKETS: pending={packet_queue.pending}, ' f'Encoding with {lc3_frame_samples} '
f'queued={packet_queue.queued}, completed={packet_queue.completed}', f'PCM samples per {lc3_frame_size} byte frame'
end='',
) )
packet_queue = None print('Setup BIG')
for bis_link in big.bis_links: big = await device.create_big(
await bis_link.setup_data_path( advertising_set,
direction=bis_link.Direction.HOST_TO_CONTROLLER parameters=bumble.device.BigParameters(
num_bis=pcm_format.channels,
sdu_interval=AURACAST_DEFAULT_FRAME_DURATION,
max_sdu=lc3_frame_size,
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
),
),
) )
if packet_queue is None:
packet_queue = bis_link.data_packet_queue
if packet_queue: iso_queues = [
packet_queue.on('flow', lambda: on_flow(packet_queue)) bumble.device.IsoPacketStream(big.bis_links[0], 64),
bumble.device.IsoPacketStream(big.bis_links[1], 64),
]
for frame in itertools.cycle(frames): def on_flow():
mid = len(frame) // 2 data_packet_queue = iso_queues[0].data_packet_queue
big.bis_links[0].write(frame[:mid]) print(
big.bis_links[1].write(frame[mid:]) f'\rPACKETS: pending={data_packet_queue.pending}, '
await asyncio.sleep(0.009) f'queued={data_packet_queue.queued}, '
f'completed={data_packet_queue.completed}',
end='',
)
iso_queues[0].data_packet_queue.on('flow', on_flow)
frame_count = 0
async for pcm_frame in audio_input.frames(lc3_frame_samples):
lc3_frame = encoder.encode(
pcm_frame, num_bytes=2 * lc3_frame_size, bit_depth=pcm_bit_depth
)
mid = len(lc3_frame) // 2
await iso_queues[0].write(lc3_frame[:mid])
await iso_queues[1].write(lc3_frame[mid:])
frame_count += 1
finally:
await audio_input.aclose()
def run_async(async_command: Coroutine) -> None: def run_async(async_command: Coroutine) -> None:
@@ -917,7 +1057,7 @@ def scan(ctx, filter_duplicates, sync_timeout, transport):
@click.argument('address') @click.argument('address')
@click.pass_context @click.pass_context
def assist(ctx, broadcast_name, source_id, command, transport, address): def assist(ctx, broadcast_name, source_id, command, transport, address):
"""Scan for broadcasts on behalf of a audio server""" """Scan for broadcasts on behalf of an audio server"""
run_async(run_assist(broadcast_name, source_id, command, transport, address)) run_async(run_assist(broadcast_name, source_id, command, transport, address))
@@ -932,7 +1072,24 @@ def pair(ctx, transport, address):
@auracast.command('receive') @auracast.command('receive')
@click.argument('transport') @click.argument('transport')
@click.argument('broadcast_id', type=int) @click.argument(
'broadcast_id',
type=int,
required=False,
)
@click.option(
'--output',
default='device',
help=(
"Audio output. "
"'device' -> use the host's default sound output device, "
"'device:<DEVICE_ID>' -> use one of the host's sound output device "
"(specify 'device:?' to get a list of available sound output devices), "
"'stdout' -> send audio to stdout, "
"'file:<filename> -> write audio to a raw float32 PCM file, "
"'ffplay' -> pipe the audio to ffplay"
),
)
@click.option( @click.option(
'--broadcast-code', '--broadcast-code',
metavar='BROADCAST_CODE', metavar='BROADCAST_CODE',
@@ -954,16 +1111,57 @@ def pair(ctx, transport, address):
help='Index of Subgroup', help='Index of Subgroup',
) )
@click.pass_context @click.pass_context
def receive(ctx, transport, broadcast_id, broadcast_code, sync_timeout, subgroup): def receive(
ctx,
transport,
broadcast_id,
output,
broadcast_code,
sync_timeout,
subgroup,
):
"""Receive a broadcast source""" """Receive a broadcast source"""
run_async( run_async(
run_receive(transport, broadcast_id, broadcast_code, sync_timeout, subgroup) run_receive(
transport,
broadcast_id,
output,
broadcast_code,
sync_timeout,
subgroup,
)
) )
@auracast.command('broadcast') @auracast.command('transmit')
@click.argument('transport') @click.argument('transport')
@click.argument('wav_file_path', type=str) @click.option(
'--input',
required=True,
help=(
"Audio input. "
"'device' -> use the host's default sound input device, "
"'device:<DEVICE_ID>' -> use one of the host's sound input devices "
"(specify 'device:?' to get a list of available sound input devices), "
"'stdin' -> receive audio from stdin as int16 PCM, "
"'file:<filename> -> read audio from a .wav or raw int16 PCM file. "
"(The file: prefix may be omitted if the file path does not start with "
"the substring 'device:' or 'file:' and is not 'stdin')"
),
)
@click.option(
'--input-format',
metavar="FORMAT",
default='auto',
help=(
"Audio input format. "
"Use 'auto' for .wav files, or for the default setting with the devices. "
"For other inputs, the format is specified as "
"<sample-type>,<sample-rate>,<channels> (supported <sample-type>: 'int16le' "
"for 16-bit signed integers with little-endian byte order or 'float32le' for "
"32-bit floating point with little-endian byte order)"
),
)
@click.option( @click.option(
'--broadcast-id', '--broadcast-id',
metavar='BROADCAST_ID', metavar='BROADCAST_ID',
@@ -974,18 +1172,60 @@ def receive(ctx, transport, broadcast_id, broadcast_code, sync_timeout, subgroup
@click.option( @click.option(
'--broadcast-code', '--broadcast-code',
metavar='BROADCAST_CODE', metavar='BROADCAST_CODE',
type=str,
help='Broadcast encryption code in hex format', help='Broadcast encryption code in hex format',
) )
@click.option(
'--broadcast-name',
metavar='BROADCAST_NAME',
default='Bumble Auracast',
help='Broadcast name',
)
@click.option(
'--bitrate',
type=int,
default=AURACAST_DEFAULT_TRANSMIT_BITRATE,
help='Bitrate, per channel, in bps',
)
@click.option(
'--manufacturer-data',
metavar='VENDOR-ID:DATA-HEX',
help='Manufacturer data (specify as <vendor-id>:<data-hex>)',
)
@click.pass_context @click.pass_context
def broadcast(ctx, transport, broadcast_id, broadcast_code, wav_file_path): def transmit(
"""Start a broadcast as a source.""" ctx,
transport,
broadcast_id,
broadcast_code,
manufacturer_data,
broadcast_name,
bitrate,
input,
input_format,
):
"""Transmit a broadcast source"""
if manufacturer_data:
vendor_id_str, data_hex = manufacturer_data.split(':')
vendor_id = int(vendor_id_str)
data = bytes.fromhex(data_hex)
manufacturer_data_tuple = (vendor_id, data)
else:
manufacturer_data_tuple = None
if (input == 'device' or input.startswith('device:')) and input_format == 'auto':
# Use a default format for device inputs
input_format = 'int16le,48000,1'
run_async( run_async(
run_broadcast( run_transmit(
transport=transport, transport=transport,
broadcast_id=broadcast_id, broadcast_id=broadcast_id,
broadcast_code=broadcast_code, broadcast_code=broadcast_code,
wav_file_path=wav_file_path, broadcast_name=broadcast_name,
bitrate=bitrate,
manufacturer_data=manufacturer_data_tuple,
input=input,
input_format=input_format,
) )
) )

View File

@@ -29,7 +29,9 @@ from bumble.gatt import Service
from bumble.profiles.device_information_service import DeviceInformationServiceProxy from bumble.profiles.device_information_service import DeviceInformationServiceProxy
from bumble.profiles.battery_service import BatteryServiceProxy from bumble.profiles.battery_service import BatteryServiceProxy
from bumble.profiles.gap import GenericAccessServiceProxy from bumble.profiles.gap import GenericAccessServiceProxy
from bumble.profiles.pacs import PublishedAudioCapabilitiesServiceProxy
from bumble.profiles.tmap import TelephonyAndMediaAudioServiceProxy from bumble.profiles.tmap import TelephonyAndMediaAudioServiceProxy
from bumble.profiles.vcs import VolumeControlServiceProxy
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
@@ -126,14 +128,52 @@ async def show_tmas(
print(color('### Telephony And Media Audio Service', 'yellow')) print(color('### Telephony And Media Audio Service', 'yellow'))
if tmas.role: if tmas.role:
print( role = await tmas.role.read_value()
color(' Role:', 'green'), print(color(' Role:', 'green'), role)
await tmas.role.read_value(),
)
print() print()
# -----------------------------------------------------------------------------
async def show_pacs(pacs: PublishedAudioCapabilitiesServiceProxy) -> None:
print(color('### Published Audio Capabilities Service', 'yellow'))
contexts = await pacs.available_audio_contexts.read_value()
print(color(' Available Audio Contexts:', 'green'), contexts)
contexts = await pacs.supported_audio_contexts.read_value()
print(color(' Supported Audio Contexts:', 'green'), contexts)
if pacs.sink_pac:
pac = await pacs.sink_pac.read_value()
print(color(' Sink PAC: ', 'green'), pac)
if pacs.sink_audio_locations:
audio_locations = await pacs.sink_audio_locations.read_value()
print(color(' Sink Audio Locations: ', 'green'), audio_locations)
if pacs.source_pac:
pac = await pacs.source_pac.read_value()
print(color(' Source PAC: ', 'green'), pac)
if pacs.source_audio_locations:
audio_locations = await pacs.source_audio_locations.read_value()
print(color(' Source Audio Locations: ', 'green'), audio_locations)
print()
# -----------------------------------------------------------------------------
async def show_vcs(vcs: VolumeControlServiceProxy) -> None:
print(color('### Volume Control Service', 'yellow'))
volume_state = await vcs.volume_state.read_value()
print(color(' Volume State:', 'green'), volume_state)
volume_flags = await vcs.volume_flags.read_value()
print(color(' Volume Flags:', 'green'), volume_flags)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def show_device_info(peer, done: Optional[asyncio.Future]) -> None: async def show_device_info(peer, done: Optional[asyncio.Future]) -> None:
try: try:
@@ -161,6 +201,12 @@ async def show_device_info(peer, done: Optional[asyncio.Future]) -> None:
if tmas := peer.create_service_proxy(TelephonyAndMediaAudioServiceProxy): if tmas := peer.create_service_proxy(TelephonyAndMediaAudioServiceProxy):
await try_show(show_tmas, tmas) await try_show(show_tmas, tmas)
if pacs := peer.create_service_proxy(PublishedAudioCapabilitiesServiceProxy):
await try_show(show_pacs, pacs)
if vcs := peer.create_service_proxy(VolumeControlServiceProxy):
await try_show(show_vcs, vcs)
if done is not None: if done is not None:
done.set_result(None) done.set_result(None)
except asyncio.CancelledError: except asyncio.CancelledError:

17
bumble/audio/__init__.py Normal file
View File

@@ -0,0 +1,17 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------

553
bumble/audio/io.py Normal file
View File

@@ -0,0 +1,553 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import abc
from concurrent.futures import ThreadPoolExecutor
import dataclasses
import enum
import logging
import pathlib
from typing import (
AsyncGenerator,
BinaryIO,
TYPE_CHECKING,
)
import sys
import wave
from bumble.colors import color
if TYPE_CHECKING:
import sounddevice # type: ignore[import-untyped]
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class PcmFormat:
class Endianness(enum.Enum):
LITTLE = 0
BIG = 1
class SampleType(enum.Enum):
FLOAT32 = 0
INT16 = 1
endianness: Endianness
sample_type: SampleType
sample_rate: int
channels: int
@classmethod
def from_str(cls, format_str: str) -> PcmFormat:
endianness = cls.Endianness.LITTLE # Others not yet supported.
sample_type_str, sample_rate_str, channels_str = format_str.split(',')
if sample_type_str == 'int16le':
sample_type = cls.SampleType.INT16
elif sample_type_str == 'float32le':
sample_type = cls.SampleType.FLOAT32
else:
raise ValueError(f'sample type {sample_type_str} not supported')
sample_rate = int(sample_rate_str)
channels = int(channels_str)
return cls(endianness, sample_type, sample_rate, channels)
@property
def bytes_per_sample(self) -> int:
return 2 if self.sample_type == self.SampleType.INT16 else 4
def check_audio_output(output: str) -> bool:
if output == 'device' or output.startswith('device:'):
try:
import sounddevice
except ImportError as exc:
raise ValueError(
'audio output not available (sounddevice python module not installed)'
) from exc
except OSError as exc:
raise ValueError(
'audio output not available '
'(sounddevice python module failed to load: '
f'{exc})'
) from exc
if output == 'device':
# Default device
return True
# Specific device
device = output[7:]
if device == '?':
print(color('Audio Devices:', 'yellow'))
for device_info in [
device_info
for device_info in sounddevice.query_devices()
if device_info['max_output_channels'] > 0
]:
device_index = device_info['index']
is_default = (
color(' [default]', 'green')
if sounddevice.default.device[1] == device_index
else ''
)
print(
f'{color(device_index, "cyan")}: {device_info["name"]}{is_default}'
)
return False
try:
device_info = sounddevice.query_devices(int(device))
except sounddevice.PortAudioError as exc:
raise ValueError('No such audio device') from exc
if device_info['max_output_channels'] < 1:
raise ValueError(
f'Device {device} ({device_info["name"]}) does not have an output'
)
return True
async def create_audio_output(output: str) -> AudioOutput:
if output == 'stdout':
return StreamAudioOutput(sys.stdout.buffer)
if output == 'device' or output.startswith('device:'):
device_name = '' if output == 'device' else output[7:]
return SoundDeviceAudioOutput(device_name)
if output == 'ffplay':
return SubprocessAudioOutput(
command=(
'ffplay -probesize 32 -fflags nobuffer -analyzeduration 0 '
'-ar {sample_rate} '
'-ch_layout {channel_layout} '
'-f f32le pipe:0'
)
)
if output.startswith('file:'):
return FileAudioOutput(output[5:])
raise ValueError('unsupported audio output')
class AudioOutput(abc.ABC):
"""Audio output to which PCM samples can be written."""
async def open(self, pcm_format: PcmFormat) -> None:
"""Start the output."""
@abc.abstractmethod
def write(self, pcm_samples: bytes) -> None:
"""Write PCM samples. Must not block."""
async def aclose(self) -> None:
"""Close the output."""
class ThreadedAudioOutput(AudioOutput):
"""Base class for AudioOutput classes that may need to call blocking functions.
The actual writing is performed in a thread, so as to ensure that calling write()
does not block the caller.
"""
def __init__(self) -> None:
self._thread_pool = ThreadPoolExecutor(1)
self._pcm_samples: asyncio.Queue[bytes] = asyncio.Queue()
self._write_task = asyncio.create_task(self._write_loop())
async def _write_loop(self) -> None:
while True:
pcm_samples = await self._pcm_samples.get()
await asyncio.get_running_loop().run_in_executor(
self._thread_pool, self._write, pcm_samples
)
@abc.abstractmethod
def _write(self, pcm_samples: bytes) -> None:
"""This method does the actual writing and can block."""
def write(self, pcm_samples: bytes) -> None:
self._pcm_samples.put_nowait(pcm_samples)
def _close(self) -> None:
"""This method does the actual closing and can block."""
async def aclose(self) -> None:
await asyncio.get_running_loop().run_in_executor(self._thread_pool, self._close)
self._write_task.cancel()
self._thread_pool.shutdown()
class SoundDeviceAudioOutput(ThreadedAudioOutput):
def __init__(self, device_name: str) -> None:
super().__init__()
self._device = int(device_name) if device_name else None
self._stream: sounddevice.RawOutputStream | None = None
async def open(self, pcm_format: PcmFormat) -> None:
import sounddevice # pylint: disable=import-error
self._stream = sounddevice.RawOutputStream(
samplerate=pcm_format.sample_rate,
device=self._device,
channels=pcm_format.channels,
dtype='float32',
)
self._stream.start()
def _write(self, pcm_samples: bytes) -> None:
if self._stream is None:
return
try:
self._stream.write(pcm_samples)
except Exception as error:
print(f'Sound device error: {error}')
raise
def _close(self):
self._stream.stop()
self._stream = None
class StreamAudioOutput(ThreadedAudioOutput):
"""AudioOutput where PCM samples are written to a stream that may block."""
def __init__(self, stream: BinaryIO) -> None:
super().__init__()
self._stream = stream
def _write(self, pcm_samples: bytes) -> None:
self._stream.write(pcm_samples)
self._stream.flush()
class FileAudioOutput(StreamAudioOutput):
"""AudioOutput where PCM samples are written to a file."""
def __init__(self, filename: str) -> None:
self._file = open(filename, "wb")
super().__init__(self._file)
async def shutdown(self):
self._file.close()
return await super().shutdown()
class SubprocessAudioOutput(AudioOutput):
"""AudioOutput where audio samples are written to a subprocess via stdin."""
def __init__(self, command: str) -> None:
self._command = command
self._subprocess: asyncio.subprocess.Process | None
async def open(self, pcm_format: PcmFormat) -> None:
if pcm_format.channels == 1:
channel_layout = 'mono'
elif pcm_format.channels == 2:
channel_layout = 'stereo'
else:
raise ValueError(f'{pcm_format.channels} channels not supported')
command = self._command.format(
sample_rate=pcm_format.sample_rate, channel_layout=channel_layout
)
self._subprocess = await asyncio.create_subprocess_shell(
command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
def write(self, pcm_samples: bytes) -> None:
if self._subprocess is None or self._subprocess.stdin is None:
return
self._subprocess.stdin.write(pcm_samples)
async def aclose(self):
if self._subprocess:
self._subprocess.terminate()
def check_audio_input(input: str) -> bool:
if input == 'device' or input.startswith('device:'):
try:
import sounddevice # pylint: disable=import-error
except ImportError as exc:
raise ValueError(
'audio input not available (sounddevice python module not installed)'
) from exc
except OSError as exc:
raise ValueError(
'audio input not available '
'(sounddevice python module failed to load: '
f'{exc})'
) from exc
if input == 'device':
# Default device
return True
# Specific device
device = input[7:]
if device == '?':
print(color('Audio Devices:', 'yellow'))
for device_info in [
device_info
for device_info in sounddevice.query_devices()
if device_info['max_input_channels'] > 0
]:
device_index = device_info["index"]
is_mono = device_info['max_input_channels'] == 1
max_channels = color(f'[{"mono" if is_mono else "stereo"}]', 'cyan')
is_default = (
color(' [default]', 'green')
if sounddevice.default.device[0] == device_index
else ''
)
print(
f'{color(device_index, "cyan")}: {device_info["name"]}'
f' {max_channels}{is_default}'
)
return False
try:
device_info = sounddevice.query_devices(int(device))
except sounddevice.PortAudioError as exc:
raise ValueError('No such audio device') from exc
if device_info['max_input_channels'] < 1:
raise ValueError(
f'Device {device} ({device_info["name"]}) does not have an input'
)
return True
async def create_audio_input(input: str, input_format: str) -> AudioInput:
pcm_format: PcmFormat | None
if input_format == 'auto':
pcm_format = None
else:
pcm_format = PcmFormat.from_str(input_format)
if input == 'stdin':
if not pcm_format:
raise ValueError('input format details required for stdin')
return StreamAudioInput(sys.stdin.buffer, pcm_format)
if input == 'device' or input.startswith('device:'):
if not pcm_format:
raise ValueError('input format details required for device')
device_name = '' if input == 'device' else input[7:]
return SoundDeviceAudioInput(device_name, pcm_format)
# If there's no file: prefix, check if we can assume it is a file.
if pathlib.Path(input).is_file():
input = 'file:' + input
if input.startswith('file:'):
filename = input[5:]
if filename.endswith('.wav'):
if input_format != 'auto':
raise ValueError(".wav file only supported with 'auto' format")
return WaveAudioInput(filename)
if pcm_format is None:
raise ValueError('input format details required for raw PCM files')
return FileAudioInput(filename, pcm_format)
raise ValueError('input not supported')
class AudioInput(abc.ABC):
"""Audio input that produces PCM samples."""
@abc.abstractmethod
async def open(self) -> PcmFormat:
"""Open the input."""
@abc.abstractmethod
def frames(self, frame_size: int) -> AsyncGenerator[bytes]:
"""Generate one frame of PCM samples. Must not block."""
async def aclose(self) -> None:
"""Close the input."""
class ThreadedAudioInput(AudioInput):
"""Base class for AudioInput implementation where reading samples may block."""
def __init__(self) -> None:
self._thread_pool = ThreadPoolExecutor(1)
self._pcm_samples: asyncio.Queue[bytes] = asyncio.Queue()
@abc.abstractmethod
def _read(self, frame_size: int) -> bytes:
pass
@abc.abstractmethod
def _open(self) -> PcmFormat:
pass
def _close(self) -> None:
pass
async def open(self) -> PcmFormat:
return await asyncio.get_running_loop().run_in_executor(
self._thread_pool, self._open
)
async def frames(self, frame_size: int) -> AsyncGenerator[bytes]:
while pcm_sample := await asyncio.get_running_loop().run_in_executor(
self._thread_pool, self._read, frame_size
):
yield pcm_sample
async def aclose(self) -> None:
await asyncio.get_running_loop().run_in_executor(self._thread_pool, self._close)
self._thread_pool.shutdown()
class WaveAudioInput(ThreadedAudioInput):
"""Audio input that reads PCM samples from a .wav file."""
def __init__(self, filename: str) -> None:
super().__init__()
self._filename = filename
self._wav: wave.Wave_read | None = None
self._bytes_read = 0
def _open(self) -> PcmFormat:
self._wav = wave.open(self._filename, 'rb')
if self._wav.getsampwidth() != 2:
raise ValueError('sample width not supported')
return PcmFormat(
PcmFormat.Endianness.LITTLE,
PcmFormat.SampleType.INT16,
self._wav.getframerate(),
self._wav.getnchannels(),
)
def _read(self, frame_size: int) -> bytes:
if not self._wav:
return b''
pcm_samples = self._wav.readframes(frame_size)
if not pcm_samples and self._bytes_read:
# Loop around.
self._wav.rewind()
self._bytes_read = 0
pcm_samples = self._wav.readframes(frame_size)
self._bytes_read += len(pcm_samples)
return pcm_samples
def _close(self) -> None:
if self._wav:
self._wav.close()
class StreamAudioInput(ThreadedAudioInput):
"""AudioInput where samples are read from a raw PCM stream that may block."""
def __init__(self, stream: BinaryIO, pcm_format: PcmFormat) -> None:
super().__init__()
self._stream = stream
self._pcm_format = pcm_format
def _open(self) -> PcmFormat:
return self._pcm_format
def _read(self, frame_size: int) -> bytes:
return self._stream.read(
frame_size * self._pcm_format.channels * self._pcm_format.bytes_per_sample
)
class FileAudioInput(StreamAudioInput):
"""AudioInput where PCM samples are read from a raw PCM file."""
def __init__(self, filename: str, pcm_format: PcmFormat) -> None:
self._stream = open(filename, "rb")
super().__init__(self._stream, pcm_format)
def _close(self) -> None:
self._stream.close()
class SoundDeviceAudioInput(ThreadedAudioInput):
def __init__(self, device_name: str, pcm_format: PcmFormat) -> None:
super().__init__()
self._device = int(device_name) if device_name else None
self._pcm_format = pcm_format
self._stream: sounddevice.RawInputStream | None = None
def _open(self) -> PcmFormat:
import sounddevice # pylint: disable=import-error
self._stream = sounddevice.RawInputStream(
samplerate=self._pcm_format.sample_rate,
device=self._device,
channels=self._pcm_format.channels,
dtype='int16',
)
self._stream.start()
return PcmFormat(
PcmFormat.Endianness.LITTLE,
PcmFormat.SampleType.INT16,
self._pcm_format.sample_rate,
2,
)
def _read(self, frame_size: int) -> bytes:
if not self._stream:
return b''
pcm_buffer, overflowed = self._stream.read(frame_size)
if overflowed:
logger.warning("input overflow")
# Convert the buffer to stereo if needed
if self._pcm_format.channels == 1:
stereo_buffer = bytearray()
for i in range(frame_size):
sample = pcm_buffer[i * 2 : i * 2 + 2]
stereo_buffer += sample + sample
return stereo_buffer
return bytes(pcm_buffer)
def _close(self):
self._stream.stop()
self._stream = None

View File

@@ -17,6 +17,7 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import collections
from collections.abc import Iterable, Sequence from collections.abc import Iterable, Sequence
from contextlib import ( from contextlib import (
asynccontextmanager, asynccontextmanager,
@@ -36,6 +37,7 @@ from typing import (
Any, Any,
Callable, Callable,
ClassVar, ClassVar,
Deque,
Dict, Dict,
Optional, Optional,
Type, Type,
@@ -1506,6 +1508,49 @@ class BisLink(_IsoLink):
self.device = self.big.device self.device = self.big.device
# -----------------------------------------------------------------------------
class IsoPacketStream:
"""Async stream that can write SDUs to a CIS or BIS, with a maximum queue size."""
iso_link: _IsoLink
data_packet_queue: DataPacketQueue
def __init__(self, iso_link: _IsoLink, max_queue_size: int) -> None:
if iso_link.data_packet_queue is None:
raise ValueError('link has no data packet queue')
self.iso_link = iso_link
self.data_packet_queue = iso_link.data_packet_queue
self.data_packet_queue.on('flow', self._on_flow)
self._thresholds: Deque[int] = collections.deque()
self._semaphore = asyncio.Semaphore(max_queue_size)
def _on_flow(self) -> None:
# Release the semaphore once for each completed packet.
while (
self._thresholds and self.data_packet_queue.completed >= self._thresholds[0]
):
self._thresholds.popleft()
self._semaphore.release()
async def write(self, sdu: bytes) -> None:
"""
Write an SDU to the queue.
This method blocks until there are fewer than max_queue_size packets queued
but not yet completed.
"""
# Wait until there's space in the queue.
await self._semaphore.acquire()
# Queue the packet.
self.iso_link.write(sdu)
# Remember the position of the packet so we can know when it is completed.
self._thresholds.append(self.data_packet_queue.queued)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Connection(CompositeEventEmitter): class Connection(CompositeEventEmitter):
device: Device device: Device

View File

@@ -42,7 +42,7 @@ from typing import (
) )
from bumble.colors import color from bumble.colors import color
from bumble.core import BaseBumbleError, UUID from bumble.core import BaseBumbleError, InvalidOperationError, UUID
from bumble.att import Attribute, AttributeValue from bumble.att import Attribute, AttributeValue
from bumble.utils import ByteSerializable from bumble.utils import ByteSerializable
@@ -314,6 +314,7 @@ GATT_CENTRAL_ADDRESS_RESOLUTION__CHARACTERISTIC = UUID.from_16_bi
GATT_CLIENT_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2B29, 'Client Supported Features') GATT_CLIENT_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2B29, 'Client Supported Features')
GATT_DATABASE_HASH_CHARACTERISTIC = UUID.from_16_bits(0x2B2A, 'Database Hash') GATT_DATABASE_HASH_CHARACTERISTIC = UUID.from_16_bits(0x2B2A, 'Database Hash')
GATT_SERVER_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2B3A, 'Server Supported Features') GATT_SERVER_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2B3A, 'Server Supported Features')
GATT_LE_GATT_SECURITY_LEVELS_CHARACTERISTIC = UUID.from_16_bits(0x2BF5, 'E GATT Security Levels')
# fmt: on # fmt: on
# pylint: enable=line-too-long # pylint: enable=line-too-long
@@ -322,8 +323,6 @@ GATT_SERVER_SUPPORTED_FEATURES_CHARACTERISTIC = UUID.from_16_bi
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Utils # Utils
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def show_services(services: Iterable[Service]) -> None: def show_services(services: Iterable[Service]) -> None:
for service in services: for service in services:
print(color(str(service), 'cyan')) print(color(str(service), 'cyan'))
@@ -679,10 +678,14 @@ class DelegatedCharacteristicAdapter(CharacteristicAdapter):
self.decode = decode self.decode = decode
def encode_value(self, value): def encode_value(self, value):
return self.encode(value) if self.encode else value if self.encode is None:
raise InvalidOperationError('delegated adapter does not have an encoder')
return self.encode(value)
def decode_value(self, value): def decode_value(self, value):
return self.decode(value) if self.decode else value if self.decode is None:
raise InvalidOperationError('delegate adapter does not have a decoder')
return self.decode(value)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -78,6 +78,7 @@ from .gatt import (
GATT_INCLUDE_ATTRIBUTE_TYPE, GATT_INCLUDE_ATTRIBUTE_TYPE,
Characteristic, Characteristic,
ClientCharacteristicConfigurationBits, ClientCharacteristicConfigurationBits,
InvalidServiceError,
TemplateService, TemplateService,
) )
@@ -162,12 +163,23 @@ class ServiceProxy(AttributeProxy):
self.uuid = uuid self.uuid = uuid
self.characteristics = [] self.characteristics = []
async def discover_characteristics(self, uuids=()): async def discover_characteristics(self, uuids=()) -> list[CharacteristicProxy]:
return await self.client.discover_characteristics(uuids, self) return await self.client.discover_characteristics(uuids, self)
def get_characteristics_by_uuid(self, uuid): def get_characteristics_by_uuid(self, uuid: UUID) -> list[CharacteristicProxy]:
"""Get all the characteristics with a specified UUID."""
return self.client.get_characteristics_by_uuid(uuid, self) return self.client.get_characteristics_by_uuid(uuid, self)
def get_required_characteristic_by_uuid(self, uuid: UUID) -> CharacteristicProxy:
"""
Get the first characteristic with a specified UUID.
If no characteristic with that UUID is found, an InvalidServiceError is raised.
"""
if not (characteristics := self.get_characteristics_by_uuid(uuid)):
raise InvalidServiceError(f'{uuid} characteristic not found')
return characteristics[0]
def __str__(self) -> str: def __str__(self) -> str:
return f'Service(handle=0x{self.handle:04X}, uuid={self.uuid})' return f'Service(handle=0x{self.handle:04X}, uuid={self.uuid})'

View File

@@ -451,54 +451,35 @@ class AICSServiceProxy(ProfileServiceProxy):
def __init__(self, service_proxy: ServiceProxy) -> None: def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy self.service_proxy = service_proxy
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC
)
):
raise gatt.InvalidServiceError("Audio Input State Characteristic not found")
self.audio_input_state = SerializableCharacteristicAdapter( self.audio_input_state = SerializableCharacteristicAdapter(
characteristics[0], AudioInputState service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC
),
AudioInputState,
) )
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC
)
):
raise gatt.InvalidServiceError(
"Gain Settings Attribute Characteristic not found"
)
self.gain_settings_properties = SerializableCharacteristicAdapter( self.gain_settings_properties = SerializableCharacteristicAdapter(
characteristics[0], GainSettingsProperties service_proxy.get_required_characteristic_by_uuid(
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC
),
GainSettingsProperties,
) )
if not ( self.audio_input_status = PackedCharacteristicAdapter(
characteristics := service_proxy.get_characteristics_by_uuid( service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC
) ),
): 'B',
raise gatt.InvalidServiceError( )
"Audio Input Status Characteristic not found"
)
self.audio_input_status = PackedCharacteristicAdapter(characteristics[0], 'B')
if not ( self.audio_input_control_point = (
characteristics := service_proxy.get_characteristics_by_uuid( service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC
) )
): )
raise gatt.InvalidServiceError(
"Audio Input Control Point Characteristic not found"
)
self.audio_input_control_point = characteristics[0]
if not ( self.audio_input_description = UTF8CharacteristicAdapter(
characteristics := service_proxy.get_characteristics_by_uuid( service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC
) )
): )
raise gatt.InvalidServiceError(
"Audio Input Description Characteristic not found"
)
self.audio_input_description = UTF8CharacteristicAdapter(characteristics[0])

View File

@@ -288,8 +288,8 @@ class AshaServiceProxy(gatt_client.ProfileServiceProxy):
'psm_characteristic', 'psm_characteristic',
), ),
): ):
if not ( setattr(
characteristics := self.service_proxy.get_characteristics_by_uuid(uuid) self,
): attribute_name,
raise gatt.InvalidServiceError(f"Missing {uuid} Characteristic") self.service_proxy.get_required_characteristic_by_uuid(uuid),
setattr(self, attribute_name, characteristics[0]) )

View File

@@ -354,34 +354,25 @@ class BroadcastAudioScanServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = BroadcastAudioScanService SERVICE_CLASS = BroadcastAudioScanService
broadcast_audio_scan_control_point: gatt_client.CharacteristicProxy broadcast_audio_scan_control_point: gatt_client.CharacteristicProxy
broadcast_receive_states: List[gatt.SerializableCharacteristicAdapter] broadcast_receive_states: List[gatt.DelegatedCharacteristicAdapter]
def __init__(self, service_proxy: gatt_client.ServiceProxy): def __init__(self, service_proxy: gatt_client.ServiceProxy):
self.service_proxy = service_proxy self.service_proxy = service_proxy
if not ( self.broadcast_audio_scan_control_point = (
characteristics := service_proxy.get_characteristics_by_uuid( service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_BROADCAST_AUDIO_SCAN_CONTROL_POINT_CHARACTERISTIC gatt.GATT_BROADCAST_AUDIO_SCAN_CONTROL_POINT_CHARACTERISTIC
) )
): )
raise gatt.InvalidServiceError(
"Broadcast Audio Scan Control Point characteristic not found"
)
self.broadcast_audio_scan_control_point = characteristics[0]
if not ( self.broadcast_receive_states = [
characteristics := service_proxy.get_characteristics_by_uuid( gatt.DelegatedCharacteristicAdapter(
characteristic,
decode=lambda x: BroadcastReceiveState.from_bytes(x) if x else None,
)
for characteristic in service_proxy.get_characteristics_by_uuid(
gatt.GATT_BROADCAST_RECEIVE_STATE_CHARACTERISTIC gatt.GATT_BROADCAST_RECEIVE_STATE_CHARACTERISTIC
) )
):
raise gatt.InvalidServiceError(
"Broadcast Receive State characteristic not found"
)
self.broadcast_receive_states = [
gatt.SerializableCharacteristicAdapter(
characteristic, BroadcastReceiveState
)
for characteristic in characteristics
] ]
async def send_control_point_operation( async def send_control_point_operation(

View File

@@ -30,7 +30,6 @@ from bumble.gatt import (
GATT_UGT_FEATURES_CHARACTERISTIC, GATT_UGT_FEATURES_CHARACTERISTIC,
GATT_BGS_FEATURES_CHARACTERISTIC, GATT_BGS_FEATURES_CHARACTERISTIC,
GATT_BGR_FEATURES_CHARACTERISTIC, GATT_BGR_FEATURES_CHARACTERISTIC,
InvalidServiceError,
) )
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from enum import IntFlag from enum import IntFlag
@@ -154,14 +153,10 @@ class GamingAudioServiceProxy(ProfileServiceProxy):
def __init__(self, service_proxy: ServiceProxy) -> None: def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy self.service_proxy = service_proxy
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_GMAP_ROLE_CHARACTERISTIC
)
):
raise InvalidServiceError("GMAP Role Characteristic not found")
self.gmap_role = DelegatedCharacteristicAdapter( self.gmap_role = DelegatedCharacteristicAdapter(
characteristic=characteristics[0], service_proxy.get_required_characteristic_by_uuid(
GATT_GMAP_ROLE_CHARACTERISTIC
),
decode=lambda value: GmapRole(value[0]), decode=lambda value: GmapRole(value[0]),
) )

View File

@@ -17,23 +17,35 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations from __future__ import annotations
import dataclasses import dataclasses
import enum
import struct import struct
from typing import List, Type from typing import Any, List, Type
from typing_extensions import Self from typing_extensions import Self
from bumble.profiles import bap
from bumble import utils from bumble import utils
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Classes # Classes
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class AudioActiveState(utils.OpenIntEnum):
NO_AUDIO_DATA_TRANSMITTED = 0x00
AUDIO_DATA_TRANSMITTED = 0x01
class AssistedListeningStream(utils.OpenIntEnum):
UNSPECIFIED_AUDIO_ENHANCEMENT = 0x00
@dataclasses.dataclass @dataclasses.dataclass
class Metadata: class Metadata:
'''Bluetooth Assigned Numbers, Section 6.12.6 - Metadata LTV structures. '''Bluetooth Assigned Numbers, Section 6.12.6 - Metadata LTV structures.
As Metadata fields may extend, and Spec doesn't forbid duplication, we don't parse As Metadata fields may extend, and the spec may not guarantee the uniqueness of
Metadata into a key-value style dataclass here. Rather, we encourage users to parse tags, we don't automatically parse the Metadata data into specific classes.
again outside the lib. Users of this class may decode the data by themselves, or use the Entry.decode
method.
''' '''
class Tag(utils.OpenIntEnum): class Tag(utils.OpenIntEnum):
@@ -57,6 +69,44 @@ class Metadata:
tag: Metadata.Tag tag: Metadata.Tag
data: bytes data: bytes
def decode(self) -> Any:
"""
Decode the data into an object, if possible.
If no specific object class exists to represent the data, the raw data
bytes are returned.
"""
if self.tag in (
Metadata.Tag.PREFERRED_AUDIO_CONTEXTS,
Metadata.Tag.STREAMING_AUDIO_CONTEXTS,
):
return bap.ContextType(struct.unpack("<H", self.data)[0])
if self.tag in (
Metadata.Tag.PROGRAM_INFO,
Metadata.Tag.PROGRAM_INFO_URI,
Metadata.Tag.BROADCAST_NAME,
):
return self.data.decode("utf-8")
if self.tag == Metadata.Tag.LANGUAGE:
return self.data.decode("ascii")
if self.tag == Metadata.Tag.CCID_LIST:
return list(self.data)
if self.tag == Metadata.Tag.PARENTAL_RATING:
return self.data[0]
if self.tag == Metadata.Tag.AUDIO_ACTIVE_STATE:
return AudioActiveState(self.data[0])
if self.tag == Metadata.Tag.ASSISTED_LISTENING_STREAM:
return AssistedListeningStream(self.data[0])
return self.data
@classmethod @classmethod
def from_bytes(cls: Type[Self], data: bytes) -> Self: def from_bytes(cls: Type[Self], data: bytes) -> Self:
return cls(tag=Metadata.Tag(data[0]), data=data[1:]) return cls(tag=Metadata.Tag(data[0]), data=data[1:])
@@ -66,6 +116,29 @@ class Metadata:
entries: List[Entry] = dataclasses.field(default_factory=list) entries: List[Entry] = dataclasses.field(default_factory=list)
def pretty_print(self, indent: str) -> str:
"""Convenience method to generate a string with one key-value pair per line."""
max_key_length = 0
keys = []
values = []
for entry in self.entries:
key = entry.tag.name
max_key_length = max(max_key_length, len(key))
keys.append(key)
decoded = entry.decode()
if isinstance(decoded, enum.Enum):
values.append(decoded.name)
elif isinstance(decoded, bytes):
values.append(decoded.hex())
else:
values.append(str(decoded))
return '\n'.join(
f'{indent}{key}: {" " * (max_key_length-len(key))}{value}'
for key, value in zip(keys, values)
)
@classmethod @classmethod
def from_bytes(cls: Type[Self], data: bytes) -> Self: def from_bytes(cls: Type[Self], data: bytes) -> Self:
entries = [] entries = []
@@ -81,3 +154,13 @@ class Metadata:
def __bytes__(self) -> bytes: def __bytes__(self) -> bytes:
return b''.join([bytes(entry) for entry in self.entries]) return b''.join([bytes(entry) for entry in self.entries])
def __str__(self) -> str:
entries_str = []
for entry in self.entries:
decoded = entry.decode()
entries_str.append(
f'{entry.tag.name}: '
f'{decoded.hex() if isinstance(decoded, bytes) else decoded!r}'
)
return f'Metadata(entries={", ".join(entry_str for entry_str in entries_str)})'

View File

@@ -72,6 +72,19 @@ class PacRecord:
metadata=metadata, metadata=metadata,
) )
@classmethod
def list_from_bytes(cls, data: bytes) -> list[PacRecord]:
"""Parse a serialized list of records preceded by a one byte list length."""
record_count = data[0]
records = []
offset = 1
for _ in range(record_count):
record = PacRecord.from_bytes(data[offset:])
offset += len(bytes(record))
records.append(record)
return records
def __bytes__(self) -> bytes: def __bytes__(self) -> bytes:
capabilities_bytes = bytes(self.codec_specific_capabilities) capabilities_bytes = bytes(self.codec_specific_capabilities)
metadata_bytes = bytes(self.metadata) metadata_bytes = bytes(self.metadata)
@@ -172,39 +185,58 @@ class PublishedAudioCapabilitiesService(gatt.TemplateService):
class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy): class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = PublishedAudioCapabilitiesService SERVICE_CLASS = PublishedAudioCapabilitiesService
sink_pac: Optional[gatt_client.CharacteristicProxy] = None sink_pac: Optional[gatt.DelegatedCharacteristicAdapter] = None
sink_audio_locations: Optional[gatt_client.CharacteristicProxy] = None sink_audio_locations: Optional[gatt.DelegatedCharacteristicAdapter] = None
source_pac: Optional[gatt_client.CharacteristicProxy] = None source_pac: Optional[gatt.DelegatedCharacteristicAdapter] = None
source_audio_locations: Optional[gatt_client.CharacteristicProxy] = None source_audio_locations: Optional[gatt.DelegatedCharacteristicAdapter] = None
available_audio_contexts: gatt_client.CharacteristicProxy available_audio_contexts: gatt.DelegatedCharacteristicAdapter
supported_audio_contexts: gatt_client.CharacteristicProxy supported_audio_contexts: gatt.DelegatedCharacteristicAdapter
def __init__(self, service_proxy: gatt_client.ServiceProxy): def __init__(self, service_proxy: gatt_client.ServiceProxy):
self.service_proxy = service_proxy self.service_proxy = service_proxy
self.available_audio_contexts = service_proxy.get_characteristics_by_uuid( self.available_audio_contexts = gatt.DelegatedCharacteristicAdapter(
gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC service_proxy.get_required_characteristic_by_uuid(
)[0] gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC
self.supported_audio_contexts = service_proxy.get_characteristics_by_uuid( ),
gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC decode=lambda x: tuple(map(ContextType, struct.unpack('<HH', x))),
)[0] )
self.supported_audio_contexts = gatt.DelegatedCharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC
),
decode=lambda x: tuple(map(ContextType, struct.unpack('<HH', x))),
)
if characteristics := service_proxy.get_characteristics_by_uuid( if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_PAC_CHARACTERISTIC gatt.GATT_SINK_PAC_CHARACTERISTIC
): ):
self.sink_pac = characteristics[0] self.sink_pac = gatt.DelegatedCharacteristicAdapter(
characteristics[0],
decode=PacRecord.list_from_bytes,
)
if characteristics := service_proxy.get_characteristics_by_uuid( if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_PAC_CHARACTERISTIC gatt.GATT_SOURCE_PAC_CHARACTERISTIC
): ):
self.source_pac = characteristics[0] self.source_pac = gatt.DelegatedCharacteristicAdapter(
characteristics[0],
decode=PacRecord.list_from_bytes,
)
if characteristics := service_proxy.get_characteristics_by_uuid( if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC
): ):
self.sink_audio_locations = characteristics[0] self.sink_audio_locations = gatt.DelegatedCharacteristicAdapter(
characteristics[0],
decode=lambda x: AudioLocation(struct.unpack('<I', x)[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid( if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC
): ):
self.source_audio_locations = characteristics[0] self.source_audio_locations = gatt.DelegatedCharacteristicAdapter(
characteristics[0],
decode=lambda x: AudioLocation(struct.unpack('<I', x)[0]),
)

View File

@@ -25,7 +25,6 @@ from bumble.gatt import (
TemplateService, TemplateService,
Characteristic, Characteristic,
DelegatedCharacteristicAdapter, DelegatedCharacteristicAdapter,
InvalidServiceError,
GATT_TELEPHONY_AND_MEDIA_AUDIO_SERVICE, GATT_TELEPHONY_AND_MEDIA_AUDIO_SERVICE,
GATT_TMAP_ROLE_CHARACTERISTIC, GATT_TMAP_ROLE_CHARACTERISTIC,
) )
@@ -74,15 +73,10 @@ class TelephonyAndMediaAudioServiceProxy(ProfileServiceProxy):
def __init__(self, service_proxy: ServiceProxy): def __init__(self, service_proxy: ServiceProxy):
self.service_proxy = service_proxy self.service_proxy = service_proxy
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_TMAP_ROLE_CHARACTERISTIC
)
):
raise InvalidServiceError('TMAP Role characteristic not found')
self.role = DelegatedCharacteristicAdapter( self.role = DelegatedCharacteristicAdapter(
characteristics[0], service_proxy.get_required_characteristic_by_uuid(
GATT_TMAP_ROLE_CHARACTERISTIC
),
decode=lambda value: Role( decode=lambda value: Role(
struct.unpack_from('<H', value, 0)[0], struct.unpack_from('<H', value, 0)[0],
), ),

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2024 Google LLC # Copyright 2021-2025 Google LLC
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@@ -17,14 +17,16 @@
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations from __future__ import annotations
import dataclasses
import enum import enum
from typing import Optional, Sequence
from bumble import att from bumble import att
from bumble import device from bumble import device
from bumble import gatt from bumble import gatt
from bumble import gatt_client from bumble import gatt_client
from typing import Optional, Sequence
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Constants # Constants
@@ -67,6 +69,20 @@ class VolumeControlPointOpcode(enum.IntEnum):
MUTE = 0x06 MUTE = 0x06
@dataclasses.dataclass
class VolumeState:
volume_setting: int
mute: int
change_counter: int
@classmethod
def from_bytes(cls, data: bytes) -> VolumeState:
return cls(data[0], data[1], data[2])
def __bytes__(self) -> bytes:
return bytes([self.volume_setting, self.mute, self.change_counter])
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Server # Server
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -126,16 +142,8 @@ class VolumeControlService(gatt.TemplateService):
included_services=list(included_services), included_services=list(included_services),
) )
@property
def volume_state_bytes(self) -> bytes:
return bytes([self.volume_setting, self.muted, self.change_counter])
@volume_state_bytes.setter
def volume_state_bytes(self, new_value: bytes) -> None:
self.volume_setting, self.muted, self.change_counter = new_value
def _on_read_volume_state(self, _connection: Optional[device.Connection]) -> bytes: def _on_read_volume_state(self, _connection: Optional[device.Connection]) -> bytes:
return self.volume_state_bytes return bytes(VolumeState(self.volume_setting, self.muted, self.change_counter))
def _on_write_volume_control_point( def _on_write_volume_control_point(
self, connection: Optional[device.Connection], value: bytes self, connection: Optional[device.Connection], value: bytes
@@ -153,14 +161,9 @@ class VolumeControlService(gatt.TemplateService):
self.change_counter = (self.change_counter + 1) % 256 self.change_counter = (self.change_counter + 1) % 256
connection.abort_on( connection.abort_on(
'disconnection', 'disconnection',
connection.device.notify_subscribers( connection.device.notify_subscribers(attribute=self.volume_state),
attribute=self.volume_state,
value=self.volume_state_bytes,
),
)
self.emit(
'volume_state', self.volume_setting, self.muted, self.change_counter
) )
self.emit('volume_state_change')
def _on_relative_volume_down(self) -> bool: def _on_relative_volume_down(self) -> bool:
old_volume = self.volume_setting old_volume = self.volume_setting
@@ -207,24 +210,26 @@ class VolumeControlServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = VolumeControlService SERVICE_CLASS = VolumeControlService
volume_control_point: gatt_client.CharacteristicProxy volume_control_point: gatt_client.CharacteristicProxy
volume_state: gatt.SerializableCharacteristicAdapter
volume_flags: gatt.DelegatedCharacteristicAdapter
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None: def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy self.service_proxy = service_proxy
self.volume_state = gatt.PackedCharacteristicAdapter( self.volume_state = gatt.SerializableCharacteristicAdapter(
service_proxy.get_characteristics_by_uuid( service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_VOLUME_STATE_CHARACTERISTIC gatt.GATT_VOLUME_STATE_CHARACTERISTIC
)[0], ),
'BBB', VolumeState,
) )
self.volume_control_point = service_proxy.get_characteristics_by_uuid( self.volume_control_point = service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC
)[0] )
self.volume_flags = gatt.PackedCharacteristicAdapter( self.volume_flags = gatt.DelegatedCharacteristicAdapter(
service_proxy.get_characteristics_by_uuid( service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC
)[0], ),
'B', decode=lambda data: VolumeFlags(data[0]),
) )

View File

@@ -27,8 +27,8 @@ from bumble.gatt import (
DelegatedCharacteristicAdapter, DelegatedCharacteristicAdapter,
TemplateService, TemplateService,
CharacteristicValue, CharacteristicValue,
SerializableCharacteristicAdapter,
UTF8CharacteristicAdapter, UTF8CharacteristicAdapter,
InvalidServiceError,
GATT_VOLUME_OFFSET_CONTROL_SERVICE, GATT_VOLUME_OFFSET_CONTROL_SERVICE,
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC, GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC,
GATT_AUDIO_LOCATION_CHARACTERISTIC, GATT_AUDIO_LOCATION_CHARACTERISTIC,
@@ -82,9 +82,7 @@ class VolumeOffsetState:
async def notify_subscribers_via_connection(self, connection: Connection) -> None: async def notify_subscribers_via_connection(self, connection: Connection) -> None:
assert self.attribute_value is not None assert self.attribute_value is not None
await connection.device.notify_subscribers( await connection.device.notify_subscribers(attribute=self.attribute_value)
attribute=self.attribute_value, value=bytes(self)
)
def on_read(self, _connection: Optional[Connection]) -> bytes: def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self) return bytes(self)
@@ -111,9 +109,7 @@ class VocsAudioLocation:
assert self.attribute_value assert self.attribute_value
self.audio_location = AudioLocation(int.from_bytes(value, 'little')) self.audio_location = AudioLocation(int.from_bytes(value, 'little'))
await connection.device.notify_subscribers( await connection.device.notify_subscribers(attribute=self.attribute_value)
attribute=self.attribute_value, value=value
)
@dataclass @dataclass
@@ -169,9 +165,7 @@ class AudioOutputDescription:
assert self.attribute_value assert self.attribute_value
self.audio_output_description = value.decode('utf-8') self.audio_output_description = value.decode('utf-8')
await connection.device.notify_subscribers( await connection.device.notify_subscribers(attribute=self.attribute_value)
attribute=self.attribute_value, value=value
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -203,37 +197,30 @@ class VolumeOffsetControlService(TemplateService):
VolumeOffsetControlPoint(self.volume_offset_state) VolumeOffsetControlPoint(self.volume_offset_state)
) )
self.volume_offset_state_characteristic = DelegatedCharacteristicAdapter( self.volume_offset_state_characteristic = Characteristic(
Characteristic( uuid=GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC,
uuid=GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC, properties=(
properties=( Characteristic.Properties.READ | Characteristic.Properties.NOTIFY
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY
),
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=CharacteristicValue(read=self.volume_offset_state.on_read),
), ),
encode=lambda value: bytes(value), permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=CharacteristicValue(read=self.volume_offset_state.on_read),
) )
self.audio_location_characteristic = DelegatedCharacteristicAdapter( self.audio_location_characteristic = Characteristic(
Characteristic( uuid=GATT_AUDIO_LOCATION_CHARACTERISTIC,
uuid=GATT_AUDIO_LOCATION_CHARACTERISTIC, properties=(
properties=( Characteristic.Properties.READ
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY
| Characteristic.Properties.NOTIFY | Characteristic.Properties.WRITE_WITHOUT_RESPONSE
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE ),
), permissions=(
permissions=( Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
Characteristic.Permissions.READ_REQUIRES_ENCRYPTION | Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION
| Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION ),
), value=CharacteristicValue(
value=CharacteristicValue( read=self.audio_location.on_read,
read=self.audio_location.on_read, write=self.audio_location.on_write,
write=self.audio_location.on_write,
),
), ),
encode=lambda value: bytes(value),
decode=VocsAudioLocation.from_bytes,
) )
self.audio_location.attribute_value = self.audio_location_characteristic.value self.audio_location.attribute_value = self.audio_location_characteristic.value
@@ -244,25 +231,22 @@ class VolumeOffsetControlService(TemplateService):
value=CharacteristicValue(write=self.volume_offset_control_point.on_write), value=CharacteristicValue(write=self.volume_offset_control_point.on_write),
) )
self.audio_output_description_characteristic = DelegatedCharacteristicAdapter( self.audio_output_description_characteristic = Characteristic(
Characteristic( uuid=GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC,
uuid=GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC, properties=(
properties=( Characteristic.Properties.READ
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY
| Characteristic.Properties.NOTIFY | Characteristic.Properties.WRITE_WITHOUT_RESPONSE
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE ),
), permissions=(
permissions=( Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
Characteristic.Permissions.READ_REQUIRES_ENCRYPTION | Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION
| Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION ),
), value=CharacteristicValue(
value=CharacteristicValue( read=self.audio_output_description.on_read,
read=self.audio_output_description.on_read, write=self.audio_output_description.on_write,
write=self.audio_output_description.on_write, ),
),
)
) )
self.audio_output_description.attribute_value = ( self.audio_output_description.attribute_value = (
self.audio_output_description_characteristic.value self.audio_output_description_characteristic.value
) )
@@ -287,44 +271,29 @@ class VolumeOffsetControlServiceProxy(ProfileServiceProxy):
def __init__(self, service_proxy: ServiceProxy) -> None: def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy self.service_proxy = service_proxy
if not ( self.volume_offset_state = SerializableCharacteristicAdapter(
characteristics := service_proxy.get_characteristics_by_uuid( service_proxy.get_required_characteristic_by_uuid(
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC
) ),
): VolumeOffsetState,
raise InvalidServiceError("Volume Offset State characteristic not found")
self.volume_offset_state = DelegatedCharacteristicAdapter(
characteristics[0], decode=VolumeOffsetState.from_bytes
) )
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_AUDIO_LOCATION_CHARACTERISTIC
)
):
raise InvalidServiceError("Audio Location characteristic not found")
self.audio_location = DelegatedCharacteristicAdapter( self.audio_location = DelegatedCharacteristicAdapter(
characteristics[0], service_proxy.get_required_characteristic_by_uuid(
encode=lambda value: bytes(value), GATT_AUDIO_LOCATION_CHARACTERISTIC
decode=VocsAudioLocation.from_bytes, ),
encode=lambda value: bytes([int(value)]),
decode=lambda data: AudioLocation(data[0]),
) )
if not ( self.volume_offset_control_point = (
characteristics := service_proxy.get_characteristics_by_uuid( service_proxy.get_required_characteristic_by_uuid(
GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC
) )
): )
raise InvalidServiceError(
"Volume Offset Control Point characteristic not found"
)
self.volume_offset_control_point = characteristics[0]
if not ( self.audio_output_description = UTF8CharacteristicAdapter(
characteristics := service_proxy.get_characteristics_by_uuid( service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC
) )
): )
raise InvalidServiceError(
"Audio Output Description characteristic not found"
)
self.audio_output_description = UTF8CharacteristicAdapter(characteristics[0])

View File

@@ -39,12 +39,14 @@ nav:
- Drivers: - Drivers:
- drivers/index.md - drivers/index.md
- Realtek: drivers/realtek.md - Realtek: drivers/realtek.md
- Intel: drivers/intel.md
- API: - API:
- Guide: api/guide.md - Guide: api/guide.md
- Examples: api/examples.md - Examples: api/examples.md
- Reference: api/reference.md - Reference: api/reference.md
- Apps & Tools: - Apps & Tools:
- apps_and_tools/index.md - apps_and_tools/index.md
- Auracast: apps_and_tools/auracast.md
- Console: apps_and_tools/console.md - Console: apps_and_tools/console.md
- Bench: apps_and_tools/bench.md - Bench: apps_and_tools/bench.md
- Speaker: apps_and_tools/speaker.md - Speaker: apps_and_tools/speaker.md
@@ -108,8 +110,8 @@ markdown_extensions:
- pymdownx.details - pymdownx.details
- pymdownx.superfences - pymdownx.superfences
- pymdownx.emoji: - pymdownx.emoji:
emoji_index: !!python/name:materialx.emoji.twemoji emoji_index: !!python/name:material.extensions.emoji.twemoji
emoji_generator: !!python/name:materialx.emoji.to_svg emoji_generator: !!python/name:material.extensions.emoji.to_svg
- pymdownx.tabbed: - pymdownx.tabbed:
alternate_style: true alternate_style: true
- codehilite: - codehilite:

View File

@@ -4,12 +4,13 @@ APPS & TOOLS
Included in the project are a few apps and tools, built on top of the core libraries. Included in the project are a few apps and tools, built on top of the core libraries.
These include: These include:
* [Console](console.md) - an interactive text-based console * [Auracast](auracast.md) - Commands to broadcast, receive and/or control LE Audio.
* [Bench](bench.md) - Speed and Latency benchmarking between two devices (LE and Classic) * [Console](console.md) - An interactive text-based console.
* [Pair](pair.md) - Pair/bond two devices (LE and Classic) * [Bench](bench.md) - Speed and Latency benchmarking between two devices (LE and Classic).
* [Unbond](unbond.md) - Remove a previously established bond * [Pair](pair.md) - Pair/bond two devices (LE and Classic).
* [HCI Bridge](hci_bridge.md) - a HCI transport bridge to connect two HCI transports and filter/snoop the HCI packets * [Unbond](unbond.md) - Remove a previously established bond.
* [Golden Gate Bridge](gg_bridge.md) - a bridge between GATT and UDP to use with the Golden Gate "stack tool" * [HCI Bridge](hci_bridge.md) - An HCI transport bridge to connect two HCI transports and filter/snoop the HCI packets.
* [Show](show.md) - Parse a file with HCI packets and print the details of each packet in a human readable form * [Golden Gate Bridge](gg_bridge.md) - Bridge between GATT and UDP to use with the Golden Gate "stack tool".
* [Show](show.md) - Parse a file with HCI packets and print the details of each packet in a human readable form.
* [Speaker](speaker.md) - Virtual Bluetooth speaker, with a command line and browser-based UI. * [Speaker](speaker.md) - Virtual Bluetooth speaker, with a command line and browser-based UI.
* [Link Relay](link_relay.md) - WebSocket relay for virtual RemoteLink instances to communicate with each other. * [Link Relay](link_relay.md) - WebSocket relay for virtual RemoteLink instances to communicate with each other.

View File

@@ -9,9 +9,9 @@ for your platform.
Throughout the documentation, when shell commands are shown, it is assumed that you can Throughout the documentation, when shell commands are shown, it is assumed that you can
invoke Python as invoke Python as
``` ```
$ python $ python3
``` ```
If invoking python is different on your platform (it may be `python3` for example, or just `py` or `py.exe`), If invoking python is different on your platform (it may be `python` for example, or just `py` or `py.exe`),
adjust accordingly. adjust accordingly.
You may be simply using Bumble as a module for your own application or as a dependency to your own You may be simply using Bumble as a module for your own application or as a dependency to your own
@@ -30,12 +30,18 @@ manager, or from source.
python environment, or in a virtual environment, such as a `venv`, `pyenv` or `conda` environment. python environment, or in a virtual environment, such as a `venv`, `pyenv` or `conda` environment.
See the [Python Environments page](development/python_environments.md) page for details. See the [Python Environments page](development/python_environments.md) page for details.
### Install from PyPI
```
$ python3 -m pip install bumble
```
### Install From Source ### Install From Source
Install with `pip`. Run in a command shell in the directory where you downloaded the source Install with `pip`. Run in a command shell in the directory where you downloaded the source
distribution distribution
``` ```
$ python -m pip install -e . $ python3 -m pip install -e .
``` ```
### Install from GitHub ### Install from GitHub
@@ -44,21 +50,21 @@ You can install directly from GitHub without first downloading the repo.
Install the latest commit from the main branch with `pip`: Install the latest commit from the main branch with `pip`:
``` ```
$ python -m pip install git+https://github.com/google/bumble.git $ python3 -m pip install git+https://github.com/google/bumble.git
``` ```
You can specify a specific tag. You can specify a specific tag.
Install tag `v0.0.1` with `pip`: Install tag `v0.0.1` with `pip`:
``` ```
$ python -m pip install git+https://github.com/google/bumble.git@v0.0.1 $ python3 -m pip install git+https://github.com/google/bumble.git@v0.0.1
``` ```
You can also specify a specific commit. You can also specify a specific commit.
Install commit `27c0551` with `pip`: Install commit `27c0551` with `pip`:
``` ```
$ python -m pip install git+https://github.com/google/bumble.git@27c0551 $ python3 -m pip install git+https://github.com/google/bumble.git@27c0551
``` ```
# Working On The Bumble Code # Working On The Bumble Code
@@ -78,21 +84,21 @@ directory of the project.
```bash ```bash
$ export PYTHONPATH=. $ export PYTHONPATH=.
$ python apps/console.py serial:/dev/tty.usbmodem0006839912171 $ python3 apps/console.py serial:/dev/tty.usbmodem0006839912171
``` ```
or running an example, with the working directory set to the `examples` subdirectory or running an example, with the working directory set to the `examples` subdirectory
```bash ```bash
$ cd examples $ cd examples
$ export PYTHONPATH=.. $ export PYTHONPATH=..
$ python run_scanner.py usb:0 $ python3 run_scanner.py usb:0
``` ```
Or course, `export PYTHONPATH` only needs to be invoked once, not before each app/script execution. Or course, `export PYTHONPATH` only needs to be invoked once, not before each app/script execution.
Setting `PYTHONPATH` locally with each command would look something like: Setting `PYTHONPATH` locally with each command would look something like:
``` ```
$ PYTHONPATH=. python examples/run_advertiser.py examples/device1.json serial:/dev/tty.usbmodem0006839912171 $ PYTHONPATH=. python3 examples/run_advertiser.py examples/device1.json serial:/dev/tty.usbmodem0006839912171
``` ```
# Where To Go Next # Where To Go Next

View File

@@ -35,11 +35,11 @@ the command line.
visit [this Android Studio user guide page](https://developer.android.com/studio/run/emulator-commandline) visit [this Android Studio user guide page](https://developer.android.com/studio/run/emulator-commandline)
The `-packet-streamer-endpoint <endpoint>` command line option may be used to enable The `-packet-streamer-endpoint <endpoint>` command line option may be used to enable
Bluetooth emulation and tell the emulator which virtual controller to connect to. Bluetooth emulation and tell the emulator which virtual controller to connect to.
## Connecting to Netsim ## Connecting to Netsim
If the emulator doesn't have Bluetooth emulation enabled by default, use the If the emulator doesn't have Bluetooth emulation enabled by default, use the
`-packet-streamer-endpoint default` option to tell it to connect to Netsim. `-packet-streamer-endpoint default` option to tell it to connect to Netsim.
If Netsim is not running, the emulator will start it automatically. If Netsim is not running, the emulator will start it automatically.
@@ -60,17 +60,17 @@ the Bumble `android-netsim` transport in `host` mode (the default).
!!! example "Run the example GATT server connected to the emulator via Netsim" !!! example "Run the example GATT server connected to the emulator via Netsim"
``` shell ``` shell
$ python run_gatt_server.py device1.json android-netsim $ python3 run_gatt_server.py device1.json android-netsim
``` ```
By default, the Bumble `android-netsim` transport will try to automatically discover By default, the Bumble `android-netsim` transport will try to automatically discover
the port number on which the netsim process is exposing its gRPC server interface. If the port number on which the netsim process is exposing its gRPC server interface. If
that discovery process fails, or if you want to specify the interface manually, you that discovery process fails, or if you want to specify the interface manually, you
can pass a `hostname` and `port` as parameters to the transport, as: `android-netsim:<host>:<port>`. can pass a `hostname` and `port` as parameters to the transport, as: `android-netsim:<host>:<port>`.
!!! example "Run the example GATT server connected to the emulator via Netsim on a localhost, port 8877" !!! example "Run the example GATT server connected to the emulator via Netsim on a localhost, port 8877"
``` shell ``` shell
$ python run_gatt_server.py device1.json android-netsim:localhost:8877 $ python3 run_gatt_server.py device1.json android-netsim:localhost:8877
``` ```
### Multiple Instances ### Multiple Instances
@@ -84,7 +84,7 @@ For example: `android-netsim:localhost:8877,name=bumble1`
This is an advanced use case, which may not be officially supported, but should work in recent This is an advanced use case, which may not be officially supported, but should work in recent
versions of the emulator. versions of the emulator.
The first step is to run the Bumble HCI bridge, specifying netsim as the "host" end of the The first step is to run the Bumble HCI bridge, specifying netsim as the "host" end of the
bridge, and another controller (typically a USB Bluetooth dongle, but any other supported bridge, and another controller (typically a USB Bluetooth dongle, but any other supported
transport can work as well) as the "controller" end of the bridge. transport can work as well) as the "controller" end of the bridge.

View File

@@ -25,7 +25,7 @@ import struct
import sys import sys
from typing import Any, List, Union from typing import Any, List, Union
from bumble.device import Connection, Device, Peer from bumble.device import Device, Peer
from bumble import transport from bumble import transport
from bumble import gatt from bumble import gatt
from bumble import hci from bumble import hci
@@ -82,19 +82,19 @@ async def client(device: Device, address: hci.Address) -> None:
for index in range(1, 9): for index in range(1, 9):
characteristics.append( characteristics.append(
service.get_characteristics_by_uuid( service.get_characteristics_by_uuid(
CHARACTERISTIC_UUID_BASE + f"{index:02X}" core.UUID(CHARACTERISTIC_UUID_BASE + f"{index:02X}")
)[0] )[0]
) )
# Read all characteristics as raw bytes. # Read all characteristics as raw bytes.
for characteristic in characteristics: for characteristic in characteristics:
value = await characteristic.read_value() value = await characteristic.read_value()
print(f"### {characteristic} = {value} ({value.hex()})") print(f"### {characteristic} = {value!r} ({value.hex()})")
# Static characteristic with a bytes value. # Static characteristic with a bytes value.
c1 = characteristics[0] c1 = characteristics[0]
c1_value = await c1.read_value() c1_value = await c1.read_value()
print(f"@@@ C1 {c1} value = {c1_value} (type={type(c1_value)})") print(f"@@@ C1 {c1} value = {c1_value!r} (type={type(c1_value)})")
await c1.write_value("happy π day".encode("utf-8")) await c1.write_value("happy π day".encode("utf-8"))
# Static characteristic with a string value. # Static characteristic with a string value.
@@ -136,7 +136,7 @@ async def client(device: Device, address: hci.Address) -> None:
# Dynamic characteristic with a bytes value. # Dynamic characteristic with a bytes value.
c7 = characteristics[6] c7 = characteristics[6]
c7_value = await c7.read_value() c7_value = await c7.read_value()
print(f"@@@ C7 {c7} value = {c7_value} (type={type(c7_value)})") print(f"@@@ C7 {c7} value = {c7_value!r} (type={type(c7_value)})")
await c7.write_value(bytes.fromhex("01020304")) await c7.write_value(bytes.fromhex("01020304"))
# Dynamic characteristic with a string value. # Dynamic characteristic with a string value.

View File

@@ -42,7 +42,7 @@ from bumble.profiles.bap import (
from bumble.profiles.pacs import PacRecord, PublishedAudioCapabilitiesService from bumble.profiles.pacs import PacRecord, PublishedAudioCapabilitiesService
from bumble.profiles.cap import CommonAudioServiceService from bumble.profiles.cap import CommonAudioServiceService
from bumble.profiles.csip import CoordinatedSetIdentificationService, SirkType from bumble.profiles.csip import CoordinatedSetIdentificationService, SirkType
from bumble.profiles.vcp import VolumeControlService from bumble.profiles.vcs import VolumeControlService
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
@@ -117,13 +117,17 @@ async def main() -> None:
ws: Optional[websockets.WebSocketServerProtocol] = None ws: Optional[websockets.WebSocketServerProtocol] = None
def on_volume_state(volume_setting: int, muted: int, change_counter: int): def on_volume_state_change():
if ws: if ws:
asyncio.create_task( asyncio.create_task(
ws.send(dumps_volume_state(volume_setting, muted, change_counter)) ws.send(
dumps_volume_state(
vcs.volume_setting, vcs.muted, vcs.change_counter
)
)
) )
vcs.on('volume_state', on_volume_state) vcs.on('volume_state_change', on_volume_state_change)
advertising_data = ( advertising_data = (
bytes( bytes(
@@ -170,16 +174,10 @@ async def main() -> None:
ws = websocket ws = websocket
async for message in websocket: async for message in websocket:
volume_state = json.loads(message) volume_state = json.loads(message)
vcs.volume_state_bytes = bytes( vcs.volume_setting = volume_state['volume_setting']
[ vcs.muted = volume_state['muted']
volume_state['volume_setting'], vcs.change_counter = volume_state['change_counter']
volume_state['muted'], await device.notify_subscribers(vcs.volume_state)
volume_state['change_counter'],
]
)
await device.notify_subscribers(
vcs.volume_state, vcs.volume_state_bytes
)
ws = None ws = None
await websockets.serve(serve, 'localhost', 8989) await websockets.serve(serve, 'localhost', 8989)

View File

@@ -61,15 +61,17 @@ avatar = [
] ]
pandora = ["bt-test-interfaces >= 0.0.6"] pandora = ["bt-test-interfaces >= 0.0.6"]
documentation = [ documentation = [
"mkdocs >= 1.4.0", "mkdocs >= 1.6.0",
"mkdocs-material >= 8.5.6", "mkdocs-material >= 9.6",
"mkdocstrings[python] >= 0.19.0", "mkdocstrings[python] >= 0.27.0",
] ]
lc3 = [ auracast = [
"lc3 @ git+https://github.com/google/liblc3.git", "lc3py ; python_version>='3.10' and platform_system=='Linux' and platform_machine=='x86_64'",
"sounddevice >= 0.5.1",
] ]
[project.scripts] [project.scripts]
bumble-auracast = "bumble.apps.auracast:main"
bumble-ble-rpa-tool = "bumble.apps.ble_rpa_tool:main" bumble-ble-rpa-tool = "bumble.apps.ble_rpa_tool:main"
bumble-console = "bumble.apps.console:main" bumble-console = "bumble.apps.console:main"
bumble-controller-info = "bumble.apps.controller_info:main" bumble-controller-info = "bumble.apps.controller_info:main"
@@ -190,6 +192,10 @@ ignore_missing_imports = true
module = "serial_asyncio.*" module = "serial_asyncio.*"
ignore_missing_imports = true ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "sounddevice.*"
ignore_missing_imports = true
[[tool.mypy.overrides]] [[tool.mypy.overrides]]
module = "usb.*" module = "usb.*"
ignore_missing_imports = true ignore_missing_imports = true

View File

@@ -33,7 +33,7 @@ from bumble.profiles.aics import (
AudioInputControlPointOpCode, AudioInputControlPointOpCode,
ErrorCode, ErrorCode,
) )
from bumble.profiles.vcp import VolumeControlService, VolumeControlServiceProxy from bumble.profiles.vcs import VolumeControlService, VolumeControlServiceProxy
from .test_utils import TwoDevices from .test_utils import TwoDevices

View File

@@ -53,7 +53,7 @@ def test_import():
le_audio, le_audio,
pacs, pacs,
pbp, pbp,
vcp, vcs,
) )
assert att assert att
@@ -87,7 +87,7 @@ def test_import():
assert le_audio assert le_audio
assert pacs assert pacs
assert pbp assert pbp
assert vcp assert vcs
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -20,7 +20,7 @@ import pytest_asyncio
import logging import logging
from bumble import device from bumble import device
from bumble.profiles import vcp from bumble.profiles import vcs
from .test_utils import TwoDevices from .test_utils import TwoDevices
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -34,7 +34,7 @@ logger = logging.getLogger(__name__)
async def vcp_client(): async def vcp_client():
devices = TwoDevices() devices = TwoDevices()
devices[0].add_service( devices[0].add_service(
vcp.VolumeControlService(volume_setting=32, muted=1, volume_flags=1) vcs.VolumeControlService(volume_setting=32, muted=1, volume_flags=1)
) )
await devices.setup_connection() await devices.setup_connection()
@@ -48,76 +48,76 @@ async def vcp_client():
peer = device.Peer(devices.connections[1]) peer = device.Peer(devices.connections[1])
vcp_client = await peer.discover_service_and_create_proxy( vcp_client = await peer.discover_service_and_create_proxy(
vcp.VolumeControlServiceProxy vcs.VolumeControlServiceProxy
) )
yield vcp_client yield vcp_client
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_init_service(vcp_client: vcp.VolumeControlServiceProxy): async def test_init_service(vcp_client: vcs.VolumeControlServiceProxy):
assert (await vcp_client.volume_flags.read_value()) == 1 assert (await vcp_client.volume_flags.read_value()) == 1
assert (await vcp_client.volume_state.read_value()) == (32, 1, 0) assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(32, 1, 0)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_relative_volume_down(vcp_client: vcp.VolumeControlServiceProxy): async def test_relative_volume_down(vcp_client: vcs.VolumeControlServiceProxy):
await vcp_client.volume_control_point.write_value( await vcp_client.volume_control_point.write_value(
bytes([vcp.VolumeControlPointOpcode.RELATIVE_VOLUME_DOWN, 0]) bytes([vcs.VolumeControlPointOpcode.RELATIVE_VOLUME_DOWN, 0])
) )
assert (await vcp_client.volume_state.read_value()) == (16, 1, 1) assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(16, 1, 1)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_relative_volume_up(vcp_client: vcp.VolumeControlServiceProxy): async def test_relative_volume_up(vcp_client: vcs.VolumeControlServiceProxy):
await vcp_client.volume_control_point.write_value( await vcp_client.volume_control_point.write_value(
bytes([vcp.VolumeControlPointOpcode.RELATIVE_VOLUME_UP, 0]) bytes([vcs.VolumeControlPointOpcode.RELATIVE_VOLUME_UP, 0])
) )
assert (await vcp_client.volume_state.read_value()) == (48, 1, 1) assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(48, 1, 1)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_unmute_relative_volume_down(vcp_client: vcp.VolumeControlServiceProxy): async def test_unmute_relative_volume_down(vcp_client: vcs.VolumeControlServiceProxy):
await vcp_client.volume_control_point.write_value( await vcp_client.volume_control_point.write_value(
bytes([vcp.VolumeControlPointOpcode.UNMUTE_RELATIVE_VOLUME_DOWN, 0]) bytes([vcs.VolumeControlPointOpcode.UNMUTE_RELATIVE_VOLUME_DOWN, 0])
) )
assert (await vcp_client.volume_state.read_value()) == (16, 0, 1) assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(16, 0, 1)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_unmute_relative_volume_up(vcp_client: vcp.VolumeControlServiceProxy): async def test_unmute_relative_volume_up(vcp_client: vcs.VolumeControlServiceProxy):
await vcp_client.volume_control_point.write_value( await vcp_client.volume_control_point.write_value(
bytes([vcp.VolumeControlPointOpcode.UNMUTE_RELATIVE_VOLUME_UP, 0]) bytes([vcs.VolumeControlPointOpcode.UNMUTE_RELATIVE_VOLUME_UP, 0])
) )
assert (await vcp_client.volume_state.read_value()) == (48, 0, 1) assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(48, 0, 1)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_set_absolute_volume(vcp_client: vcp.VolumeControlServiceProxy): async def test_set_absolute_volume(vcp_client: vcs.VolumeControlServiceProxy):
await vcp_client.volume_control_point.write_value( await vcp_client.volume_control_point.write_value(
bytes([vcp.VolumeControlPointOpcode.SET_ABSOLUTE_VOLUME, 0, 255]) bytes([vcs.VolumeControlPointOpcode.SET_ABSOLUTE_VOLUME, 0, 255])
) )
assert (await vcp_client.volume_state.read_value()) == (255, 1, 1) assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(255, 1, 1)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_mute(vcp_client: vcp.VolumeControlServiceProxy): async def test_mute(vcp_client: vcs.VolumeControlServiceProxy):
await vcp_client.volume_control_point.write_value( await vcp_client.volume_control_point.write_value(
bytes([vcp.VolumeControlPointOpcode.MUTE, 0]) bytes([vcs.VolumeControlPointOpcode.MUTE, 0])
) )
assert (await vcp_client.volume_state.read_value()) == (32, 1, 0) assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(32, 1, 0)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_unmute(vcp_client: vcp.VolumeControlServiceProxy): async def test_unmute(vcp_client: vcs.VolumeControlServiceProxy):
await vcp_client.volume_control_point.write_value( await vcp_client.volume_control_point.write_value(
bytes([vcp.VolumeControlPointOpcode.UNMUTE, 0]) bytes([vcs.VolumeControlPointOpcode.UNMUTE, 0])
) )
assert (await vcp_client.volume_state.read_value()) == (32, 0, 1) assert (await vcp_client.volume_state.read_value()) == vcs.VolumeState(32, 0, 1)

View File

@@ -32,9 +32,8 @@ from bumble.profiles.vocs import (
SetVolumeOffsetOpCode, SetVolumeOffsetOpCode,
VolumeOffsetControlServiceProxy, VolumeOffsetControlServiceProxy,
VolumeOffsetState, VolumeOffsetState,
VocsAudioLocation,
) )
from bumble.profiles.vcp import VolumeControlService, VolumeControlServiceProxy from bumble.profiles.vcs import VolumeControlService, VolumeControlServiceProxy
from bumble.profiles.bap import AudioLocation from bumble.profiles.bap import AudioLocation
from .test_utils import TwoDevices from .test_utils import TwoDevices
@@ -81,9 +80,7 @@ async def test_init_service(vocs_client: VolumeOffsetControlServiceProxy):
volume_offset=0, volume_offset=0,
change_counter=0, change_counter=0,
) )
assert await vocs_client.audio_location.read_value() == VocsAudioLocation( assert await vocs_client.audio_location.read_value() == AudioLocation.NOT_ALLOWED
audio_location=AudioLocation.NOT_ALLOWED
)
description = await vocs_client.audio_output_description.read_value() description = await vocs_client.audio_output_description.read_value()
assert description == '' assert description == ''
@@ -162,11 +159,9 @@ async def test_set_volume_offset(vocs_client: VolumeOffsetControlServiceProxy):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_set_audio_channel_location(vocs_client: VolumeOffsetControlServiceProxy): async def test_set_audio_channel_location(vocs_client: VolumeOffsetControlServiceProxy):
new_audio_location = VocsAudioLocation(audio_location=AudioLocation.FRONT_LEFT) new_audio_location = AudioLocation.FRONT_LEFT
await vocs_client.audio_location.write_value( await vocs_client.audio_location.write_value(new_audio_location)
struct.pack('<I', new_audio_location.audio_location)
)
location = await vocs_client.audio_location.read_value() location = await vocs_client.audio_location.read_value()
assert location == new_audio_location assert location == new_audio_location