improvements

This commit is contained in:
Gilles Boccon-Gibod
2025-02-03 17:58:09 -05:00
parent 55d3fd90f5
commit 70141c0439
22 changed files with 495 additions and 649 deletions

View File

@@ -17,30 +17,25 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import abc
import asyncio
import asyncio.subprocess
from concurrent.futures import ThreadPoolExecutor
import collections
import contextlib
import dataclasses
import enum
import functools
import logging
import os
import pathlib
import wave
import struct
import sys
import time
from typing import (
cast,
Any,
AsyncGenerator,
BinaryIO,
Coroutine,
Deque,
Optional,
Tuple,
TYPE_CHECKING,
)
import click
@@ -49,8 +44,11 @@ import pyee
try:
import lc3 # type: ignore # pylint: disable=E0401
except ImportError as e:
raise ImportError("Try `python -m pip install \".[auracast]\"`.") from e
raise ImportError(
"Try `python -m pip install \"git+https://github.com/google/liblc3.git\"`."
) from e
from bumble.audio import io as audio_io
from bumble.colors import color
from bumble import company_ids
from bumble import core
@@ -64,9 +62,6 @@ import bumble.device
import bumble.transport
import bumble.utils
if TYPE_CHECKING:
import sounddevice
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
@@ -82,46 +77,16 @@ AURACAST_DEFAULT_SYNC_TIMEOUT = 5.0
AURACAST_DEFAULT_ATT_MTU = 256
AURACAST_DEFAULT_FRAME_DURATION = 10000
AURACAST_DEFAULT_SAMPLE_RATE = 48000
AURACAST_DEFAULT_TRANSMIT_BITRATE = 80000
# -----------------------------------------------------------------------------
# Audio I/O Support
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class PcmFormat:
class Endianness(enum.Enum):
LITTLE = 0
BIG = 1
class SampleType(enum.Enum):
FLOAT32 = 0
INT16 = 1
endianness: Endianness
sample_type: SampleType
sample_rate: int
channels: int
@classmethod
def from_str(cls, format_str: str) -> PcmFormat:
endianness = cls.Endianness.LITTLE # Others not yet supported.
sample_type_str, sample_rate_str, channels_str = format_str.split(',')
if sample_type_str == 'int16le':
sample_type = cls.SampleType.INT16
elif sample_rate_str == 'float32le':
sample_type = cls.SampleType.FLOAT32
else:
raise ValueError(f'sample type {sample_type_str} not supported')
sample_rate = int(sample_rate_str)
channels = int(channels_str)
return cls(endianness, sample_type, sample_rate, channels)
def check_audio_output(output: str) -> bool:
if output == 'device' or output.startswith('device:'):
try:
import sounddevice
import sounddevice # type: ignore[import-untyped]
except ImportError as exc:
raise ValueError(
'audio output not available (sounddevice python module not installed)'
@@ -164,16 +129,16 @@ def check_audio_output(output: str) -> bool:
return True
async def create_audio_output(output: str) -> AudioOutput:
async def create_audio_output(output: str) -> audio_io.AudioOutput:
if output == 'stdout':
return StreamAudioOutput(sys.stdout.buffer)
return audio_io.StreamAudioOutput(sys.stdout.buffer)
if output == 'device' or output.startswith('device:'):
device_name = '' if output == 'device' else output[7:]
return SoundDeviceAudioOutput(device_name)
return audio_io.SoundDeviceAudioOutput(device_name)
if output == 'ffplay':
return SubprocessAudioOutput(
return audio_io.SubprocessAudioOutput(
command=(
'ffplay -probesize 32 -fflags nobuffer -analyzeduration 0 '
'-ar {sample_rate} '
@@ -183,152 +148,11 @@ async def create_audio_output(output: str) -> AudioOutput:
)
if output.startswith('file:'):
return FileAudioOutput(output[5:])
return audio_io.FileAudioOutput(output[5:])
raise ValueError('unsupported audio output')
class AudioOutput(abc.ABC):
"""Audio output to which PCM samples can be written."""
async def open(self, pcm_format: PcmFormat) -> None:
"""Start the output."""
@abc.abstractmethod
def write(self, pcm_samples: bytes) -> None:
"""Write PCM samples. Must not block."""
async def aclose(self) -> None:
"""Close the output."""
class ThreadedAudioOutput(AudioOutput):
"""Base class for AudioOutput classes that may need to call blocking functions.
The actual writing is performed in a thread, so as to ensure that calling write()
does not block the caller.
"""
def __init__(self) -> None:
self._thread_pool = ThreadPoolExecutor(1)
self._pcm_samples: asyncio.Queue[bytes] = asyncio.Queue()
self._write_task = asyncio.create_task(self._write_loop())
async def _write_loop(self) -> None:
while True:
pcm_samples = await self._pcm_samples.get()
await asyncio.get_running_loop().run_in_executor(
self._thread_pool, self._write, pcm_samples
)
@abc.abstractmethod
def _write(self, pcm_samples: bytes) -> None:
"""This method does the actual writing and can block."""
def write(self, pcm_samples: bytes) -> None:
self._pcm_samples.put_nowait(pcm_samples)
def _close(self) -> None:
"""This method does the actual closing and can block."""
async def aclose(self) -> None:
await asyncio.get_running_loop().run_in_executor(self._thread_pool, self._close)
self._write_task.cancel()
self._thread_pool.shutdown()
class SoundDeviceAudioOutput(ThreadedAudioOutput):
def __init__(self, device_name: str) -> None:
super().__init__()
self._device = int(device_name) if device_name else None
self._stream: sounddevice.RawOutputStream | None = None
async def open(self, pcm_format: PcmFormat) -> None:
import sounddevice
self._stream = sounddevice.RawOutputStream(
samplerate=pcm_format.sample_rate,
device=self._device,
channels=pcm_format.channels,
dtype='float32',
)
self._stream.start()
def _write(self, pcm_samples: bytes) -> None:
if self._stream is None:
return
try:
self._stream.write(pcm_samples)
except Exception as error:
print(f'Sound device error: {error}')
raise
def _close(self):
self._stream.stop()
self._stream = None
class StreamAudioOutput(ThreadedAudioOutput):
"""AudioOutput where PCM samples are written to a stream that may block."""
def __init__(self, stream: BinaryIO) -> None:
super().__init__()
self._stream = stream
def _write(self, pcm_samples: bytes) -> None:
self._stream.write(pcm_samples)
self._stream.flush()
class FileAudioOutput(StreamAudioOutput):
"""AudioOutput where PCM samples are written to a file."""
def __init__(self, filename: str) -> None:
self._file = open(filename, "wb")
super().__init__(self._file)
async def shutdown(self):
self._file.close()
return await super().shutdown()
class SubprocessAudioOutput(AudioOutput):
"""AudioOutput where audio samples are written to a subprocess via stdin."""
def __init__(self, command: str) -> None:
self._command = command
self._subprocess: asyncio.subprocess.Process | None
async def open(self, pcm_format: PcmFormat) -> None:
if pcm_format.channels == 1:
channel_layout = 'mono'
elif pcm_format.channels == 2:
channel_layout = 'stereo'
else:
raise ValueError(f'{pcm_format.channels} channels not supported')
command = self._command.format(
sample_rate=pcm_format.sample_rate, channel_layout=channel_layout
)
self._subprocess = await asyncio.create_subprocess_shell(
command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
def write(self, pcm_samples: bytes) -> None:
if self._subprocess is None or self._subprocess.stdin is None:
return
self._subprocess.stdin.write(pcm_samples)
async def aclose(self):
if self._subprocess:
self._subprocess.terminate()
def check_audio_input(input: str) -> bool:
if input == 'device' or input.startswith('device:'):
try:
@@ -378,23 +202,23 @@ def check_audio_input(input: str) -> bool:
return True
async def create_audio_input(input: str, input_format: str) -> AudioInput:
pcm_format: PcmFormat | None
async def create_audio_input(input: str, input_format: str) -> audio_io.AudioInput:
pcm_format: audio_io.PcmFormat | None
if input_format == 'auto':
pcm_format = None
else:
pcm_format = PcmFormat.from_str(input_format)
pcm_format = audio_io.PcmFormat.from_str(input_format)
if input == 'stdin':
if not pcm_format:
raise ValueError('input format details required for stdin')
return StreamAudioInput(sys.stdin.buffer, pcm_format)
return audio_io.StreamAudioInput(sys.stdin.buffer, pcm_format)
if input == 'device' or input.startswith('device:'):
if not pcm_format:
raise ValueError('input format details required for device')
device_name = '' if input == 'device' else input[7:]
return SoundDeviceAudioInput(device_name, pcm_format)
return audio_io.SoundDeviceAudioInput(device_name, pcm_format)
# If there's no file: prefix, check if we can assume it is a file.
if pathlib.Path(input).is_file():
@@ -405,177 +229,35 @@ async def create_audio_input(input: str, input_format: str) -> AudioInput:
if filename.endswith('.wav'):
if input_format != 'auto':
raise ValueError(".wav file only supported with 'auto' format")
return WaveAudioInput(filename)
return audio_io.WaveAudioInput(filename)
if pcm_format is None:
raise ValueError('input format details required for raw PCM files')
return FileAudioInput(filename, pcm_format)
return audio_io.FileAudioInput(filename, pcm_format)
raise ValueError('input not supported')
class AudioInput(abc.ABC):
"""Audio input that produces PCM samples."""
@abc.abstractmethod
async def open(self) -> PcmFormat:
"""Open the input."""
@abc.abstractmethod
def frames(self, frame_size: int) -> AsyncGenerator[bytes]:
"""Generate one frame of PCM samples. Must not block."""
async def aclose(self) -> None:
"""Close the input."""
class ThreadedAudioInput(AudioInput):
"""Base class for AudioInput implementation where reading samples may block."""
def __init__(self) -> None:
self._thread_pool = ThreadPoolExecutor(1)
self._pcm_samples: asyncio.Queue[bytes] = asyncio.Queue()
@abc.abstractmethod
def _read(self, frame_size: int) -> bytes:
pass
@abc.abstractmethod
def _open(self) -> PcmFormat:
pass
def _close(self) -> None:
pass
async def open(self) -> PcmFormat:
return await asyncio.get_running_loop().run_in_executor(
self._thread_pool, self._open
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def codec_config_string(
codec_config: bap.CodecSpecificConfiguration, indent: str
) -> str:
lines = []
if codec_config.sampling_frequency is not None:
lines.append(f'Sampling Frequency: {codec_config.sampling_frequency.hz} hz')
if codec_config.frame_duration is not None:
lines.append(f'Frame Duration: {codec_config.frame_duration.us} µs')
if codec_config.octets_per_codec_frame is not None:
lines.append(f'Frame Size: {codec_config.octets_per_codec_frame} bytes')
if codec_config.codec_frames_per_sdu is not None:
lines.append(f'Frames Per SDU: {codec_config.codec_frames_per_sdu}')
if codec_config.audio_channel_allocation is not None:
lines.append(
f'Audio Location: {codec_config.audio_channel_allocation.name}'
)
async def frames(self, frame_size: int) -> AsyncGenerator[bytes]:
while pcm_sample := await asyncio.get_running_loop().run_in_executor(
self._thread_pool, self._read, frame_size
):
yield pcm_sample
async def aclose(self) -> None:
await asyncio.get_running_loop().run_in_executor(self._thread_pool, self._close)
self._thread_pool.shutdown()
class WaveAudioInput(ThreadedAudioInput):
"""Audio input that reads PCM samples from a .wav file."""
def __init__(self, filename: str) -> None:
super().__init__()
self._filename = filename
self._wav: wave.Wave_read | None = None
self._bytes_read = 0
def _open(self) -> PcmFormat:
self._wav = wave.open(self._filename, 'rb')
if self._wav.getsampwidth() != 2:
raise ValueError('sample width not supported')
return PcmFormat(
PcmFormat.Endianness.LITTLE,
PcmFormat.SampleType.INT16,
self._wav.getframerate(),
self._wav.getnchannels(),
)
def _read(self, frame_size: int) -> bytes:
if not self._wav:
return b''
pcm_samples = self._wav.readframes(frame_size)
if not pcm_samples and self._bytes_read:
# Loop around.
self._wav.rewind()
self._bytes_read = 0
pcm_samples = self._wav.readframes(frame_size)
self._bytes_read += len(pcm_samples)
return pcm_samples
def _close(self) -> None:
if self._wav:
self._wav.close()
class StreamAudioInput(ThreadedAudioInput):
"""AudioInput where samples are read from a raw PCM stream that may block."""
def __init__(self, stream: BinaryIO, pcm_format: PcmFormat) -> None:
super().__init__()
self._stream = stream
self._pcm_format = pcm_format
def _open(self) -> PcmFormat:
return self._pcm_format
def _read(self, frame_size: int) -> bytes:
return self._stream.read(frame_size * self._pcm_format.channels * 2)
class FileAudioInput(StreamAudioInput):
"""AudioInput where PCM samples are read from a raw PCM file."""
def __init__(self, filename: str, pcm_format: PcmFormat) -> None:
self._stream = open(filename, "rb")
super().__init__(self._stream, pcm_format)
def _read(self, frame_size: int) -> bytes:
return self._stream.read(frame_size * self._pcm_format.channels * 2)
def _close(self) -> None:
self._stream.close()
class SoundDeviceAudioInput(ThreadedAudioInput):
def __init__(self, device_name: str, pcm_format: PcmFormat) -> None:
super().__init__()
self._device = int(device_name) if device_name else None
self._pcm_format = pcm_format
self._stream: sounddevice.RawInputStream | None = None
def _open(self) -> PcmFormat:
import sounddevice
self._stream = sounddevice.RawInputStream(
samplerate=self._pcm_format.sample_rate,
device=self._device,
channels=self._pcm_format.channels,
dtype='int16',
)
self._stream.start()
return PcmFormat(
PcmFormat.Endianness.LITTLE,
PcmFormat.SampleType.INT16,
self._pcm_format.sample_rate,
2,
)
def _read(self, frame_size: int) -> bytes:
if not self._stream:
return b''
pcm_buffer, overflowed = self._stream.read(frame_size)
if overflowed:
logger.warning("input overflow")
# Convert the buffer to stereo if needed
if self._pcm_format.channels == 1:
stereo_buffer = bytearray()
for i in range(frame_size):
sample = pcm_buffer[i * 2 : i * 2 + 2]
stereo_buffer += sample + sample
return stereo_buffer
return bytes(pcm_buffer)
def _close(self):
self._stream.stop()
self._stream = None
return '\n'.join(indent + line for line in lines)
# -----------------------------------------------------------------------------
@@ -670,18 +352,17 @@ class BroadcastScanner(pyee.EventEmitter):
if self.public_broadcast_announcement:
print(
f' {color("Features", "cyan")}: '
f'{self.public_broadcast_announcement.features}'
)
print(
f' {color("Metadata", "cyan")}: '
f'{self.public_broadcast_announcement.metadata}'
f'{self.public_broadcast_announcement.features.name}'
)
print(f' {color("Metadata", "cyan")}:')
print(self.public_broadcast_announcement.metadata.pretty_print(' '))
if self.basic_audio_announcement:
print(color(' Audio:', 'cyan'))
print(
color(' Presentation Delay:', 'magenta'),
self.basic_audio_announcement.presentation_delay,
"µs",
)
for subgroup in self.basic_audio_announcement.subgroups:
print(color(' Subgroup:', 'magenta'))
@@ -698,17 +379,22 @@ class BroadcastScanner(pyee.EventEmitter):
color(' Vendor Specific Codec ID:', 'green'),
subgroup.codec_id.vendor_specific_codec_id,
)
print(color(' Codec Config:', 'yellow'))
print(
color(' Codec Config:', 'yellow'),
subgroup.codec_specific_configuration,
codec_config_string(
subgroup.codec_specific_configuration, ' '
),
)
print(color(' Metadata: ', 'yellow'), subgroup.metadata)
print(color(' Metadata: ', 'yellow'))
print(subgroup.metadata.pretty_print(' '))
for bis in subgroup.bis:
print(color(f' BIS [{bis.index}]:', 'yellow'))
print(color(' Codec Config:', 'green'))
print(
color(' Codec Config:', 'green'),
bis.codec_specific_configuration,
codec_config_string(
bis.codec_specific_configuration, ' '
),
)
if self.biginfo:
@@ -1008,7 +694,7 @@ async def run_assist(
except core.ProtocolError as error:
print(
color(
f'!!! Failed to subscribe to Broadcast Receive State characteristic:',
'!!! Failed to subscribe to Broadcast Receive State characteristic',
'red',
),
error,
@@ -1217,31 +903,52 @@ async def run_receive(
sample_rate_hz=sampling_frequency.hz,
num_channels=num_bis,
)
sdus = [b''] * num_bis
lc3_queues: list[Deque[bytes]] = [collections.deque() for i in range(num_bis)]
packet_stats = [0, 0]
async with contextlib.aclosing(
await create_audio_output(output)
) as audio_output:
await audio_output.open(
PcmFormat(
PcmFormat.Endianness.LITTLE,
PcmFormat.SampleType.FLOAT32,
audio_io.PcmFormat(
audio_io.PcmFormat.Endianness.LITTLE,
audio_io.PcmFormat.SampleType.FLOAT32,
sampling_frequency.hz,
num_bis,
)
)
def sink(queue: Deque[bytes], packet: hci.HCI_IsoDataPacket):
# TODO: re-assemble fragments and detect errors
queue.append(packet.iso_sdu_fragment)
while all(lc3_queues):
# This assumes SDUs contain one LC3 frame each, which may not
# be correct for all cases. TODO: revisit this assumption.
frame = b''.join([lc3_queue.popleft() for lc3_queue in lc3_queues])
if not frame:
print(color('!!! received empty frame', 'red'))
continue
packet_stats[0] += len(frame)
packet_stats[1] += 1
print(
f'\rRECEIVED: {packet_stats[0]} bytes in '
f'{packet_stats[1]} packets',
end='',
)
try:
pcm = decoder.decode(frame).tobytes()
except lc3.BaseError as error:
print(color(f'!!! LC3 decoding error: {error}'))
continue
audio_output.write(pcm)
for i, bis_link in enumerate(big_sync.bis_links):
print(f'Setup ISO for BIS {bis_link.handle}')
def sink(index: int, packet: hci.HCI_IsoDataPacket):
nonlocal sdus
sdus[index] = packet.iso_sdu_fragment
if all(sdus):
audio_output.write(decoder.decode(b''.join(sdus)).tobytes())
sdus = [b''] * num_bis
bis_link.sink = functools.partial(sink, i)
bis_link.sink = functools.partial(sink, lc3_queues[i])
await device.send_command(
hci.HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=bis_link.handle,
@@ -1259,11 +966,12 @@ async def run_receive(
await terminated.wait()
async def run_broadcast(
async def run_transmit(
transport: str,
broadcast_id: int,
broadcast_code: str | None,
broadcast_name: str,
bitrate: int,
manufacturer_data: tuple[int, bytes] | None,
input: str,
input_format: str,
@@ -1365,34 +1073,6 @@ async def run_broadcast(
print('Start Periodic Advertising')
await advertising_set.start_periodic()
print('Setup BIG')
big = await device.create_big(
advertising_set,
parameters=bumble.device.BigParameters(
num_bis=2,
sdu_interval=AURACAST_DEFAULT_FRAME_DURATION,
max_sdu=100,
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
),
),
)
print('Setup ISO Data Path')
data_packet_queue = big.bis_links[0].data_packet_queue
def on_flow():
print(
f'\rPACKETS: pending={data_packet_queue.pending}, '
f'queued={data_packet_queue.queued}, '
f'completed={data_packet_queue.completed}',
end='',
)
data_packet_queue.on('flow', on_flow)
audio_input = await create_audio_input(input, input_format)
pcm_format = await audio_input.open()
if pcm_format.channels != 2:
@@ -1404,25 +1084,62 @@ async def run_broadcast(
num_channels=pcm_format.channels,
input_sample_rate_hz=pcm_format.sample_rate,
)
frame_size = encoder.get_frame_samples()
lc3_frame_samples = encoder.get_frame_samples()
lc3_frame_size = encoder.get_frame_bytes(bitrate)
print('Setup BIG')
big = await device.create_big(
advertising_set,
parameters=bumble.device.BigParameters(
num_bis=pcm_format.channels,
sdu_interval=AURACAST_DEFAULT_FRAME_DURATION,
max_sdu=lc3_frame_size,
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
),
),
)
iso_queues = [
bumble.device.IsoPacketStream(big.bis_links[0], 2),
bumble.device.IsoPacketStream(big.bis_links[1], 2),
]
def on_flow():
data_packet_queue = iso_queues[0].data_packet_queue
print(
f'\rPACKETS: pending={data_packet_queue.pending}, '
f'queued={data_packet_queue.queued}, '
f'completed={data_packet_queue.completed}',
end='',
)
iso_queues[0].data_packet_queue.on('flow', on_flow)
async with contextlib.aclosing(audio_input):
frame_count = 0
start_time = time.time()
async for pcm_frame in audio_input.frames(frame_size):
now = time.time()
if (
target_time := (
start_time
+ frame_count * AURACAST_DEFAULT_FRAME_DURATION / 1_000_000
)
) >= now:
await asyncio.sleep(target_time - now)
# start_time = time.time()
async for pcm_frame in audio_input.frames(lc3_frame_samples):
# now = time.time()
# if (
# target_time := (
# start_time
# + frame_count * AURACAST_DEFAULT_FRAME_DURATION / 1_000_000
# )
# ) >= now:
# await asyncio.sleep(target_time - now)
lc3_frame = encoder.encode(pcm_frame, num_bytes=200, bit_depth=16)
lc3_frame = encoder.encode(
pcm_frame, num_bytes=2 * lc3_frame_size, bit_depth=16
)
mid = len(lc3_frame) // 2
big.bis_links[0].write(lc3_frame[:mid])
big.bis_links[1].write(lc3_frame[mid:])
# big.bis_links[0].write(lc3_frame[:mid])
# big.bis_links[1].write(lc3_frame[mid:])
await iso_queues[0].write(lc3_frame[:mid])
await iso_queues[1].write(lc3_frame[mid:])
frame_count += 1
@@ -1569,7 +1286,7 @@ def receive(
)
@auracast.command('broadcast')
@auracast.command('transmit')
@click.argument('transport')
@click.option(
'--input',
@@ -1581,7 +1298,7 @@ def receive(
"(specify 'device:?' to get a list of available sound input devices), "
"'stdin' -> receive audio from stdin as int16 PCM, "
"'file:<filename> -> read audio from a .wav or raw int16 PCM file. "
"(The file: prefix may be ommitted if the file path does not start with "
"(The file: prefix may be omitted if the file path does not start with "
"the substring 'device:' or 'file:' and is not 'stdin')"
),
)
@@ -1615,23 +1332,30 @@ def receive(
default='Bumble Auracast',
help='Broadcast name',
)
@click.option(
'--bitrate',
type=int,
default=AURACAST_DEFAULT_TRANSMIT_BITRATE,
help='Bitrate, per channel, in bps',
)
@click.option(
'--manufacturer-data',
metavar='VENDOR-ID:DATA-HEX',
help='Manufacturer data (specify as <vendor-id>:<data-hex>)',
)
@click.pass_context
def broadcast(
def transmit(
ctx,
transport,
broadcast_id,
broadcast_code,
manufacturer_data,
broadcast_name,
bitrate,
input,
input_format,
):
"""Start a broadcast as a source."""
"""Transmit a broadcast source."""
if manufacturer_data:
vendor_id_str, data_hex = manufacturer_data.split(':')
vendor_id = int(vendor_id_str)
@@ -1645,11 +1369,12 @@ def broadcast(
input_format = 'int16le,48000,1'
run_async(
run_broadcast(
run_transmit(
transport=transport,
broadcast_id=broadcast_id,
broadcast_code=broadcast_code,
broadcast_name=broadcast_name,
bitrate=bitrate,
manufacturer_data=manufacturer_data_tuple,
input=input,
input_format=input_format,

View File

@@ -29,7 +29,9 @@ from bumble.gatt import Service
from bumble.profiles.device_information_service import DeviceInformationServiceProxy
from bumble.profiles.battery_service import BatteryServiceProxy
from bumble.profiles.gap import GenericAccessServiceProxy
from bumble.profiles.pacs import PublishedAudioCapabilitiesServiceProxy
from bumble.profiles.tmap import TelephonyAndMediaAudioServiceProxy
from bumble.profiles.vcs import VolumeControlServiceProxy
from bumble.transport import open_transport_or_link
@@ -126,14 +128,52 @@ async def show_tmas(
print(color('### Telephony And Media Audio Service', 'yellow'))
if tmas.role:
print(
color(' Role:', 'green'),
await tmas.role.read_value(),
)
role = await tmas.role.read_value()
print(color(' Role:', 'green'), role)
print()
# -----------------------------------------------------------------------------
async def show_pacs(pacs: PublishedAudioCapabilitiesServiceProxy) -> None:
print(color('### Published Audio Capabilities Service', 'yellow'))
contexts = await pacs.available_audio_contexts.read_value()
print(color(' Available Audio Contexts:', 'green'), contexts)
contexts = await pacs.supported_audio_contexts.read_value()
print(color(' Supported Audio Contexts:', 'green'), contexts)
if pacs.sink_pac:
pac = await pacs.sink_pac.read_value()
print(color(' Sink PAC: ', 'green'), pac)
if pacs.sink_audio_locations:
audio_locations = await pacs.sink_audio_locations.read_value()
print(color(' Sink Audio Locations: ', 'green'), audio_locations)
if pacs.source_pac:
pac = await pacs.source_pac.read_value()
print(color(' Source PAC: ', 'green'), pac)
if pacs.source_audio_locations:
audio_locations = await pacs.source_audio_locations.read_value()
print(color(' Source Audio Locations: ', 'green'), audio_locations)
print()
# -----------------------------------------------------------------------------
async def show_vcs(vcs: VolumeControlServiceProxy) -> None:
print(color('### Volume Control Service', 'yellow'))
volume_state = await vcs.volume_state.read_value()
print(color(' Volume State:', 'green'), volume_state)
volume_flags = await vcs.volume_flags.read_value()
print(color(' Volume Flags:', 'green'), volume_flags)
# -----------------------------------------------------------------------------
async def show_device_info(peer, done: Optional[asyncio.Future]) -> None:
try:
@@ -161,6 +201,12 @@ async def show_device_info(peer, done: Optional[asyncio.Future]) -> None:
if tmas := peer.create_service_proxy(TelephonyAndMediaAudioServiceProxy):
await try_show(show_tmas, tmas)
if pacs := peer.create_service_proxy(PublishedAudioCapabilitiesServiceProxy):
await try_show(show_pacs, pacs)
if vcs := peer.create_service_proxy(VolumeControlServiceProxy):
await try_show(show_vcs, vcs)
if done is not None:
done.set_result(None)
except asyncio.CancelledError: