Files
bumble-auracast/src/auracast/multicast.py
T
pstruebi a20b289663 refactor: rename QoS configuration from "Default" to "Robust" and add QoS preset selection
- Rename `AuracastQosDefault` class to `AuracastQosRobust` to better reflect its characteristics (4 retransmissions)
- Replace RTN (retransmission number) dropdown with QoS preset selector offering "Fast" (2 RTN) and "Robust" (4 RTN) options
- Update frontend and backend to use QoS preset mapping instead of manual QoS config construction
- Add `_resolve_qos_preset_name()` helper function to convert QoS config
2025-12-18 17:07:46 +01:00

974 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import pprint
import asyncio
import contextlib
import logging
import wave
import itertools
import struct
from typing import cast, Any, AsyncGenerator, Coroutine, List
import itertools
import glob
import time
import threading
import numpy as np # for audio down-mix
import os
import lc3 # type: ignore # pylint: disable=E0401
from bumble.colors import color
from bumble import company_ids
from bumble import core
from bumble import gatt
from bumble import hci
from bumble.profiles import bap
from bumble.profiles import le_audio
from bumble.profiles import pbp
from bumble.profiles import bass
import bumble.device
import bumble.transport
import bumble.utils
from bumble.device import Host, AdvertisingChannelMap
from bumble.audio import io as audio_io
from auracast import auracast_config
from auracast.utils.read_lc3_file import read_lc3_file
from auracast.utils.network_audio_receiver import NetworkAudioReceiverUncoded
from auracast.utils.webrtc_audio_input import WebRTCAudioInput
# Patch sounddevice.InputStream globally to use low-latency settings
import sounddevice as sd
from collections import deque
class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput):
    """SoundDeviceAudioInput variant that captures through a low-latency raw stream.

    The class is installed as ``audio_io.SoundDeviceAudioInput`` right after
    its definition so that it is used in place of the stock implementation.
    """

    def _open(self):
        """Open and start the capture stream.

        Logs the selected backend and device, resets the diagnostic counters
        and opens a ``sounddevice.RawInputStream`` that delivers 16-bit samples.

        Returns:
            The ``audio_io.PcmFormat`` of the data produced by ``_read``:
            little-endian INT16 at the configured sample rate, with the same
            channel count the stream was opened with.
        """
        dev_info = sd.query_devices(self._device)
        hostapis = sd.query_hostapis()
        api_index = dev_info.get('hostapi')
        if isinstance(api_index, int) and 0 <= api_index < len(hostapis):
            api_name = hostapis[api_index]['name']
        else:
            api_name = 'unknown'
        pa_ver = sd.get_portaudio_version()
        logging.info(
            "SoundDevice backend=%s device='%s' (id=%s) ch=%s default_low_input_latency=%.4f default_high_input_latency=%.4f portaudio=%s",
            api_name,
            dev_info.get('name'),
            self._device,
            dev_info.get('max_input_channels'),
            float(dev_info.get('default_low_input_latency') or 0.0),
            float(dev_info.get('default_high_input_latency') or 0.0),
            pa_ver[1] if isinstance(pa_ver, tuple) and len(pa_ver) >= 2 else pa_ver,
        )

        # Diagnostic state used by _read().
        self.counter = 0
        self.max_avail = 0
        self.logfile_name = "available_samples.txt"
        # Fixed block size in frames (2.5 ms at 48 kHz).
        self.blocksize = 120
        # Drop a stale diagnostics file left over from a previous run.
        if os.path.exists(self.logfile_name):
            os.remove(self.logfile_name)

        # Create the RawInputStream with injected low-latency parameters.
        self._stream = sd.RawInputStream(
            samplerate=self._pcm_format.sample_rate,
            device=self._device,
            channels=self._pcm_format.channels,
            dtype='int16',
            blocksize=self.blocksize,
            latency=0.004,
        )
        self._stream.start()

        # Report the channel count the stream was actually opened with so that
        # consumers interpret the interleaved samples correctly.
        return audio_io.PcmFormat(
            audio_io.PcmFormat.Endianness.LITTLE,
            audio_io.PcmFormat.SampleType.INT16,
            self._pcm_format.sample_rate,
            self._pcm_format.channels,
        )

    def _read(self, frame_size: int) -> bytes:
        """Read PCM samples from the stream.

        One frame more than requested is read on every call so that samples
        are consumed a little faster than nominal, which avoids latency
        accumulation in the capture buffer.

        Args:
            frame_size: Number of frames requested by the caller.

        Returns:
            The raw bytes of ``frame_size + 1`` captured frames.
        """
        frame_size = frame_size + 1
        pcm_buffer, overflowed = self._stream.read(frame_size)
        if overflowed:
            logging.warning("SoundDeviceAudioInput: overflowed")

        n_available = self._stream.read_available
        out = bytes(pcm_buffer)
        self.max_avail = max(self.max_avail, n_available)

        # Periodic diagnostics: backlog right now, worst backlog since the last
        # report, and the stream latency (a float, in seconds).
        if self.counter % 500 == 0:
            logging.info(
                "read available=%d, max=%d, latency:%.4f",
                n_available, self.max_avail, self._stream.latency
            )
            self.max_avail = 0
        self.counter += 1
        return out


audio_io.SoundDeviceAudioInput = ModSoundDeviceAudioInput
# modified from bumble
class ModWaveAudioInput(audio_io.ThreadedAudioInput):
    """Audio input that reads PCM samples from a .wav file.

    Attributes are documented inline; ``rewind`` controls whether playback
    restarts from the beginning when the end of the file is reached.
    """

    def __init__(self, filename: str) -> None:
        """Remember the file to read; the file itself is opened in ``_open``."""
        super().__init__()
        self._filename = filename
        self._wav: wave.Wave_read | None = None
        # Number of bytes delivered since the file was opened or last rewound.
        self._bytes_read = 0
        # When True the file is looped, otherwise reading stops at the end.
        self.rewind = True

    def _open(self) -> audio_io.PcmFormat:
        """Open the wave file and describe its PCM format.

        Returns:
            Little-endian INT16 format with the file's frame rate and
            channel count.

        Raises:
            ValueError: If the file does not use 16-bit samples.
        """
        wav = wave.open(self._filename, 'rb')
        if wav.getsampwidth() != 2:
            # Do not leak the file handle when the format is rejected.
            wav.close()
            raise ValueError('sample width not supported')
        self._wav = wav
        return audio_io.PcmFormat(
            audio_io.PcmFormat.Endianness.LITTLE,
            audio_io.PcmFormat.SampleType.INT16,
            wav.getframerate(),
            wav.getnchannels(),
        )

    def _read(self, frame_size: int) -> bytes:
        """Read up to ``frame_size`` frames from the file.

        Returns:
            The PCM bytes read. An empty ``bytes`` object is returned when the
            file is not open, or when the end is reached and ``rewind`` is
            disabled.
        """
        if not self._wav:
            return b''
        pcm_samples = self._wav.readframes(frame_size)
        if not pcm_samples and self._bytes_read:
            if not self.rewind:
                # End of file without looping: signal "no more data" with an
                # empty value that matches the annotated return type.
                return b''
            # Loop around.
            self._wav.rewind()
            self._bytes_read = 0
            pcm_samples = self._wav.readframes(frame_size)
        self._bytes_read += len(pcm_samples)
        return pcm_samples

    def _close(self) -> None:
        """Close the wave file if it is open."""
        if self._wav:
            self._wav.close()
            # Forget the closed handle so later reads return b'' instead of
            # operating on a closed file.
            self._wav = None


audio_io.WaveAudioInput = ModWaveAudioInput
def broadcast_code_bytes(broadcast_code: str) -> bytes:
    """
    Turn a broadcast code string into its 16-byte representation.

    A string made of `0x` followed by exactly 32 hex characters is taken as a
    raw 16-byte broadcast code written in big-endian byte order; the returned
    bytes are in reversed order.

    Any other string is UTF-8 encoded and zero-padded to 16 bytes, as specified
    in BLUETOOTH CORE SPECIFICATION Version 6.0 | Vol 3, Part C, section 3.2.6.3.

    Raises:
        ValueError: If the UTF-8 encoding is longer than 16 bytes, or the hex
            form contains invalid hex digits.
    """
    looks_like_raw_hex = len(broadcast_code) == 34 and broadcast_code[:2] == "0x"
    if looks_like_raw_hex:
        big_endian = bytes.fromhex(broadcast_code[2:])
        return bytes(reversed(big_endian))

    encoded = broadcast_code.encode("utf-8")
    if len(encoded) > 16:
        raise ValueError("broadcast code must be <= 16 bytes in utf-8 encoding")
    return encoded.ljust(16, b"\x00")
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
@contextlib.asynccontextmanager
async def create_device(config: auracast_config.AuracastGlobalConfig) -> AsyncGenerator[bumble.device.Device, Any]:
    """Open the configured HCI transport and yield a powered-on bumble device.

    The transport stays open for as long as the context is active and is
    released when the context exits.

    Args:
        config: Global configuration providing ``transport``, ``device_name``
            and ``auracast_device_address``.

    Yields:
        The ``bumble.device.Device`` after ``power_on()`` has completed.
    """
    transport = await bumble.transport.open_transport(config.transport)
    async with transport as (source, sink):
        controller = bumble.device.Device.from_config_with_hci(
            bumble.device.DeviceConfiguration(
                name=config.device_name,
                address=hci.Address(config.auracast_device_address),
                keystore='JsonKeyStore',
                #le_simultaneous_enabled=True #TODO: What is this doing ?
            ),
            source,
            sink,
        )
        await controller.power_on()
        yield controller
def run_async(async_command: Coroutine) -> None:
    """Run a coroutine to completion and report protocol errors on stdout.

    A ``core.ProtocolError`` raised by the coroutine is caught and printed.
    When it is an ATT error whose code is a ``bass.ApplicationError`` value,
    the name of that application error is shown; otherwise the string form of
    the error is shown. Any other exception propagates to the caller.
    """
    try:
        asyncio.run(async_command)
    except core.ProtocolError as error:
        is_bass_error = (
            error.error_namespace == 'att'
            and error.error_code in list(bass.ApplicationError)
        )
        if is_bass_error:
            message = bass.ApplicationError(error.error_code).name
        else:
            message = str(error)
        banner = color('!!! An error occurred while executing the command:', 'red')
        print(banner, message)
def _build_bis_list(num_bis: int) -> list:
"""Build BIS list for BasicAudioAnnouncement based on num_bis (1=mono, 2=stereo)."""
locations = [bap.AudioLocation.FRONT_LEFT, bap.AudioLocation.FRONT_RIGHT]
return [
bap.BasicAudioAnnouncement.BIS(
index=idx + 1,
codec_specific_configuration=bap.CodecSpecificConfiguration(
audio_channel_allocation=locations[idx]
),
)
for idx in range(num_bis)
]
def _build_broadcast_metadata(global_config, conf):
    """Assemble the LE Audio metadata entries announced for one BIG."""
    entries = [
        le_audio.Metadata.Entry(
            tag=le_audio.Metadata.Tag.LANGUAGE, data=conf.language.encode()
        ),
        le_audio.Metadata.Entry(
            tag=le_audio.Metadata.Tag.PROGRAM_INFO,
            data=conf.program_info.encode('latin-1')
        ),
        le_audio.Metadata.Entry(
            tag=le_audio.Metadata.Tag.BROADCAST_NAME, data=conf.name.encode()
        ),
    ]
    if global_config.immediate_rendering:
        # Broadcast Audio Immediate Rendering flag (type 0x09), zero-length value
        entries.append(
            le_audio.Metadata.Entry(
                tag=le_audio.Metadata.Tag.BROADCAST_AUDIO_IMMEDIATE_RENDERING_FLAG,
                data=b"",
            )
        )
    if global_config.assisted_listening_stream:
        # Assisted Listening Stream tag expects a 1-octet value. Use 0x01 to indicate enabled.
        entries.append(
            le_audio.Metadata.Entry(
                tag=le_audio.Metadata.Tag.ASSISTED_LISTENING_STREAM, data=b"\x01"
            )
        )
    return le_audio.Metadata(entries)


def _build_pbp_service_data(global_config, conf) -> bytes:
    """Build the Public Broadcast Profile service data (UUID 0x1856) for one BIG."""
    pbp_features = 0x00
    # Bit 0: Encryption. Use the same truthiness test as the BIG parameters so
    # an empty code never advertises encryption for an unencrypted BIG.
    if conf.code:
        pbp_features |= 0x01
    # Bit 1 vs Bit 2: Quality based on sample rate
    if global_config.auracast_sampling_rate_hz in (16000, 24000):
        pbp_features |= 0x02  # Standard Quality
    elif global_config.auracast_sampling_rate_hz == 48000:
        pbp_features |= 0x04  # High Quality
    # Program_Info metadata in LTV format:
    # Length = 1 (type) + len(value), Type = 0x03 (Program_Info), Value
    program_info_bytes = conf.program_info.encode('latin-1')
    pbp_metadata_ltv = bytes([len(program_info_bytes) + 1, 0x03]) + program_info_bytes
    return (
        struct.pack('<H', 0x1856)
        + bytes([pbp_features, len(pbp_metadata_ltv)])
        + pbp_metadata_ltv
    )


async def init_broadcast(
    device,
    global_config : auracast_config.AuracastGlobalConfig,
    big_config: List[auracast_config.AuracastBigConfig]
) -> dict:
    """Set up advertising, periodic advertising, the BIG and ISO queues per BIG.

    For every entry of ``big_config`` an extended advertising set carrying the
    broadcast audio announcement, the broadcast name, PBP service data and
    optional manufacturer data is created, periodic advertising with the basic
    audio announcement is started, a BIG is created and one ISO packet stream
    per BIS is prepared.

    Args:
        device: The powered-on bumble device to configure.
        global_config: Settings shared by all BIGs (codec, QoS, debug, ...).
        big_config: One configuration per BIG to create.

    Returns:
        A dict keyed ``'big0'``, ``'big1'``, ... Each value is a dict with the
        keys ``basic_audio_announcement``, ``broadcast_audio_announcement``,
        ``advertising_set``, ``big``, ``iso_queues``, ``num_bis`` and
        ``iso_queue`` (the first entry of ``iso_queues``).
    """
    bap_sampling_freq = getattr(bap.SamplingFrequency, f"FREQ_{global_config.auracast_sampling_rate_hz}")
    bigs = {}
    for i, conf in enumerate(big_config):
        metadata = _build_broadcast_metadata(global_config, conf)
        try:
            logging.info(metadata.pretty_print("\n"))
        except UnicodeDecodeError:
            logging.info("Metadata: (contains non-UTF-8 bytes)")

        entry = bigs[f'big{i}'] = {}

        # Config advertising set
        entry['basic_audio_announcement'] = bap.BasicAudioAnnouncement(
            presentation_delay=global_config.presentation_delay_us,
            subgroups=[
                bap.BasicAudioAnnouncement.Subgroup(
                    codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3),
                    codec_specific_configuration=bap.CodecSpecificConfiguration(
                        sampling_frequency=bap_sampling_freq,
                        frame_duration=bap.FrameDuration.DURATION_7500_US if global_config.frame_duration_us == 7500 else bap.FrameDuration.DURATION_10000_US,
                        octets_per_codec_frame=global_config.octets_per_frame,
                    ),
                    metadata=metadata,
                    bis=_build_bis_list(conf.num_bis),
                )
            ],
        )

        logger.info('Setup Advertising')
        advertising_manufacturer_data = (
            b''
            if global_config.manufacturer_data == (None, None)
            else bytes(
                core.AdvertisingData(
                    [
                        (
                            core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA,
                            struct.pack('<H', global_config.manufacturer_data[0])
                            + global_config.manufacturer_data[1],
                        )
                    ]
                )
            )
        )
        entry['broadcast_audio_announcement'] = bap.BroadcastAudioAnnouncement(conf.id)

        # Build advertising data types list
        advertising_data_types = [
            (core.AdvertisingData.BROADCAST_NAME, conf.name.encode()),
            # [PBP] Public Broadcast Profile Service Data (UUID 0x1856)
            # Required for PTS Qualification (PBP/PBS/STR)
            (
                core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
                _build_pbp_service_data(global_config, conf),
            ),
        ]

        advertising_set = await device.create_advertising_set(
            random_address=hci.Address(conf.random_address),
            advertising_parameters=bumble.device.AdvertisingParameters(
                advertising_event_properties=bumble.device.AdvertisingEventProperties(
                    is_connectable=False
                ),
                primary_advertising_interval_min=100,
                primary_advertising_interval_max=200,
                advertising_sid=i,
                # The 2M PHY raises an error here: primary advertising
                # channels only support 1 Mbit.
                primary_advertising_phy=hci.Phy.LE_1M,
                # Secondary advertising is sent on non-advertising channels
                # (extended advertising).
                secondary_advertising_phy=hci.Phy.LE_1M,
                #advertising_tx_power= # tx power in dbm (max 20)
                #secondary_advertising_max_skip=10,
            ),
            advertising_data=(
                entry['broadcast_audio_announcement'].get_advertising_data()
                + bytes(core.AdvertisingData(advertising_data_types))
                + advertising_manufacturer_data
            ),
            periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters(
                periodic_advertising_interval_min=80,
                periodic_advertising_interval_max=160,
            ),
            periodic_advertising_data=entry['basic_audio_announcement'].get_advertising_data(),
            auto_restart=True,
            auto_start=True,
        )
        entry['advertising_set'] = advertising_set

        logging.info('Start Periodic Advertising')
        await advertising_set.start_periodic()

        logging.info('Setup BIG')
        # Framing is needed if the ISO interval is not the frame interval of
        # the codec.
        if global_config.qos_config.iso_int_multiple_10ms == 1:
            frame_enable = 0
        else:
            frame_enable = 1

        big = await device.create_big(
            entry['advertising_set'],
            parameters=bumble.device.BigParameters(
                num_bis=conf.num_bis,
                sdu_interval=global_config.qos_config.iso_int_multiple_10ms*10000,  # Is the same as iso interval
                max_sdu=global_config.octets_per_frame,
                max_transport_latency=global_config.qos_config.max_transport_latency_ms,
                rtn=global_config.qos_config.number_of_retransmissions,
                broadcast_code=(
                    broadcast_code_bytes(conf.code) if conf.code else None
                ),
                framing=frame_enable
            ),
        )
        entry['big'] = big

        for bis_link in big.bis_links:
            await bis_link.setup_data_path(
                direction=bis_link.Direction.HOST_TO_CONTROLLER
            )

        # Create ISO queue(s) - one per BIS
        iso_queues = [
            bumble.device.IsoPacketStream(link, conf.iso_que_len)
            for link in big.bis_links
        ]
        logging.info('Setup ISO Data Path')
        entry['iso_queues'] = iso_queues
        entry['num_bis'] = conf.num_bis
        # Keep backward compat: iso_queue points to first queue
        entry['iso_queue'] = iso_queues[0]

        if global_config.debug:
            logging.info(f'big{i} parameters are:')
            logging.info('%s', pprint.pformat(vars(big)))
        logging.info(f'Finished setup of big{i}.')

        await asyncio.sleep(i+1)  # Wait for advertising to set up

    # In debug mode, print the packet flow statistics of the first BIG's first
    # ISO queue. Skipped when no BIG was configured.
    if global_config.debug and bigs:
        first_iso_queue = bigs['big0']['iso_queue']

        def on_flow():
            data_packet_queue = first_iso_queue.data_packet_queue
            print(
                f'\rPACKETS: pending={data_packet_queue.pending}, '
                f'queued={data_packet_queue.queued}, '
                f'completed={data_packet_queue.completed}',
                end='',
            )

        first_iso_queue.data_packet_queue.on('flow', on_flow)

    return bigs
class Streamer():
    """
    Streamer class that supports multiple input formats. See bumble for streaming from wav or device
    Added functionality on top of bumble:
    - loop parameter
        - if True the audio_source will be looped forever
    - precode wav files
    - lc3 coded files
        - just use a .lc3 file as audio_source
    - lc3 coded from ram
        - use a bytestring b'' as audio_source
    """

    def __init__(
        self,
        bigs,
        global_config : auracast_config.AuracastGlobalConfig,
        big_config: List[auracast_config.AuracastBigConfig]
    ):
        """Store the BIG state dict and configuration; nothing is started yet.

        Args:
            bigs: Dict of per-BIG state dicts (as returned by ``init_broadcast``).
            global_config: Settings shared by all BIGs.
            big_config: Per-BIG configuration, in the same order as ``bigs``.
        """
        self.task = None
        self.is_streaming = False
        self.bigs = bigs
        self.global_config = global_config
        self.big_config = big_config

    @staticmethod
    def _new_perf_stats() -> dict:
        """Return a zeroed accumulator for the per-BIG timing statistics."""
        return {
            'n': 0,
            'samples_sum': 0.0, 'samples_max': 0.0,
            'enc_sum': 0.0, 'enc_max': 0.0,
            'write_sum': 0.0, 'write_max': 0.0,
            'loop_sum': 0.0, 'loop_max': 0.0,
        }

    @staticmethod
    def _byte_iterator(lc3_bytes, loop):
        """Return an iterator over single bytes, endless when ``loop`` is set."""
        byte_iter = iter(lc3_bytes)
        return itertools.cycle(byte_iter) if loop else byte_iter

    @staticmethod
    def _pcm_bit_depth(pcm_format):
        """Map a PCM sample type to the encoder's ``bit_depth`` argument.

        Returns 16 for INT16 and ``None`` for FLOAT32.

        Raises:
            ValueError: For any other sample type.
        """
        if pcm_format.sample_type == audio_io.PcmFormat.SampleType.INT16:
            return 16
        if pcm_format.sample_type == audio_io.PcmFormat.SampleType.FLOAT32:
            return None
        raise ValueError("Only INT16 and FLOAT32 sample types are supported")

    def _make_encoder(self, input_sample_rate_hz):
        """Create a mono LC3 encoder for the configured broadcast format."""
        return lc3.Encoder(
            frame_duration_us=self.global_config.frame_duration_us,
            sample_rate_hz=self.global_config.auracast_sampling_rate_hz,
            num_channels=1,
            input_sample_rate_hz=input_sample_rate_hz,
        )

    @staticmethod
    def _encode_pcm_frame(big, pcm_frame) -> list:
        """Encode one PCM frame into one LC3 frame per BIS.

        Args:
            big: Per-BIG state dict holding the encoder(s) and PCM layout.
            pcm_frame: Raw PCM bytes, interleaved when more than one input
                channel is present.

        Returns:
            A list with one encoded LC3 frame per BIS, in BIS order.
        """
        num_bis = big.get('num_bis', 1)
        channels_in = big.get('channels', 1)
        bit_depth = big['pcm_bit_depth']
        num_bytes = big['lc3_bytes_per_frame']

        if num_bis == 1 and channels_in <= 1:
            # Mono in, mono out: single encoder, single queue
            return [
                big['encoder'].encode(pcm_frame, num_bytes=num_bytes, bit_depth=bit_depth)
            ]

        # bit_depth 16 means INT16 samples, None means FLOAT32 samples.
        dtype = np.int16 if bit_depth == 16 else np.float32
        samples = np.frombuffer(pcm_frame, dtype=dtype)

        if num_bis == 1:
            # Multi-channel in, mono out: average the channels of each sample
            # frame. Trailing samples of an incomplete frame are dropped.
            usable = len(samples) - (len(samples) % channels_in)
            mono = samples[:usable].reshape(-1, channels_in).mean(axis=1).astype(dtype)
            return [
                big['encoder'].encode(mono.tobytes(), num_bytes=num_bytes, bit_depth=bit_depth)
            ]

        # Stereo: split interleaved PCM (L,R,L,R,... or L,R,C,... for >2 ch)
        # and encode each channel with its own encoder.
        return [
            encoder.encode(
                samples[ch_idx::channels_in].tobytes(),
                num_bytes=num_bytes,
                bit_depth=bit_depth,
            )
            for ch_idx, encoder in enumerate(big['encoders'])
        ]

    def start_streaming(self):
        """Start ``stream()`` as a background task unless one is already active."""
        task_active = self.task is not None and not self.task.done()
        if self.is_streaming or task_active:
            logging.warning('Streamer is already running')
            return
        self.task = asyncio.create_task(self.stream())

    async def stop_streaming(self):
        """Gracefully stop streaming and release audio devices."""
        if not self.is_streaming and self.task is None:
            return

        # Ask the streaming loop to finish
        self.is_streaming = False
        if self.task is not None:
            task, self.task = self.task, None
            task.cancel()
            # Wait (bounded) until the cancellation has reached stream().
            # asyncio.wait does not re-raise the task's exception.
            await asyncio.wait([task], timeout=1.0)

        # Close audio inputs (await to ensure ALSA devices are released)
        async_closers = []
        sync_closers = []
        for big in self.bigs.values():
            ai = big.get("audio_input")
            if not ai:
                continue
            # First close any frames generator backed by the input to stop reads
            frames_gen = big.get("frames_gen")
            if frames_gen and hasattr(frames_gen, "aclose"):
                try:
                    await frames_gen.aclose()
                except Exception:
                    # Best effort: keep shutting down, but leave a trace.
                    logging.debug("Closing frames generator failed", exc_info=True)
            big.pop("frames_gen", None)
            if hasattr(ai, "aclose") and callable(getattr(ai, "aclose")):
                async_closers.append(ai.aclose())
            elif hasattr(ai, "close") and callable(getattr(ai, "close")):
                sync_closers.append(ai.close)
            # Remove reference so a fresh one is created next time
            big.pop("audio_input", None)

        if async_closers:
            results = await asyncio.gather(*async_closers, return_exceptions=True)
            for result in results:
                if isinstance(result, Exception):
                    logging.debug("Closing audio input failed: %r", result)
        for fn in sync_closers:
            try:
                fn()
            except Exception:
                logging.debug("Closing audio input failed", exc_info=True)

        # Reset PortAudio to drop lingering PipeWire capture nodes
        try:
            import sounddevice as _sd
            if hasattr(_sd, "_terminate"):
                _sd._terminate()
            await asyncio.sleep(0.05)
            if hasattr(_sd, "_initialize"):
                _sd._initialize()
        except Exception:
            logging.debug("Resetting PortAudio failed", exc_info=True)

    async def stream(self):
        """Prepare every BIG's audio source, then stream until all sources end.

        The setup phase fills each per-BIG dict with either an iterator over
        precoded LC3 bytes or an audio input plus encoder(s). The streaming
        phase then sends one LC3 frame per BIG and iteration until
        ``is_streaming`` is cleared or every source is exhausted.
        """
        bigs = self.bigs
        big_config = self.big_config
        global_config = self.global_config

        for i, big in enumerate(bigs.values()):
            conf = big_config[i]
            audio_source = conf.audio_source
            input_format = conf.input_format

            # precoded lc3 from ram
            if isinstance(audio_source, bytes):
                big['precoded'] = True
                big['lc3_bytes_per_frame'] = global_config.octets_per_frame
                big['lc3_frames'] = self._byte_iterator(audio_source, conf.loop)

            # network_uncoded mode using NetworkAudioReceiver
            elif isinstance(audio_source, NetworkAudioReceiverUncoded):
                # Start the UDP receiver coroutine so packets are actually
                # received. The task is stored because the event loop only
                # keeps weak references to tasks.
                big['receiver_task'] = asyncio.create_task(audio_source.receive())
                encoder = self._make_encoder(audio_source.samplerate)
                lc3_frame_samples = encoder.get_frame_samples()
                big['pcm_bit_depth'] = 16
                big['lc3_frame_samples'] = lc3_frame_samples
                big['lc3_bytes_per_frame'] = global_config.octets_per_frame
                big['audio_input'] = audio_source
                big['encoder'] = encoder
                big['precoded'] = False
                # Prepare frames generator for graceful shutdown
                big['frames_gen'] = big['audio_input'].frames(lc3_frame_samples)

            elif audio_source == 'webrtc':
                big['audio_input'] = WebRTCAudioInput()
                encoder = self._make_encoder(48000)  # TODO: get samplerate from webrtc
                lc3_frame_samples = encoder.get_frame_samples()
                big['pcm_bit_depth'] = 16
                big['lc3_frame_samples'] = lc3_frame_samples
                big['lc3_bytes_per_frame'] = global_config.octets_per_frame
                big['encoder'] = encoder
                big['precoded'] = False
                # Prepare frames generator for graceful shutdown
                big['frames_gen'] = big['audio_input'].frames(lc3_frame_samples)

            # precoded lc3 file
            elif audio_source.endswith('.lc3'):
                big['precoded'] = True
                big['lc3_bytes_per_frame'] = global_config.octets_per_frame
                filename = audio_source.replace('file:', '')
                big['lc3_frames'] = self._byte_iterator(read_lc3_file(filename), conf.loop)

            # use wav files and code them entirely before streaming
            elif conf.precode_wav and audio_source.endswith('.wav'):
                logging.info('Precoding wav file: %s, this may take a while', audio_source)
                big['precoded'] = True
                big['lc3_bytes_per_frame'] = global_config.octets_per_frame

                audio_input = await audio_io.create_audio_input(audio_source, input_format)
                audio_input.rewind = False
                try:
                    pcm_format = await audio_input.open()

                    if pcm_format.channels != 1:
                        logging.error("Only 1 channels PCM configurations are supported")
                        return
                    try:
                        pcm_bit_depth = self._pcm_bit_depth(pcm_format)
                    except ValueError as error:
                        logging.error("%s", error)
                        return

                    encoder = self._make_encoder(pcm_format.sample_rate)
                    # number of the pcm samples per lc3 frame
                    lc3_frame_samples = encoder.get_frame_samples()

                    # Collect in a bytearray: appending is amortised O(1),
                    # unlike repeated bytes concatenation.
                    encoded = bytearray()
                    async for pcm_frame in audio_input.frames(lc3_frame_samples):
                        encoded += encoder.encode(
                            pcm_frame, num_bytes=global_config.octets_per_frame, bit_depth=pcm_bit_depth
                        )
                finally:
                    # The input is only needed for precoding; release it.
                    if hasattr(audio_input, "aclose"):
                        await audio_input.aclose()

                big['lc3_frames'] = self._byte_iterator(bytes(encoded), conf.loop)

            # anything else, e.g. realtime stream from device (bumble)
            else:
                audio_input = await audio_io.create_audio_input(audio_source, input_format)
                # Store early so stop_streaming can close even if open() fails
                big['audio_input'] = audio_input
                # SoundDeviceAudioInput (used for `mic:<device>` captures) has no `.rewind`.
                if hasattr(audio_input, "rewind"):
                    audio_input.rewind = conf.loop

                pcm_format = await audio_input.open()
                num_bis = big.get('num_bis', 1)

                if num_bis == 2 and pcm_format.channels < 2:
                    logging.error("Stereo (num_bis=2) requires at least 2 input channels, got %d", pcm_format.channels)
                    return
                if pcm_format.channels != num_bis:
                    if num_bis == 1:
                        logging.info("Input device provides %d channels will down-mix to mono for LC3", pcm_format.channels)
                    else:
                        logging.info("Input device provides %d channels using first %d for stereo", pcm_format.channels, num_bis)

                try:
                    pcm_bit_depth = self._pcm_bit_depth(pcm_format)
                except ValueError as error:
                    logging.error("%s", error)
                    return

                # Create one encoder per BIS (mono: 1 encoder, stereo: 2 encoders)
                encoders = [
                    self._make_encoder(pcm_format.sample_rate)
                    for _ in range(num_bis)
                ]
                # number of the pcm samples per lc3 frame
                lc3_frame_samples = encoders[0].get_frame_samples()

                big['pcm_bit_depth'] = pcm_bit_depth
                big['channels'] = pcm_format.channels
                big['lc3_frame_samples'] = lc3_frame_samples
                big['lc3_bytes_per_frame'] = global_config.octets_per_frame
                big['encoders'] = encoders
                # Keep backward compat
                big['encoder'] = encoders[0]
                big['precoded'] = False

        logging.info("Streaming audio...")
        self.is_streaming = True
        try:
            # One streamer fits all
            while self.is_streaming:
                stream_finished = [False] * len(bigs)
                for i, big in enumerate(bigs.values()):
                    if big['precoded']:  # everything was already lc3 coded beforehand
                        lc3_frame = bytes(
                            itertools.islice(big['lc3_frames'], big['lc3_bytes_per_frame'])
                        )
                        if lc3_frame == b'':  # Not all streams may stop at the same time
                            stream_finished[i] = True
                            continue
                        # Precoded material is a single mono stream; send it on
                        # the first BIS queue. The write also paces the loop.
                        await big['iso_queue'].write(lc3_frame)
                        continue

                    # code lc3 on the fly with perf counters
                    # Ensure frames generator exists (so we can aclose() on stop)
                    frames_gen = big.get('frames_gen')
                    if frames_gen is None:
                        frames_gen = big['audio_input'].frames(big['lc3_frame_samples'])
                        big['frames_gen'] = frames_gen

                    # Initialize perf tracking bucket per BIG
                    perf = big.setdefault('_perf', self._new_perf_stats())

                    # Total loop duration timer (sample + encode + write)
                    t_loop0 = time.perf_counter()

                    # Measure time to get a sample from the buffer
                    t0 = time.perf_counter()
                    pcm_frame = await anext(frames_gen, None)
                    dt_sample = time.perf_counter() - t0

                    if pcm_frame is None:  # Not all streams may stop at the same time
                        stream_finished[i] = True
                        continue

                    # Measure LC3 encoding time
                    t1 = time.perf_counter()
                    lc3_frames_out = self._encode_pcm_frame(big, pcm_frame)
                    dt_enc = time.perf_counter() - t1

                    # Measure write blocking time
                    t2 = time.perf_counter()
                    for q_idx, lc3_frame in enumerate(lc3_frames_out):
                        await big['iso_queues'][q_idx].write(lc3_frame)
                    dt_write = time.perf_counter() - t2

                    # Total loop duration
                    dt_loop = time.perf_counter() - t_loop0

                    # Update stats
                    perf['n'] += 1
                    perf['samples_sum'] += dt_sample
                    perf['enc_sum'] += dt_enc
                    perf['write_sum'] += dt_write
                    perf['loop_sum'] += dt_loop
                    perf['samples_max'] = max(perf['samples_max'], dt_sample)
                    perf['enc_max'] = max(perf['enc_max'], dt_enc)
                    perf['write_max'] = max(perf['write_max'], dt_write)
                    perf['loop_max'] = max(perf['loop_max'], dt_loop)

                    # Log every 500 frames for this BIG and reset accumulators
                    if perf['n'] >= 500:
                        n = perf['n']
                        logging.info(
                            "Perf(i=%d, last %d): sample mean=%.6fms max=%.6fms | encode mean=%.6fms max=%.6fms | write mean=%.6fms max=%.6fms | loop mean=%.6fms max=%.6fms",
                            i,
                            n,
                            (perf['samples_sum'] / n) * 1e3, perf['samples_max'] * 1e3,
                            (perf['enc_sum'] / n) * 1e3, perf['enc_max'] * 1e3,
                            (perf['write_sum'] / n) * 1e3, perf['write_max'] * 1e3,
                            (perf['loop_sum'] / n) * 1e3, perf['loop_max'] * 1e3,
                        )
                        perf.update(self._new_perf_stats())

                if all(stream_finished):  # Take into account that multiple files have different lengths
                    logging.info('All streams finished, stopping streamer')
                    self.is_streaming = False
                    break
        finally:
            # Never report "streaming" after the loop has ended, including when
            # it ends through an exception or cancellation.
            self.is_streaming = False
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
async def broadcast(global_conf: auracast_config.AuracastGlobalConfig, big_conf: List[auracast_config.AuracastBigConfig]):
    """Start a broadcast and run it until the streamer finishes.

    When ``global_conf.transport`` is ``'auto'`` the first serial device under
    ``/dev/serial/by-id`` whose name contains
    ``usb-ZEPHYR_Zephyr_HCI_UART_sample`` is selected and written back to
    ``global_conf.transport``.

    Args:
        global_conf: Settings shared by all BIGs; ``transport`` may be updated.
        big_conf: One configuration per BIG to broadcast.

    Raises:
        AssertionError: If ``transport`` is ``'auto'`` and no matching serial
            device is found.
        Exception: Any exception raised by the streaming task is re-raised.
    """
    if global_conf.transport == 'auto':
        serial_paths = glob.glob('/dev/serial/by-id/*')
        logging.info('Found serial devices: %s', serial_paths)
        for serial_path in serial_paths:
            if 'usb-ZEPHYR_Zephyr_HCI_UART_sample' in serial_path:
                logging.info('Using: %s', serial_path)
                global_conf.transport = f'serial:{serial_path},115200,rtscts'
                break
        else:
            # No candidate matched, the transport is still 'auto'.
            raise AssertionError('No suitable transport found.')

    async with create_device(global_conf) as device:
        if not device.supports_le_periodic_advertising:
            logger.error(color('Periodic advertising not supported', 'red'))
            return

        # The bigs dictionary contains the per-BIG state created during setup.
        bigs = await init_broadcast(
            device,
            global_conf,
            big_conf
        )
        streamer = Streamer(bigs, global_conf, big_conf)
        streamer.start_streaming()
        # Await the task itself so a failure inside the streaming loop is
        # raised here instead of being silently dropped.
        await streamer.task
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    # Example entry point: broadcast one BIG fed from an ALSA capture device.
    logging.basicConfig( #export LOG_LEVEL=DEBUG
        level=os.environ.get('LOG_LEVEL', logging.INFO),
        format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
    )
    # Resolve to an absolute path first: dirname of a relative __file__ can be
    # an empty string, which os.chdir rejects.
    os.chdir(os.path.dirname(os.path.abspath(__file__)))

    # Name of the audio backend, used in the messages below.
    AUDIO_BACKEND = 'ALSA'
    search_str = 'ch1'

    # Use ALSA devices
    from auracast.utils.sounddevice_utils import get_alsa_usb_inputs
    devices = get_alsa_usb_inputs()
    logging.info(f"Searching ALSA devices for first device with string {search_str}...")

    audio_dev = None
    for idx, dev in devices:
        logging.info(f"  ALSA device [{idx}]: {dev['name']} ({dev['max_input_channels']} ch)")
        if search_str in dev['name'].lower():
            audio_dev = idx
            logging.info(f"✓ Selected ALSA device {idx}: {dev['name']}")
            break

    if audio_dev is None:
        logging.error(f"No audio device matching '{search_str}' found in {AUDIO_BACKEND} devices!")
        raise RuntimeError(f"Audio device not found for {AUDIO_BACKEND} backend")

    config = auracast_config.AuracastConfigGroup(
        bigs = [
            auracast_config.AuracastBigConfigDeu(),
            #auracast_config.AuracastBigConfigEng(),
            #auracast_config.AuracastBigConfigFra(),
            #auracast_config.AuracastBigConfigEs(),
            #auracast_config.AuracastBigConfigIt(),
        ]
    )

    # TODO: How can we use other iso interval than 10ms ?(medium or low rel) ? - nrf53audio receiver reports I2S tx underrun
    config.qos_config=auracast_config.AuracastQosRobust()

    #config.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,1000000,rtscts' # transport for nrf52 dongle
    #config.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001050076061-if02,1000000,rtscts' # transport for nrf53dk
    #config.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk
    #config.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_95A087EADB030B24-if00,115200,rtscts' #nrf52dongle hci_uart usb cdc
    #config.transport='usb:2fe3:000b' #nrf52dongle hci_usb # TODO: iso packet over usb not supported
    #config.transport= 'auto'
    config.transport='serial:/dev/ttyAMA3,1000000,rtscts' # transport for raspberry pi

    for big in config.bigs:
        #big.code = 'abcd'
        #big.code = '78 e5 dc f1 34 ab 42 bf c1 92 ef dd 3a fd 67 ae'
        big.precode_wav = False
        #big.audio_source = big.audio_source.replace('.wav', '_10_16_32.lc3') #lc3 precoded files
        #big.audio_source = read_lc3_file(big.audio_source) # load files in advance

        # --- Configure device (ALSA backend) ---
        # audio_dev is always set here; a missing device raised above.
        big.audio_source = f'device:{audio_dev}'
        big.input_format = 'int16le,48000,1' # int16, 48kHz, mono
        logging.info(f"Configured BIG '{big.name}' with (device:{audio_dev}, 48kHz mono)")

        big.name='Broadcast0'
        big.iso_que_len=1

        # --- Network_uncoded mode using NetworkAudioReceiver ---
        #big.audio_source = NetworkAudioReceiverUncoded(port=50007, samplerate=16000, channels=1, chunk_size=1024)

    # 16kHz works reliably with 3 streams
    # 24kHz is only working with 2 streams - probably airtime constraint
    # TODO: with more than three broadcasters (16kHz) no advertising (no primary channels is present anymore)
    # TODO: find the bottleneck - probably airtime
    config.auracast_sampling_rate_hz = 24000
    config.octets_per_frame = 60 # 60 octets per frame is 48 kbps with 10 ms frames
    #config.immediate_rendering = True
    #config.debug = True

    run_async(
        broadcast(
            config,
            config.bigs
        )
    )
# TODO: possible inputs:
# wav file locally
# precoded lc3 file locally
# realtime audio locally
# realtime audio network lc3 coded
# (realtime audio network uncoded)