11 Commits

Author SHA1 Message Date
8d43a97900 testcommit 2026-01-05 16:25:20 +01:00
dd02e0ddc3 feature/app_update (#18)
implement slim update functionality
2026-01-05 16:10:25 +01:00
51885c534f Merge branch 'release' into main 2026-01-05 15:48:39 +01:00
e352ef3fd9 main (#16)
Co-authored-by: pstruebi <patrick.struebin@summitwave.eu>
Reviewed-on: https://gitea.pstruebi.xyz/auracaster/bumble-auracast/pulls/16
2026-01-05 15:43:20 +01:00
fa38818991 qos_preset (#15)
introduce presets fast and robust for qos settings

Reviewed-on: https://gitea.pstruebi.xyz/auracaster/bumble-auracast/pulls/15
2026-01-05 15:24:32 +01:00
d79c7254bb feat: load i2c-dev kernel module on startup (#14)
Ensure i2c-dev kernel module is loaded before initializing I2C communication to guarantee /dev/i2c-* device access. Add error handling and logging for module loading process.

Reviewed-on: https://gitea.pstruebi.xyz/auracaster/bumble-auracast/pulls/14
2025-12-18 16:27:40 +01:00
c134a29c48 add certificate download option 2025-12-18 15:56:13 +01:00
pstruebi
6965e31163 change bumble bind port 2025-12-17 16:35:09 +01:00
45f058be46 quali (#13)
make adjustments for bluetooth sig qualification

Co-authored-by: Paul Obernesser <paul.obernesser@inncubator.at>
Reviewed-on: https://gitea.pstruebi.xyz/auracaster/bumble-auracast/pulls/13
2025-12-11 14:44:35 +01:00
6c7b74a0b2 feature/analog_input (#12)
Co-authored-by: Paul Obernesser <paul.obernesser@inncubator.at>
Reviewed-on: https://gitea.pstruebi.xyz/auracaster/bumble-auracast/pulls/12
2025-12-03 12:28:30 +01:00
98dd00e653 Implement adaptive frame dropping (#10)
- Implement adaptive frame dropping to prevent latency from accumulating
- small packets are dropped and a crossfade is used to hide the dropping.
- still audible in some situations

Co-authored-by: pstruebi <struebin.patrick.com>
Reviewed-on: https://gitea.pstruebi.xyz/auracaster/bumble-auracast/pulls/10
2025-11-04 17:16:33 +01:00
55 changed files with 3279 additions and 1074 deletions

6
.gitignore vendored
View File

@@ -44,3 +44,9 @@ src/auracast/server/certs/per_device/
src/auracast/.env
src/auracast/server/certs/ca/ca_cert.srl
src/auracast/server/credentials.json
pcm1862-i2s.dtbo
ch1.wav
ch2.wav
src/auracast/available_samples.txt
src/auracast/server/stream_settings2.json
src/scripts/temperature_log*

View File

@@ -218,6 +218,30 @@ sudo ldconfig # refresh linker cache
- echo i2c-dev | sudo tee -a /etc/modules
- read temp /src/scripts/temp
# configure the pcm1862 i2s interface
bash misc/build_pcm1862_dts.sh
bash misc/install_asoundconf.sh
- configure differential inputs
sudo modprobe i2c-dev
i2cdetect -y 1 | grep -i 4a || true
i2cset -f -y 1 0x4a 0x00 0x00 # Page 0
i2cset -f -y 1 0x4a 0x06 0x10 # Left = VIN1P/M [DIFF]
i2cset -f -y 1 0x4a 0x07 0x10 # Right = VIN2P/M [DIFF]
# test recording
arecord -f cd -c 1 -D record_left left.wav -r48000
arecord -f cd -c 1 -D record_right right.wav -r48000
# Run with realtime priority
- for the feedback loop to work right, realtime priority is absolutely necessary.
chrt -f 99 python src/auracast/multicast.py
- give the user realtime priority:
sudo tee /etc/security/limits.d/99-realtime.conf >/dev/null <<'EOF'
caster - rtprio 99
caster - memlock unlimited
EOF
# Known issues:
- When running on a laptop, there might be issues switching between USB and browser audio input, since they use the same audio device

28
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand.
[[package]]
name = "aioconsole"
@@ -2443,6 +2443,30 @@ files = [
{file = "rpds_py-0.25.1.tar.gz", hash = "sha256:8960b6dac09b62dac26e75d7e2c4a22efb835d827a7278c34f72b2b84fa160e3"},
]
[[package]]
name = "samplerate"
version = "0.2.2"
description = "Monolithic python wrapper for libsamplerate based on pybind11 and NumPy"
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "samplerate-0.2.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:99b47c238ef7216b87ccf5e8860b94b527cceef7a8add38f146e75f6efec257f"},
{file = "samplerate-0.2.2-cp310-cp310-win_amd64.whl", hash = "sha256:0aa6ae933cb85eac5ffdebc38abc198be890c2bcbac263c30301699d651e9513"},
{file = "samplerate-0.2.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:a41fe7a8c68101bf9900ba415cf2a0a58199bba9cac15e0a3b22b70006705b29"},
{file = "samplerate-0.2.2-cp311-cp311-win_amd64.whl", hash = "sha256:86fb8eb9a6c75d4c17f8125e203d29bf2d87bf5ce0e671184ba5111f015c9264"},
{file = "samplerate-0.2.2-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:3f30fea3e42b51e2441cf464e24c4744fa0b9a837b7beefb6a8eb6cc72af1e51"},
{file = "samplerate-0.2.2-cp312-cp312-win_amd64.whl", hash = "sha256:1170c5e4f68d9c1bbec2fce1549108838a473058f69cca7bc377e053ee43457b"},
{file = "samplerate-0.2.2-cp38-cp38-macosx_11_0_universal2.whl", hash = "sha256:567dfe3888634435b8da1ac4bc06ad289ba777876f446760249e923e6b3585c5"},
{file = "samplerate-0.2.2-cp38-cp38-win_amd64.whl", hash = "sha256:6c819b0360e9632be0391ec3eecc15510e30775632f4022e384e28908f59648c"},
{file = "samplerate-0.2.2-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:d072b658e438d55fed1224da9b226be1328ff9aea4268d02dbc7d864a72ce4f4"},
{file = "samplerate-0.2.2-cp39-cp39-win_amd64.whl", hash = "sha256:bdae4f21890378f3886816800c8ef3395dabaa13fcac07bb0de7ad413703bfef"},
{file = "samplerate-0.2.2.tar.gz", hash = "sha256:40964bfa28d33bc948389d958c2e742585f21891d8372ebba89260f491a15caa"},
]
[package.dependencies]
numpy = "*"
[[package]]
name = "six"
version = "1.17.0"
@@ -2952,4 +2976,4 @@ test = ["pytest", "pytest-asyncio"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.11"
content-hash = "6b5300c349ed045e8fd3e617e6262bbd7e5c48c518e4c62cedf7c17da50ce8c0"
content-hash = "3c9f92c7a5af40f98da9c7824d9c2a6f7eb809e91e43cfef4995761b2e887256"

View File

@@ -16,7 +16,8 @@ dependencies = [
"aiortc (>=1.13.0,<2.0.0)",
"sounddevice (>=0.5.2,<0.6.0)",
"python-dotenv (>=1.1.1,<2.0.0)",
"smbus2 (>=0.5.0,<0.6.0)"
"smbus2 (>=0.5.0,<0.6.0)",
"samplerate (>=0.2.2,<0.3.0)"
]
[project.optional-dependencies]

View File

@@ -7,24 +7,19 @@ class AuracastQoSConfig(BaseModel):
number_of_retransmissions: int
max_transport_latency_ms: int
class AuracastQosHigh(AuracastQoSConfig):
class AuracastQosRobust(AuracastQoSConfig):
iso_int_multiple_10ms: int = 1
number_of_retransmissions:int = 4 #4
max_transport_latency_ms:int = 43 #varies from the default value in bumble (was 65)
class AuracastQosMid(AuracastQoSConfig):
iso_int_multiple_10ms: int = 2
number_of_retransmissions:int = 3
max_transport_latency_ms:int = 65
class AuracastQosLow(AuracastQoSConfig):
iso_int_multiple_10ms: int = 3
number_of_retransmissions:int = 2 #4
max_transport_latency_ms:int = 65 #varies from the default value in bumble (was 65)
class AuracastQosFast(AuracastQoSConfig):
iso_int_multiple_10ms: int = 1
number_of_retransmissions:int = 2
max_transport_latency_ms:int = 22
class AuracastGlobalConfig(BaseModel):
qos_config: AuracastQoSConfig = AuracastQosHigh()
qos_config: AuracastQoSConfig = AuracastQosRobust()
debug: bool = False
device_name: str = 'Auracaster'
transport: str = ''
@@ -40,8 +35,6 @@ class AuracastGlobalConfig(BaseModel):
# so receivers may render earlier than the presentation delay for lower latency.
immediate_rendering: bool = False
assisted_listening_stream: bool = False
# Adaptive frame dropping: discard sub-frame samples when buffer exceeds threshold
enable_adaptive_frame_dropping: bool = False
# "Audio input. "
# "'device' -> use the host's default sound input device, "
@@ -61,6 +54,7 @@ class AuracastBigConfig(BaseModel):
loop: bool = True
precode_wav: bool = False
iso_que_len: int = 64
num_bis: int = 1 # 1 = mono (FRONT_LEFT), 2 = stereo (FRONT_LEFT + FRONT_RIGHT)
class AuracastBigConfigDeu(AuracastBigConfig):
id: int = 12
@@ -75,7 +69,7 @@ class AuracastBigConfigEng(AuracastBigConfig):
random_address: str = 'F2:F1:F2:F3:F4:F5'
name: str = 'Lecture Hall A'
language: str ='eng'
program_info: str = 'Lecture EN'
program_info: str = 'Lecture EN'
audio_source: str = 'file:./testdata/wave_particle_5min_en.wav'
class AuracastBigConfigFra(AuracastBigConfig):
@@ -84,7 +78,7 @@ class AuracastBigConfigFra(AuracastBigConfig):
# French
name: str = 'Auditoire A'
language: str ='fra'
program_info: str = 'Auditoire FR'
program_info: str = 'Auditoire FR'
audio_source: str = 'file:./testdata/wave_particle_5min_fr.wav'
class AuracastBigConfigSpa(AuracastBigConfig):
@@ -92,7 +86,7 @@ class AuracastBigConfigSpa(AuracastBigConfig):
random_address: str = 'F4:F1:F2:F3:F4:F5'
name: str = 'Auditorio A'
language: str ='spa'
program_info: str = 'Auditorio ES'
program_info: str = 'Auditorio ES'
audio_source: str = 'file:./testdata/wave_particle_5min_es.wav'
class AuracastBigConfigIta(AuracastBigConfig):
@@ -100,7 +94,7 @@ class AuracastBigConfigIta(AuracastBigConfig):
random_address: str = 'F5:F1:F2:F3:F4:F5'
name: str = 'Aula A'
language: str ='ita'
program_info: str = 'Aula IT'
program_info: str = 'Aula IT'
audio_source: str = 'file:./testdata/wave_particle_5min_it.wav'
@@ -109,7 +103,7 @@ class AuracastBigConfigPol(AuracastBigConfig):
random_address: str = 'F6:F1:F2:F3:F4:F5'
name: str = 'Sala Wykładowa'
language: str ='pol'
program_info: str = 'Sala Wykładowa PL'
program_info: str = 'Sala Wykładowa PL'
audio_source: str = 'file:./testdata/wave_particle_5min_pl.wav'

View File

@@ -27,6 +27,10 @@ from typing import cast, Any, AsyncGenerator, Coroutine, List
import itertools
import glob
import time
import threading
import numpy as np # for audio down-mix
import os
import lc3 # type: ignore # pylint: disable=E0401
@@ -42,7 +46,6 @@ from bumble.profiles import bass
import bumble.device
import bumble.transport
import bumble.utils
import numpy as np # for audio down-mix
from bumble.device import Host, AdvertisingChannelMap
from bumble.audio import io as audio_io
@@ -54,55 +57,98 @@ from auracast.utils.webrtc_audio_input import WebRTCAudioInput
# Patch sounddevice.InputStream globally to use low-latency settings
import sounddevice as sd
from collections import deque
class ModSoundDeviceAudioInput(audio_io.SoundDeviceAudioInput):
"""Patched SoundDeviceAudioInput that creates RawInputStream with low-latency parameters."""
"""Patched SoundDeviceAudioInput with low-latency capture and adaptive resampling."""
def _open(self):
"""Patched _open method that creates RawInputStream with low-latency parameters."""
try:
dev_info = sd.query_devices(self._device)
hostapis = sd.query_hostapis()
api_index = dev_info.get('hostapi')
api_name = hostapis[api_index]['name'] if isinstance(api_index, int) and 0 <= api_index < len(hostapis) else 'unknown'
pa_ver = None
try:
pa_ver = sd.get_portaudio_version()
except Exception:
pass
logging.info(
"SoundDevice backend=%s device='%s' (id=%s) ch=%s default_low_input_latency=%.4f default_high_input_latency=%.4f portaudio=%s",
api_name,
dev_info.get('name'),
self._device,
dev_info.get('max_input_channels'),
float(dev_info.get('default_low_input_latency') or 0.0),
float(dev_info.get('default_high_input_latency') or 0.0),
pa_ver[1] if isinstance(pa_ver, tuple) and len(pa_ver) >= 2 else pa_ver,
)
except Exception as e:
logging.warning("Failed to query sounddevice backend/device info: %s", e)
"""Create RawInputStream with low-latency parameters and initialize ring buffer."""
dev_info = sd.query_devices(self._device)
hostapis = sd.query_hostapis()
api_index = dev_info.get('hostapi')
api_name = hostapis[api_index]['name'] if isinstance(api_index, int) and 0 <= api_index < len(hostapis) else 'unknown'
pa_ver = sd.get_portaudio_version()
logging.info(
"SoundDevice backend=%s device='%s' (id=%s) ch=%s default_low_input_latency=%.4f default_high_input_latency=%.4f portaudio=%s",
api_name,
dev_info.get('name'),
self._device,
dev_info.get('max_input_channels'),
float(dev_info.get('default_low_input_latency') or 0.0),
float(dev_info.get('default_high_input_latency') or 0.0),
pa_ver[1] if isinstance(pa_ver, tuple) and len(pa_ver) >= 2 else pa_ver,
)
# Create RawInputStream with injected low-latency parameters
# Target ~2 ms blocksize (48 kHz -> 96 frames). For other rates, keep ~2 ms.
_sr = int(self._pcm_format.sample_rate)
self.counter=0
self.max_avail=0
self.logfile_name="available_samples.txt"
self.blocksize = 120
if os.path.exists(self.logfile_name):
os.remove(self.logfile_name)
self._stream = sd.RawInputStream(
samplerate=self._pcm_format.sample_rate,
device=self._device,
channels=self._pcm_format.channels,
dtype='int16',
blocksize=240, # Match frame size
latency=0.010,
blocksize=self.blocksize,
latency=0.004,
)
self._stream.start()
logging.info(f"SoundDeviceAudioInput: Opened with blocksize=240, latency=0.010 (10ms)")
return audio_io.PcmFormat(
audio_io.PcmFormat.Endianness.LITTLE,
audio_io.PcmFormat.SampleType.INT16,
self._pcm_format.sample_rate,
2,
1,
)
def _read(self, frame_size: int) -> bytes:
"""Read PCM samples from the stream."""
#if self.counter % 50 == 0:
frame_size = frame_size + 1 # consume samples a little faster to avoid latency akkumulation
pcm_buffer, overflowed = self._stream.read(frame_size)
if overflowed:
logging.warning("SoundDeviceAudioInput: overflowed")
n_available = self._stream.read_available
# adapt = n_available > 20
# if adapt:
# pcm_extra, overflowed = self._stream.read(3)
# logging.info('consuming extra samples, available was %d', n_available)
# if overflowed:
# logging.warning("SoundDeviceAudioInput: overflowed")
# out = bytes(pcm_buffer) + bytes(pcm_extra)
# else:
out = bytes(pcm_buffer)
self.max_avail = max(self.max_avail, n_available)
#Diagnostics
#with open(self.logfile_name, "a", encoding="utf-8") as f:
# f.write(f"{n_available}, {adapt}, {round(self._runavg, 2)}, {overflowed}\n")
if self.counter % 500 == 0:
logging.info(
"read available=%d, max=%d, latency:%d",
n_available, self.max_avail, self._stream.latency
)
self.max_avail = 0
self.counter += 1
return out
audio_io.SoundDeviceAudioInput = ModSoundDeviceAudioInput
# modified from bumble
@@ -132,13 +178,10 @@ class ModWaveAudioInput(audio_io.ThreadedAudioInput):
return b''
pcm_samples = self._wav.readframes(frame_size)
if self._bytes_read == 0:
logging.info(f"WaveAudioInput: first read requested frame_size={frame_size} -> got {len(pcm_samples)} bytes")
if not pcm_samples and self._bytes_read:
if not self.rewind:
return None
# Loop around.
logging.info("WaveAudioInput: EOF reached, rewinding to start")
self._wav.rewind()
self._bytes_read = 0
pcm_samples = self._wav.readframes(frame_size)
@@ -214,6 +257,20 @@ def run_async(async_command: Coroutine) -> None:
color('!!! An error occurred while executing the command:', 'red'), message
)
def _build_bis_list(num_bis: int) -> list:
"""Build BIS list for BasicAudioAnnouncement based on num_bis (1=mono, 2=stereo)."""
locations = [bap.AudioLocation.FRONT_LEFT, bap.AudioLocation.FRONT_RIGHT]
return [
bap.BasicAudioAnnouncement.BIS(
index=idx + 1,
codec_specific_configuration=bap.CodecSpecificConfiguration(
audio_channel_allocation=locations[idx]
),
)
for idx in range(num_bis)
]
async def init_broadcast(
device,
global_config : auracast_config.AuracastGlobalConfig,
@@ -229,7 +286,8 @@ async def init_broadcast(
tag=le_audio.Metadata.Tag.LANGUAGE, data=conf.language.encode()
),
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=conf.program_info.encode()
tag=le_audio.Metadata.Tag.PROGRAM_INFO,
data=conf.program_info.encode('latin-1')
),
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.BROADCAST_NAME, data=conf.name.encode()
@@ -252,9 +310,10 @@ async def init_broadcast(
else []
)
)
logging.info(
metadata.pretty_print("\n")
)
try:
logging.info(metadata.pretty_print("\n"))
except UnicodeDecodeError:
logging.info("Metadata: (contains non-UTF-8 bytes)")
bigs[f'big{i}'] = {}
# Config advertising set
bigs[f'big{i}']['basic_audio_announcement'] = bap.BasicAudioAnnouncement(
@@ -264,18 +323,11 @@ async def init_broadcast(
codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3),
codec_specific_configuration=bap.CodecSpecificConfiguration(
sampling_frequency=bap_sampling_freq,
frame_duration=bap.FrameDuration.DURATION_10000_US,
frame_duration=bap.FrameDuration.DURATION_7500_US if global_config.frame_duration_us == 7500 else bap.FrameDuration.DURATION_10000_US,
octets_per_codec_frame=global_config.octets_per_frame,
),
metadata=metadata,
bis=[
bap.BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=bap.CodecSpecificConfiguration(
audio_channel_allocation=bap.AudioLocation.FRONT_LEFT
),
),
],
bis=_build_bis_list(conf.num_bis),
)
],
)
@@ -296,6 +348,36 @@ async def init_broadcast(
)
)
bigs[f'big{i}']['broadcast_audio_announcement'] = bap.BroadcastAudioAnnouncement(conf.id)
# Build advertising data types list
advertising_data_types = [
(core.AdvertisingData.BROADCAST_NAME, conf.name.encode()),
]
# [PBP] Add Public Broadcast Profile Service Data (UUID 0x1856)
# Required for PTS Qualification (PBP/PBS/STR)
# Dynamically calculate PBP features based on stream configuration
pbp_features = 0x00
# Bit 0: Encryption (set if broadcast_code is configured)
if conf.code is not None:
pbp_features |= 0x01
# Bit 1 vs Bit 2: Quality based on sample rate
if global_config.auracast_sampling_rate_hz in [16000, 24000]:
pbp_features |= 0x02 # Standard Quality
elif global_config.auracast_sampling_rate_hz == 48000:
pbp_features |= 0x04 # High Quality
# Build PBP service data with Program_Info metadata (LTV format: Length, Type=0x03, Value)
# LTV: Length = 1 (type) + len(value), Type = 0x03 (Program_Info)
program_info_bytes = conf.program_info.encode('latin-1')
pbp_metadata_ltv = bytes([len(program_info_bytes) + 1, 0x03]) + program_info_bytes
pbp_service_data = struct.pack('<H', 0x1856) + bytes([pbp_features, len(pbp_metadata_ltv)]) + pbp_metadata_ltv
advertising_data_types.append(
(core.AdvertisingData.SERVICE_DATA_16_BIT_UUID, pbp_service_data)
)
advertising_set = await device.create_advertising_set(
random_address=hci.Address(conf.random_address),
advertising_parameters=bumble.device.AdvertisingParameters(
@@ -312,11 +394,7 @@ async def init_broadcast(
),
advertising_data=(
bigs[f'big{i}']['broadcast_audio_announcement'].get_advertising_data()
+ bytes(
core.AdvertisingData(
[(core.AdvertisingData.BROADCAST_NAME, conf.name.encode())]
)
)
+ bytes(core.AdvertisingData(advertising_data_types))
+ advertising_manufacturer_data
),
periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters(
@@ -341,7 +419,7 @@ async def init_broadcast(
big = await device.create_big(
bigs[f'big{i}']['advertising_set'],
parameters=bumble.device.BigParameters(
num_bis=1,
num_bis=conf.num_bis,
sdu_interval=global_config.qos_config.iso_int_multiple_10ms*10000, # Is the same as iso interval
max_sdu=global_config.octets_per_frame,
max_transport_latency=global_config.qos_config.max_transport_latency_ms,
@@ -359,11 +437,18 @@ async def init_broadcast(
direction=bis_link.Direction.HOST_TO_CONTROLLER
)
iso_queue = bumble.device.IsoPacketStream(big.bis_links[0], conf.iso_que_len)
# Create ISO queue(s) - one per BIS
iso_queues = [
bumble.device.IsoPacketStream(link, conf.iso_que_len)
for link in big.bis_links
]
logging.info('Setup ISO Data Path')
bigs[f'big{i}']['iso_queue'] = iso_queue
bigs[f'big{i}']['iso_queues'] = iso_queues
bigs[f'big{i}']['num_bis'] = conf.num_bis
# Keep backward compat: iso_queue points to first queue
bigs[f'big{i}']['iso_queue'] = iso_queues[0]
if global_config.debug:
logging.info(f'big{i} parameters are:')
@@ -593,64 +678,18 @@ class Streamer():
if hasattr(audio_input, "rewind"):
audio_input.rewind = big_config[i].loop
# Retry logic ALSA sometimes keeps the device busy for a short time after the
# previous stream has closed. Handle PortAudioError -9985 with back-off retries.
import sounddevice as _sd
max_attempts = 3
for attempt in range(1, max_attempts + 1):
try:
pcm_format = await audio_input.open()
logging.info(
f"Opened audio input: {type(audio_input).__name__} src={audio_source} sr={pcm_format.sample_rate} ch={pcm_format.channels}"
)
break # success
except _sd.PortAudioError as err:
# -9985 == paDeviceUnavailable
logging.error('Could not open audio device %s with error %s', audio_source, err)
code = None
if hasattr(err, 'errno'):
code = err.errno
elif len(err.args) > 1 and isinstance(err.args[1], int):
code = err.args[1]
if code == -9985 and attempt < max_attempts:
backoff_ms = 200 * attempt
logging.warning("PortAudio device busy (attempt %d/%d). Retrying in %.1f ms…", attempt, max_attempts, backoff_ms)
# ensure device handle and PortAudio context are closed before retrying
try:
if hasattr(audio_input, "aclose"):
await audio_input.aclose()
elif hasattr(audio_input, "close"):
audio_input.close()
except Exception:
pass
# Fully terminate PortAudio to drop lingering handles (sounddevice quirk)
if hasattr(_sd, "_terminate"):
try:
_sd._terminate()
except Exception:
pass
# Small pause then re-initialize PortAudio
await asyncio.sleep(0.1)
if hasattr(_sd, "_initialize"):
try:
_sd._initialize()
except Exception:
pass
pcm_format = await audio_input.open()
# Back-off before next attempt
await asyncio.sleep(backoff_ms / 1000)
# Recreate audio_input fresh for next attempt
audio_input = await audio_io.create_audio_input(audio_source, input_format)
continue
# Other errors or final attempt re-raise so caller can abort gracefully
raise
else:
# Loop exhausted without break
logging.error("Unable to open audio device after %d attempts giving up", max_attempts)
num_bis = big.get('num_bis', 1)
if num_bis == 2 and pcm_format.channels < 2:
logging.error("Stereo (num_bis=2) requires at least 2 input channels, got %d", pcm_format.channels)
return
if pcm_format.channels != num_bis:
if num_bis == 1:
logging.info("Input device provides %d channels will down-mix to mono for LC3", pcm_format.channels)
else:
logging.info("Input device provides %d channels using first %d for stereo", pcm_format.channels, num_bis)
if pcm_format.channels != 1:
logging.info("Input device provides %d channels will down-mix to mono for LC3", pcm_format.channels)
if pcm_format.sample_type == audio_io.PcmFormat.SampleType.INT16:
pcm_bit_depth = 16
elif pcm_format.sample_type == audio_io.PcmFormat.SampleType.FLOAT32:
@@ -659,283 +698,139 @@ class Streamer():
logging.error("Only INT16 and FLOAT32 sample types are supported")
return
encoder = lc3.Encoder(
frame_duration_us=global_config.frame_duration_us,
sample_rate_hz=global_config.auracast_sampling_rate_hz,
num_channels=1,
input_sample_rate_hz=pcm_format.sample_rate,
)
# Create one encoder per BIS (mono: 1 encoder, stereo: 2 encoders)
encoders = [
lc3.Encoder(
frame_duration_us=global_config.frame_duration_us,
sample_rate_hz=global_config.auracast_sampling_rate_hz,
num_channels=1,
input_sample_rate_hz=pcm_format.sample_rate,
)
for _ in range(num_bis)
]
lc3_frame_samples = encoder.get_frame_samples() # number of the pcm samples per lc3 frame
lc3_frame_samples = encoders[0].get_frame_samples() # number of the pcm samples per lc3 frame
big['pcm_bit_depth'] = pcm_bit_depth
big['channels'] = pcm_format.channels
big['lc3_frame_samples'] = lc3_frame_samples
big['lc3_bytes_per_frame'] = global_config.octets_per_frame
big['audio_input'] = audio_input
big['encoder'] = encoder
big['encoders'] = encoders
# Keep backward compat
big['encoder'] = encoders[0]
big['precoded'] = False
logging.info("Streaming audio...")
bigs = self.bigs
self.is_streaming = True
# frame drop algo parameters
sample_rate = big['audio_input']._pcm_format.sample_rate
samples_discarded_total = 0 # Total samples discarded
discard_events = 0 # Number of times we discarded samples
frames_since_last_discard = 999 # Guard: frames since last discard (start high to allow first drop)
enable_drift_compensation = getattr(global_config, 'enable_adaptive_frame_dropping', False)
# Hardcoded parameters (unit: milliseconds)
drift_threshold_ms = 2.0 if enable_drift_compensation else 0.0
static_drop_ms = 1 if enable_drift_compensation else 0.0
# Guard interval measured in LC3 frames (10 ms each); 50 => 500 ms cooldown
discard_guard_frames = int(2*sample_rate / 1000) if enable_drift_compensation else 0
# Derived sample counts
drop_threshold_samples = int(sample_rate * drift_threshold_ms / 1000.0)
static_drop_samples = int(sample_rate * static_drop_ms / 1000.0)
if enable_drift_compensation:
logging.info(f"Clock drift compensation ENABLED: threshold={drift_threshold_ms}ms, guard={discard_guard_frames} frames")
else:
logging.info("Clock drift compensation DISABLED")
# Periodic monitoring
last_stats_log = time.perf_counter()
stats_interval = 5.0 # Log stats every 5 seconds
frame_count = 0
# Prime inputs: read one frame from each non-precoded input to verify data flow
try:
for j, _big in enumerate(bigs.values()):
if not _big.get('precoded'):
gen = _big.get('frames_gen')
if gen is None:
gen = _big['audio_input'].frames(_big['lc3_frame_samples'])
_big['frames_gen'] = gen
test_frame = await anext(gen, None)
logging.info(
f"Prime read BIG{j}: bytes={0 if test_frame is None else len(test_frame)} samples={_big['lc3_frame_samples']}"
)
# Store for crossfade if needed
if enable_drift_compensation and test_frame is not None:
_big['prev_pcm_frame'] = test_frame
except Exception as e:
logging.error(f"Prime read failed: {e}", exc_info=True)
# One streamer fits all
while self.is_streaming:
stream_finished = [False for _ in range(len(bigs))]
for i, big in enumerate(bigs.values()):
if big.get('logged_loop_enter') is not True:
logging.info(
f"Stream loop enter: input={type(big['audio_input']).__name__} lc3_frame_samples={big.get('lc3_frame_samples')} bytes_per_frame={big.get('lc3_bytes_per_frame')}"
if big['precoded']: # everything was already lc3 coded beforehand
lc3_frame = bytes(
itertools.islice(big['lc3_frames'], big['lc3_bytes_per_frame'])
)
big['logged_loop_enter'] = True
try:
if big['precoded']:
# everything was already lc3 coded beforehand
lc3_frame = bytes(itertools.islice(big['lc3_frames'], big['lc3_bytes_per_frame']))
if lc3_frame == b'':
# Not all streams may stop at the same time
stream_finished[i] = True
continue
else:
# code lc3 on the fly
# Use stored frames generator when available so we can aclose() it on stop
frames_gen = big.get('frames_gen')
if frames_gen is None:
logging.info("Creating frames generator for input")
frames_gen = big['audio_input'].frames(big['lc3_frame_samples'])
big['frames_gen'] = frames_gen
# Read the frame we need for encoding
logging.info("Awaiting next PCM frame from frames_gen…")
pcm_frame = await anext(frames_gen, None)
if big.get('logged_first_frame') is not True:
logging.info(
f"First PCM frame bytes={0 if pcm_frame is None else len(pcm_frame)} | lc3_frame_samples={big['lc3_frame_samples']} | bytes_per_frame={big['lc3_bytes_per_frame']}"
if lc3_frame == b'': # Not all streams may stop at the same time
stream_finished[i] = True
continue
else: # code lc3 on the fly with perf counters
# Ensure frames generator exists (so we can aclose() on stop)
frames_gen = big.get('frames_gen')
if frames_gen is None:
# For stereo, request frame_samples per channel (interleaved input)
frames_gen = big['audio_input'].frames(big['lc3_frame_samples'])
big['frames_gen'] = frames_gen
# Initialize perf tracking bucket per BIG
perf = big.setdefault('_perf', {
'n': 0,
'samples_sum': 0.0, 'samples_max': 0.0,
'enc_sum': 0.0, 'enc_max': 0.0,
'write_sum': 0.0, 'write_max': 0.0,
'loop_sum': 0.0, 'loop_max': 0.0,
})
# Total loop duration timer (sample + encode + write)
t_loop0 = time.perf_counter()
# Measure time to get a sample from the buffer
t0 = time.perf_counter()
pcm_frame = await anext(frames_gen, None)
dt_sample = time.perf_counter() - t0
if pcm_frame is None: # Not all streams may stop at the same time
stream_finished[i] = True
continue
# Measure LC3 encoding time
t1 = time.perf_counter()
num_bis = big.get('num_bis', 1)
if num_bis == 1:
# Mono: single encoder, single queue
lc3_frame = big['encoder'].encode(
pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth']
)
lc3_frames_out = [lc3_frame]
else:
# Stereo: split interleaved PCM into L/R, encode separately
pcm_array = np.frombuffer(pcm_frame, dtype=np.int16)
channels_in = big['channels']
lc3_frames_out = []
for ch_idx, encoder in enumerate(big['encoders']):
# Extract channel (interleaved: L,R,L,R,... or L,R,C,... for >2 ch)
ch_pcm = pcm_array[ch_idx::channels_in].tobytes()
lc3_frame = encoder.encode(
ch_pcm, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth']
)
big['logged_first_frame'] = True
lc3_frames_out.append(lc3_frame)
dt_enc = time.perf_counter() - t1
if pcm_frame is None:
# Not all streams may stop at the same time
stream_finished[i] = True
continue
# Measure write blocking time
t2 = time.perf_counter()
for q_idx, lc3_frame in enumerate(lc3_frames_out):
await big['iso_queues'][q_idx].write(lc3_frame)
dt_write = time.perf_counter() - t2
# Discard excess samples in buffer if above threshold (clock drift compensation)
if enable_drift_compensation and hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
sd_buffer_samples = big['audio_input']._stream.read_available
# Guard: only allow discard if enough frames have passed since last discard
if sd_buffer_samples > drop_threshold_samples and frames_since_last_discard >= discard_guard_frames:
# Always drop a static amount for predictable behavior
samples_to_drop = min(static_drop_samples, max(1, big['lc3_frame_samples'] - 1))
try:
await anext(big['audio_input'].frames(samples_to_drop))
samples_discarded_total += samples_to_drop
discard_events += 1
sample_rate = big['audio_input']._pcm_format.sample_rate
time_since_last_ms = frames_since_last_discard * 10 # Each frame is 10ms
logging.info(
f"DISCARD #{discard_events}: dropped {samples_to_drop} samples ({samples_to_drop / sample_rate * 1000:.1f}ms) | "
f"buffer was {sd_buffer_samples} samples ({sd_buffer_samples / sample_rate * 1000:.1f}ms) | "
f"since_last={frames_since_last_discard} frames ({time_since_last_ms}ms) | "
f"frame={frame_count}"
)
frames_since_last_discard = 0
big['last_drop_samples'] = samples_to_drop
big['apply_crossfade'] = True
except Exception as e:
logging.error(f"Failed to discard samples: {e}")
# Total loop duration
dt_loop = time.perf_counter() - t_loop0
# Down-mix multi-channel PCM to mono for LC3 encoder if needed
if big.get('channels', 1) > 1:
if isinstance(pcm_frame, np.ndarray):
if pcm_frame.ndim > 1:
mono = pcm_frame.mean(axis=1).astype(pcm_frame.dtype)
pcm_frame = mono
else:
# Convert raw bytes to numpy, average channels, convert back
dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32
samples = np.frombuffer(pcm_frame, dtype=dtype)
samples = samples.reshape(-1, big['channels']).mean(axis=1)
pcm_frame = samples.astype(dtype).tobytes()
# Update stats
perf['n'] += 1
perf['samples_sum'] += dt_sample
perf['enc_sum'] += dt_enc
perf['write_sum'] += dt_write
perf['loop_sum'] += dt_loop
perf['samples_max'] = max(perf['samples_max'], dt_sample)
perf['enc_max'] = max(perf['enc_max'], dt_enc)
perf['write_max'] = max(perf['write_max'], dt_write)
perf['loop_max'] = max(perf['loop_max'], dt_loop)
# Apply crossfade if samples were just dropped (drift compensation)
if big.get('apply_crossfade') and big.get('prev_pcm_frame') is not None:
dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32
sample_rate = big['audio_input']._pcm_format.sample_rate
crossfade_samples = min(int(sample_rate * 0.010), big['lc3_frame_samples'] // 2)
prev_samples = np.frombuffer(big['prev_pcm_frame'], dtype=dtype).copy()
curr_samples = np.frombuffer(pcm_frame, dtype=dtype).copy()
t = np.linspace(0, 1, crossfade_samples)
fade_out = np.cos(t * np.pi / 2)
fade_in = np.sin(t * np.pi / 2)
if len(prev_samples) >= crossfade_samples and len(curr_samples) >= crossfade_samples:
crossfaded = (prev_samples[-crossfade_samples:] * fade_out + curr_samples[:crossfade_samples] * fade_in).astype(dtype)
curr_samples[:crossfade_samples] = crossfaded
pcm_frame = curr_samples.tobytes()
big['apply_crossfade'] = False
frame_count += 1
if enable_drift_compensation:
big['prev_pcm_frame'] = pcm_frame
lc3_frame = big['encoder'].encode(pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth'])
if big.get('logged_first_lc3') is not True:
try:
logging.info(f"First LC3 frame size={len(lc3_frame)} bytes")
except Exception:
pass
big['logged_first_lc3'] = True
await big['iso_queue'].write(lc3_frame)
except asyncio.CancelledError:
raise
except Exception as e:
if not big.get('logged_exception'):
logging.error(f"Exception in stream loop for BIG {i}: {e}", exc_info=True)
big['logged_exception'] = True
frame_count += 1
# Increment guard counter (tracks frames since last discard)
frames_since_last_discard += 1
# Periodic stats logging
now = time.perf_counter()
if now - last_stats_log >= stats_interval:
# Get current buffer status from PortAudio
current_sd_buffer = 0
if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
try:
current_sd_buffer = big['audio_input']._stream.read_available
except Exception:
pass
# Get stream latency and CPU load from sounddevice
stream_latency_ms = None
cpu_load_pct = None
if hasattr(big['audio_input'], '_stream') and big['audio_input']._stream:
try:
latency = big['audio_input']._stream.latency
if frame_count == 501: # Debug log once
logging.info(f"DEBUG: stream.latency raw value = {latency}, type = {type(latency)}")
# latency can be either a float (for input-only streams) or tuple (input, output)
if latency is not None:
if isinstance(latency, (int, float)):
# Single value for input-only stream
stream_latency_ms = float(latency) * 1000.0
elif isinstance(latency, (tuple, list)) and len(latency) >= 1:
# Tuple (input_latency, output_latency)
stream_latency_ms = latency[0] * 1000.0
except Exception as e:
if frame_count == 501: # Log once at startup
logging.warning(f"Could not get stream.latency: {e}")
try:
cpu_load = big['audio_input']._stream.cpu_load
if frame_count == 501: # Debug log once
logging.info(f"DEBUG: stream.cpu_load raw value = {cpu_load}")
# cpu_load is a fraction (0.0 to 1.0)
if cpu_load is not None and cpu_load >= 0:
cpu_load_pct = cpu_load * 100.0 # Convert to percentage
except Exception as e:
if frame_count == 501: # Log once at startup
logging.warning(f"Could not get stream.cpu_load: {e}")
# Get backend-specific buffer status
backend_delay = None
backend_label = "Backend"
# Determine which backend we're using based on audio_input device
try:
device_info = big['audio_input']._device if hasattr(big['audio_input'], '_device') else None
if device_info is not None and isinstance(device_info, int):
hostapi = sd.query_hostapis(sd.query_devices(device_info)['hostapi'])
backend_name = hostapi['name']
else:
backend_name = "Unknown"
except Exception:
backend_name = "Unknown"
if 'pulse' in backend_name.lower():
# PipeWire/PulseAudio backend - no direct buffer access
# SD_buffer is the only reliable metric
backend_label = "PipeWire"
backend_delay = None # Cannot read PipeWire internal buffers directly
else:
# ALSA backend - can read kernel buffer
backend_label = "ALSA_kernel"
try:
with open('/proc/asound/card0/pcm0c/sub0/status', 'r') as f:
for line in f:
if 'delay' in line and ':' in line:
backend_delay = int(line.split(':')[1].strip())
break
except Exception:
pass
if enable_drift_compensation:
avg_discard_per_event = (samples_discarded_total / discard_events) if discard_events > 0 else 0.0
discard_event_rate = (discard_events / frame_count * 100) if frame_count > 0 else 0.0
latency_str = f"stream_latency={stream_latency_ms:.2f} ms" if stream_latency_ms is not None else "stream_latency=N/A"
cpu_str = f"cpu_load={cpu_load_pct:.1f}%" if cpu_load_pct is not None else "cpu_load=N/A"
# Log every 500 frames for this BIG and reset accumulators
if perf['n'] >= 500:
n = perf['n']
logging.info(
f"STATS: frames={frame_count} | discard_events={discard_events} ({discard_event_rate:.1f}%) | "
f"avg_discard={avg_discard_per_event:.0f} samples/event | "
f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | "
f"{latency_str} | {cpu_str} | "
f"threshold={drop_threshold_samples} samples ({drop_threshold_samples / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms)"
"Perf(i=%d, last %d): sample mean=%.6fms max=%.6fms | encode mean=%.6fms max=%.6fms | write mean=%.6fms max=%.6fms | loop mean=%.6fms max=%.6fms",
i,
n,
(perf['samples_sum'] / n) * 1e3, perf['samples_max'] * 1e3,
(perf['enc_sum'] / n) * 1e3, perf['enc_max'] * 1e3,
(perf['write_sum'] / n) * 1e3, perf['write_max'] * 1e3,
(perf['loop_sum'] / n) * 1e3, perf['loop_max'] * 1e3,
)
else:
backend_str = f"{backend_label}={backend_delay} samples ({backend_delay / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms)" if backend_delay is not None else f"{backend_label}=N/A (use pw-top)"
latency_str = f"stream_latency={stream_latency_ms:.2f} ms" if stream_latency_ms is not None else "stream_latency=N/A"
cpu_str = f"cpu_load={cpu_load_pct:.1f}%" if cpu_load_pct is not None else "cpu_load=N/A"
logging.info(
f"STATS: frames={frame_count} | "
f"SD_buffer={current_sd_buffer} samples ({current_sd_buffer / big['audio_input']._pcm_format.sample_rate * 1000:.1f} ms) | "
f"{latency_str} | {cpu_str} | "
f"{backend_str} | "
f"drift_compensation=DISABLED"
)
last_stats_log = now
perf.update({
'n': 0,
'samples_sum': 0.0, 'samples_max': 0.0,
'enc_sum': 0.0, 'enc_max': 0.0,
'write_sum': 0.0, 'write_max': 0.0,
'loop_sum': 0.0, 'loop_max': 0.0,
})
if all(stream_finished): # Take into account that multiple files have different lengths
logging.info('All streams finished, stopping streamer')
@@ -988,106 +883,28 @@ if __name__ == "__main__":
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)
os.chdir(os.path.dirname(__file__))
# =============================================================================
# AUDIO BACKEND CONFIGURATION - Toggle between ALSA and PipeWire
# =============================================================================
# Uncomment ONE of the following backend configurations:
# Option 1: Direct ALSA (Direct hardware access, bypasses PipeWire)
AUDIO_BACKEND = 'ALSA'
target_latency_ms = 10.0
# Option 2: PipeWire via PulseAudio API (Routes through pipewire-pulse)
#AUDIO_BACKEND = 'PipeWire'
#target_latency_ms = 5.0 # PipeWire typically handles lower latency better
# =============================================================================
import sounddevice as sd
import subprocess
# Detect if PipeWire is running (even if we're using ALSA API)
pipewire_running = False
try:
result = subprocess.run(['systemctl', '--user', 'is-active', 'pipewire'],
capture_output=True, text=True, timeout=1)
pipewire_running = (result.returncode == 0)
except Exception:
pass
if AUDIO_BACKEND == 'ALSA':
os.environ['SDL_AUDIODRIVER'] = 'alsa'
sd.default.latency = target_latency_ms / 1000.0
# Find ALSA host API
try:
alsa_hostapi = next(i for i, ha in enumerate(sd.query_hostapis())
if 'ALSA' in ha['name'])
logging.info(f"ALSA host API available at index: {alsa_hostapi}")
except StopIteration:
logging.error("ALSA backend not found!")
# Find ALSA host API
alsa_hostapi = next(i for i, ha in enumerate(sd.query_hostapis())
if 'ALSA' in ha['name'])
elif AUDIO_BACKEND == 'PipeWire':
os.environ['SDL_AUDIODRIVER'] = 'pulseaudio'
sd.default.latency = target_latency_ms / 1000.0
if not pipewire_running:
logging.error("PipeWire selected but not running!")
raise RuntimeError("PipeWire is not active")
# Find PulseAudio host API (required for PipeWire mode)
try:
pulse_hostapi = next(i for i, ha in enumerate(sd.query_hostapis())
if 'pulse' in ha['name'].lower())
logging.info(f"Using PulseAudio host API at index: {pulse_hostapi} → routes to PipeWire")
except StopIteration:
logging.error("PulseAudio host API not found! Did you rebuild PortAudio with -DPA_USE_PULSEAUDIO=ON?")
raise RuntimeError("PulseAudio API not available in PortAudio")
else:
logging.error(f"Unknown AUDIO_BACKEND: {AUDIO_BACKEND}")
raise ValueError(f"Invalid AUDIO_BACKEND: {AUDIO_BACKEND}")
# Select audio input device based on backend
shure_device_idx = None
search_str='ch1'
# Use ALSA devices
from auracast.utils.sounddevice_utils import get_alsa_usb_inputs
devices = get_alsa_usb_inputs()
logging.info(f"Searching ALSA devices for first device with string {search_str}...")
if AUDIO_BACKEND == 'ALSA':
# Use ALSA devices
from auracast.utils.sounddevice_utils import get_alsa_usb_inputs
devices = get_alsa_usb_inputs()
logging.info("Searching ALSA devices for Shure MVX2U...")
for idx, dev in devices:
logging.info(f" ALSA device [{idx}]: {dev['name']} ({dev['max_input_channels']} ch)")
if 'shure' in dev['name'].lower() and 'mvx2u' in dev['name'].lower():
shure_device_idx = idx
logging.info(f"✓ Selected ALSA device {idx}: {dev['name']}")
break
elif AUDIO_BACKEND == 'PipeWire':
# Use PulseAudio devices (routed through PipeWire)
logging.info("Searching PulseAudio devices for Shure MVX2U...")
for idx, dev in enumerate(sd.query_devices()):
# Only consider PulseAudio input devices
if dev['max_input_channels'] > 0:
hostapi = sd.query_hostapis(dev['hostapi'])
if 'pulse' in hostapi['name'].lower():
dev_name_lower = dev['name'].lower()
logging.info(f" PulseAudio device [{idx}]: {dev['name']} ({dev['max_input_channels']} ch)")
# Skip monitor devices (they're output monitors, not real inputs)
if 'monitor' in dev_name_lower:
continue
# Look for Shure MVX2U - prefer "Mono" device for mono input
if 'shure' in dev_name_lower and 'mvx2u' in dev_name_lower:
shure_device_idx = idx
logging.info(f"✓ Selected PulseAudio device {idx}: {dev['name']} → routes to PipeWire")
break
audio_dev = None
for idx, dev in devices:
logging.info(f" ALSA device [{idx}]: {dev['name']} ({dev['max_input_channels']} ch)")
if search_str in dev['name'].lower():
audio_dev = idx
logging.info(f"✓ Selected ALSA device {idx}: {dev['name']}")
break
if shure_device_idx is None:
logging.error(f"Shure MVX2U not found in {AUDIO_BACKEND} devices!")
if audio_dev is None:
logging.error(f"Audio device {audio_dev} not found in {AUDIO_BACKEND} devices!")
raise RuntimeError(f"Audio device not found for {AUDIO_BACKEND} backend")
config = auracast_config.AuracastConfigGroup(
@@ -1101,7 +918,7 @@ if __name__ == "__main__":
)
# TODO: How can we use other iso interval than 10ms ?(medium or low rel) ? - nrf53audio receiver repports I2S tx underrun
config.qos_config=auracast_config.AuracastQosHigh()
config.qos_config=auracast_config.AuracastQosRobust()
#config.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,1000000,rtscts' # transport for nrf52 dongle
#config.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001050076061-if02,1000000,rtscts' # transport for nrf53dk
@@ -1111,8 +928,6 @@ if __name__ == "__main__":
#config.transport= 'auto'
config.transport='serial:/dev/ttyAMA3,1000000,rtscts' # transport for raspberry pi
# TODO: encrypted streams are not working
for big in config.bigs:
#big.code = 'abcd'
#big.code = '78 e5 dc f1 34 ab 42 bf c1 92 ef dd 3a fd 67 ae'
@@ -1120,11 +935,11 @@ if __name__ == "__main__":
#big.audio_source = big.audio_source.replace('.wav', '_10_16_32.lc3') #lc3 precoded files
#big.audio_source = read_lc3_file(big.audio_source) # load files in advance
# --- Configure Shure MVX2U USB Audio Interface (ALSA backend) ---
if shure_device_idx is not None:
big.audio_source = f'device:{shure_device_idx}' # Shure MVX2U USB mono interface
# --- Configure device (ALSA backend) ---
if audio_dev is not None:
big.audio_source = f'device:{audio_dev}'
big.input_format = 'int16le,48000,1' # int16, 48kHz, mono
logging.info(f"Configured BIG '{big.name}' with Shure MVX2U (device:{shure_device_idx}, 48kHz mono)")
logging.info(f"Configured BIG '{big.name}' with (device:{audio_dev}, 48kHz mono)")
else:
logging.warning(f"Shure device not found, BIG '{big.name}' will use default audio_source: {big.audio_source}")
@@ -1137,15 +952,12 @@ if __name__ == "__main__":
# 24kHz is only working with 2 streams - probably airtime constraint
# TODO: with more than three broadcasters (16kHz) no advertising (no primary channels is present anymore)
# TODO: find the bottleneck - probably airtime
# TODO: test encrypted streams
config.auracast_sampling_rate_hz = 16000
config.octets_per_frame = 40 # 32kbps@16kHz
config.auracast_sampling_rate_hz = 24000
config.octets_per_frame = 60 # 32kbps@16kHz
#config.immediate_rendering = True
#config.debug = True
# Enable clock drift compensation to prevent latency accumulation
# With ~43 samples/sec drift (0.89ms/sec), threshold of 2ms will trigger every ~2.2 seconds
run_async(
broadcast(
config,

View File

@@ -140,7 +140,7 @@ async def main():
os.chdir(os.path.dirname(__file__))
global_conf = auracast_config.AuracastGlobalConfig(
qos_config=auracast_config.AuracastQosHigh()
qos_config=auracast_config.AuracastQosRobust()
)
#global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk
global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,115200,rtscts' #nrf52dongle hci_uart usb cdc

View File

@@ -159,7 +159,7 @@ if __name__ == "__main__":
],
immediate_rendering=False,
presentation_delay_us=40000,
qos_config=auracast_config.AuracastQosHigh(),
qos_config=auracast_config.AuracastQosRobust(),
auracast_sampling_rate_hz = LC3_SRATE,
octets_per_frame = OCTETS_PER_FRAME,
transport=TRANSPORT1,

View File

@@ -8,6 +8,8 @@ import requests
from dotenv import load_dotenv
import streamlit as st
from auracast.utils.read_temp import read_case_temp, read_cpu_temp
from auracast import auracast_config
from auracast.utils.frontend_auth import (
is_pw_disabled,
@@ -88,6 +90,11 @@ QUALITY_MAP = {
"Fair (16kHz)": {"rate": 16000, "octets": 40},
}
QOS_PRESET_MAP = {
"Fast": auracast_config.AuracastQosFast(),
"Robust": auracast_config.AuracastQosRobust(),
}
# Try loading persisted settings from backend
saved_settings = {}
try:
@@ -100,6 +107,10 @@ except Exception:
# Define is_streaming early from the fetched status for use throughout the UI
is_streaming = bool(saved_settings.get("is_streaming", False))
# Extract secondary status, if provided by the backend /status endpoint.
secondary_status = saved_settings.get("secondary") or {}
secondary_is_streaming = bool(saved_settings.get("secondary_is_streaming", secondary_status.get("is_streaming", False)))
st.title("Auracast Audio Mode Control")
def render_stream_controls(status_streaming: bool, start_label: str, stop_label: str, mode_label: str):
@@ -119,9 +130,10 @@ def render_stream_controls(status_streaming: bool, start_label: str, stop_label:
# Audio mode selection with persisted default
# Note: backend persists 'USB' for any device:<name> source (including AES67). We default to 'USB' in that case.
options = [
"Demo",
"USB",
"Network",
"Demo",
"Analog",
"USB",
"Network",
]
saved_audio_mode = saved_settings.get("audio_mode", "Demo")
if saved_audio_mode not in options:
@@ -153,7 +165,12 @@ if isinstance(backend_mode_raw, str):
elif backend_mode_raw in options:
backend_mode_mapped = backend_mode_raw
running_mode = backend_mode_mapped if (is_streaming and backend_mode_mapped) else audio_mode
# When Analog is selected in the UI we always show it as such, even though the
# backend currently persists USB for all device sources.
if audio_mode == "Analog":
running_mode = "Analog"
else:
running_mode = backend_mode_mapped if (is_streaming and backend_mode_mapped) else audio_mode
is_started = False
is_stopped = False
@@ -203,7 +220,7 @@ if audio_mode == "Demo":
type=("password"),
help="Optional: Set a broadcast code to protect your stream. Leave empty for an open (uncoded) broadcast."
)
col_flags1, col_flags2, col_pdelay, col_rtn = st.columns([1, 1, 0.7, 0.6], gap="small", vertical_alignment="center")
col_flags1, col_flags2, col_pdelay, col_qos = st.columns([1, 1, 0.7, 0.6], gap="small", vertical_alignment="center")
with col_flags1:
assisted_listening = st.checkbox(
"Assistive listening",
@@ -225,13 +242,13 @@ if audio_mode == "Demo":
min_value=10, max_value=200, step=5, value=default_pdelay_ms,
help="Delay between capture and presentation for receivers."
)
default_rtn = int(saved_settings.get('rtn', 4) or 4)
with col_rtn:
rtn_options = [1,2,3,4]
default_rtn_clamped = min(4, max(1, default_rtn))
rtn = st.selectbox(
"RTN", options=rtn_options, index=rtn_options.index(default_rtn_clamped),
help="Number of ISO retransmissions (higher improves robustness at cost of airtime)."
with col_qos:
qos_options = list(QOS_PRESET_MAP.keys())
saved_qos = saved_settings.get('qos_preset', 'Fast')
default_qos_idx = qos_options.index(saved_qos) if saved_qos in qos_options else 0
qos_preset = st.selectbox(
"QoS", options=qos_options, index=default_qos_idx,
help="Fast: 2 retransmissions, lower latency. Robust: 4 retransmissions, better reliability."
)
#st.info(f"Demo mode selected: {demo_selected} (Streams: {demo_stream_map[demo_selected]['streams']}, Rate: {demo_stream_map[demo_selected]['rate']} Hz)")
# Start/Stop buttons for demo mode
@@ -283,11 +300,7 @@ if audio_mode == "Demo":
assisted_listening_stream=assisted_listening,
immediate_rendering=immediate_rendering,
presentation_delay_us=int(presentation_delay_ms * 1000),
qos_config=auracast_config.AuracastQoSConfig(
iso_int_multiple_10ms=1,
number_of_retransmissions=int(rtn),
max_transport_latency_ms=int(rtn)*10 + 3,
),
qos_config=QOS_PRESET_MAP[qos_preset],
bigs=bigs1
)
config2 = None
@@ -299,11 +312,7 @@ if audio_mode == "Demo":
assisted_listening_stream=assisted_listening,
immediate_rendering=immediate_rendering,
presentation_delay_us=int(presentation_delay_ms * 1000),
qos_config=auracast_config.AuracastQoSConfig(
iso_int_multiple_10ms=1,
number_of_retransmissions=int(rtn),
max_transport_latency_ms=int(rtn)*10 + 3,
),
qos_config=QOS_PRESET_MAP[qos_preset],
bigs=bigs2
)
# Call /init and /init2
@@ -338,111 +347,103 @@ if audio_mode == "Demo":
quality = None # Not used in demo mode
else:
# Stream quality selection (now enabled)
quality_options = list(QUALITY_MAP.keys())
default_quality = "Medium (24kHz)" if "Medium (24kHz)" in quality_options else quality_options[0]
quality = st.selectbox(
"Stream Quality (Sampling Rate)",
quality_options,
index=quality_options.index(default_quality),
help="Select the audio sampling rate for the stream. Lower rates may improve compatibility."
)
# --- Mode-specific configuration ---
default_name = saved_settings.get('channel_names', ["Broadcast0"])[0]
default_lang = saved_settings.get('languages', ["deu"])[0]
default_input = saved_settings.get('input_device') or 'default'
stream_name = st.text_input(
"Channel Name",
value=default_name,
help="The primary name for your broadcast. Like the SSID of a WLAN, it identifies your stream for receivers."
)
raw_program_info = saved_settings.get('program_info', default_name)
if isinstance(raw_program_info, list) and raw_program_info:
default_program_info = raw_program_info[0]
else:
default_program_info = raw_program_info
program_info = st.text_input(
"Program Info",
value=default_program_info,
help="Additional details about the broadcast program, such as its content or purpose. Shown to receivers for more context."
)
language = st.text_input(
"Language (ISO 639-3)",
value=default_lang,
help="Three-letter language code (e.g., 'eng' for English, 'deu' for German). Used by receivers to display the language of the stream. See: https://en.wikipedia.org/wiki/List_of_ISO_639-3_codes"
)
# Optional broadcast code for coded streams
stream_passwort = st.text_input(
"Stream Passwort",
value="",
type="password",
help="Optional: Set a broadcast code to protect your stream. Leave empty for an open (uncoded) broadcast."
)
# Flags and QoS row (compact, four columns)
col_flags1, col_flags2, col_pdelay, col_rtn = st.columns([1, 1, 0.7, 0.6], gap="small")
with col_flags1:
assisted_listening = st.checkbox(
"Assistive listening",
value=bool(saved_settings.get('assisted_listening_stream', False)),
help="tells the receiver that this is an assistive listening stream"
)
with col_flags2:
immediate_rendering = st.checkbox(
"Immediate rendering",
value=bool(saved_settings.get('immediate_rendering', False)),
help="tells the receiver to ignore presentation delay and render immediately if possible."
)
# QoS/presentation controls inline with flags
default_pdelay = int(saved_settings.get('presentation_delay_us', 40000) or 40000)
with col_pdelay:
default_pdelay_ms = max(10, min(200, default_pdelay // 1000))
presentation_delay_ms = st.number_input(
"Delay (ms)",
min_value=10, max_value=200, step=5, value=default_pdelay_ms,
help="Delay between capture and presentation for receivers."
)
default_rtn = int(saved_settings.get('rtn', 4) or 4)
with col_rtn:
rtn_options = [1,2,3,4]
default_rtn_clamped = min(4, max(1, default_rtn))
rtn = st.selectbox(
"RTN", options=rtn_options, index=rtn_options.index(default_rtn_clamped),
help="Number of ISO retransmissions (higher improves robustness at cost of airtime)."
)
default_lang = saved_settings.get('languages', ["deu"])[0]
# Input device selection for USB or AES67 mode
if audio_mode in ("USB", "Network"):
# Per-mode configuration and controls
input_device = None
radio2_enabled = False
radio1_cfg = None
radio2_cfg = None
if audio_mode == "Analog":
# --- Radio 1 controls ---
st.subheader("Radio 1")
quality_options = list(QUALITY_MAP.keys())
default_quality = "Medium (24kHz)" if "Medium (24kHz)" in quality_options else quality_options[0]
quality1 = st.selectbox(
"Stream Quality (Radio 1)",
quality_options,
index=quality_options.index(default_quality),
help="Select the audio sampling rate for Radio 1."
)
stream_passwort1 = st.text_input(
"Stream Passwort (Radio 1)",
value="",
type="password",
help="Optional: Set a broadcast code for Radio 1."
)
col_r1_flags1, col_r1_flags2, col_r1_pdelay, col_r1_qos = st.columns([1, 1, 0.7, 0.6], gap="small")
with col_r1_flags1:
assisted_listening1 = st.checkbox(
"Assistive listening (R1)",
value=bool(saved_settings.get('assisted_listening_stream', False)),
help="tells the receiver that this is an assistive listening stream"
)
with col_r1_flags2:
immediate_rendering1 = st.checkbox(
"Immediate rendering (R1)",
value=bool(saved_settings.get('immediate_rendering', False)),
help="tells the receiver to ignore presentation delay and render immediately if possible."
)
default_pdelay = int(saved_settings.get('presentation_delay_us', 40000) or 40000)
with col_r1_pdelay:
default_pdelay_ms = max(10, min(200, default_pdelay // 1000))
presentation_delay_ms1 = st.number_input(
"Delay (ms, R1)",
min_value=10, max_value=200, step=5, value=default_pdelay_ms,
help="Delay between capture and presentation for Radio 1."
)
with col_r1_qos:
qos_options = list(QOS_PRESET_MAP.keys())
saved_qos = saved_settings.get('qos_preset', 'Fast')
default_qos_idx = qos_options.index(saved_qos) if saved_qos in qos_options else 0
qos_preset1 = st.selectbox(
"QoS (R1)", options=qos_options, index=default_qos_idx,
help="Fast: 2 retransmissions, lower latency. Robust: 4 retransmissions, better reliability."
)
col_r1_name, col_r1_lang = st.columns([2, 1])
with col_r1_name:
stream_name1 = st.text_input(
"Channel Name (Radio 1)",
value=default_name,
help="Name for the first analog radio (Radio 1)."
)
with col_r1_lang:
language1 = st.text_input(
"Language (ISO 639-3) (Radio 1)",
value=default_lang,
help="Language code for Radio 1."
)
program_info1 = st.text_input(
"Program Info (Radio 1)",
value=default_program_info,
help="Program information for Radio 1."
)
# Analog mode exposes only ALSA ch1/ch2 inputs.
if not is_streaming:
# Only query device lists when NOT streaming to avoid extra backend calls
try:
endpoint = "/audio_inputs_pw_usb" if audio_mode == "USB" else "/audio_inputs_pw_network"
resp = requests.get(f"{BACKEND_URL}{endpoint}")
resp = requests.get(f"{BACKEND_URL}/audio_inputs_pw_usb")
device_list = resp.json().get('inputs', [])
except Exception as e:
st.error(f"Failed to fetch devices: {e}")
device_list = []
# Display "name [id]" but use name as value
input_options = [f"{d['name']} [{d['id']}]" for d in device_list]
option_name_map = {f"{d['name']} [{d['id']}]": d['name'] for d in device_list}
device_names = [d['name'] for d in device_list]
analog_devices = [d for d in device_list if d.get('name') in ('ch1', 'ch2')]
# Determine default input by name (from persisted server state)
default_input_name = saved_settings.get('input_device')
if default_input_name not in device_names and device_names:
default_input_name = device_names[0]
default_input_label = None
for label, name in option_name_map.items():
if name == default_input_name:
default_input_label = label
break
if not input_options:
warn_text = (
"No USB audio input devices found. Connect a USB input and click Refresh."
if audio_mode == "USB" else
"No AES67/Network inputs found."
)
st.warning(warn_text)
if not analog_devices:
st.warning("No Analog (ch1/ch2) ALSA inputs found. Check asound configuration.")
if st.button("Refresh", disabled=is_streaming):
try:
r = requests.post(f"{BACKEND_URL}/refresh_audio_devices", timeout=8)
@@ -451,16 +452,248 @@ else:
except Exception as e:
st.error(f"Failed to refresh devices: {e}")
st.rerun()
input_device = None
analog_names = [d['name'] for d in analog_devices]
else:
analog_devices = []
analog_names = []
if not is_streaming:
if analog_names:
default_r1_idx = 0
input_device1 = st.selectbox(
"Input Device (Radio 1)",
analog_names,
index=default_r1_idx,
)
else:
col1, col2 = st.columns([3, 1], vertical_alignment="bottom")
with col1:
selected_option = st.selectbox(
"Input Device",
input_options,
index=input_options.index(default_input_label) if default_input_label in input_options else 0
input_device1 = None
else:
input_device1 = saved_settings.get('input_device')
st.selectbox(
"Input Device (Radio 1)",
[input_device1 or "No device selected"],
index=0,
disabled=True,
help="Stop the stream to change the input device."
)
# --- Radio 2 controls ---
st.subheader("Radio 2")
# If the backend reports that the secondary radio is currently streaming,
# initialize the checkbox to checked so the UI reflects the active state
# when the frontend is loaded.
radio2_enabled_default = secondary_is_streaming
radio2_enabled = st.checkbox(
"Enable Radio 2",
value=radio2_enabled_default,
help="Activate a second analog radio with its own quality and timing settings."
)
if radio2_enabled:
quality2 = st.selectbox(
"Stream Quality (Radio 2)",
quality_options,
index=quality_options.index(default_quality),
help="Select the audio sampling rate for Radio 2."
)
stream_passwort2 = st.text_input(
"Stream Passwort (Radio 2)",
value="",
type="password",
help="Optional: Set a broadcast code for Radio 2."
)
col_r2_flags1, col_r2_flags2, col_r2_pdelay, col_r2_qos = st.columns([1, 1, 0.7, 0.6], gap="small")
with col_r2_flags1:
assisted_listening2 = st.checkbox(
"Assistive listening (R2)",
value=bool(saved_settings.get('assisted_listening_stream', False)),
help="tells the receiver that this is an assistive listening stream"
)
with col_r2_flags2:
immediate_rendering2 = st.checkbox(
"Immediate rendering (R2)",
value=bool(saved_settings.get('immediate_rendering', False)),
help="tells the receiver to ignore presentation delay and render immediately if possible."
)
with col_r2_pdelay:
presentation_delay_ms2 = st.number_input(
"Delay (ms, R2)",
min_value=10, max_value=200, step=5, value=default_pdelay_ms,
help="Delay between capture and presentation for Radio 2."
)
with col_r2_qos:
saved_qos2 = saved_settings.get('secondary', {}).get('qos_preset', 'Fast')
default_qos_idx2 = qos_options.index(saved_qos2) if saved_qos2 in qos_options else 0
qos_preset2 = st.selectbox(
"QoS (R2)", options=qos_options, index=default_qos_idx2,
help="Fast: 2 retransmissions, lower latency. Robust: 4 retransmissions, better reliability."
)
col_r2_name, col_r2_lang = st.columns([2, 1])
with col_r2_name:
stream_name2 = st.text_input(
"Channel Name (Radio 2)",
value=f"{default_name}_2",
help="Name for the second analog radio (Radio 2)."
)
with col_r2_lang:
language2 = st.text_input(
"Language (ISO 639-3) (Radio 2)",
value=default_lang,
help="Language code for Radio 2."
)
program_info2 = st.text_input(
"Program Info (Radio 2)",
value=default_program_info,
help="Program information for Radio 2."
)
if not is_streaming:
if analog_names:
default_r2_idx = 1 if len(analog_names) > 1 else 0
input_device2 = st.selectbox(
"Input Device (Radio 2)",
analog_names,
index=default_r2_idx,
)
with col2:
else:
input_device2 = None
else:
input_device2 = saved_settings.get('input_device')
st.selectbox(
"Input Device (Radio 2)",
[input_device2 or "No device selected"],
index=0,
disabled=True,
help="Stop the stream to change the input device."
)
radio2_cfg = {
'id': 1002,
'name': stream_name2,
'program_info': program_info2,
'language': language2,
'input_device': input_device2,
'quality': quality2,
'stream_passwort': stream_passwort2,
'assisted_listening': assisted_listening2,
'immediate_rendering': immediate_rendering2,
'presentation_delay_ms': presentation_delay_ms2,
'qos_preset': qos_preset2,
}
radio1_cfg = {
'id': 1001,
'name': stream_name1,
'program_info': program_info1,
'language': language1,
'input_device': input_device1,
'quality': quality1,
'stream_passwort': stream_passwort1,
'assisted_listening': assisted_listening1,
'immediate_rendering': immediate_rendering1,
'presentation_delay_ms': presentation_delay_ms1,
'qos_preset': qos_preset1,
}
else:
# USB/Network: single set of controls shared with the single channel
quality_options = list(QUALITY_MAP.keys())
default_quality = "Medium (24kHz)" if "Medium (24kHz)" in quality_options else quality_options[0]
quality = st.selectbox(
"Stream Quality (Sampling Rate)",
quality_options,
index=quality_options.index(default_quality),
help="Select the audio sampling rate for the stream. Lower rates may improve compatibility."
)
stream_passwort = st.text_input(
"Stream Passwort",
value="",
type="password",
help="Optional: Set a broadcast code to protect your stream. Leave empty for an open (uncoded) broadcast."
)
col_flags1, col_flags2, col_pdelay, col_qos = st.columns([1, 1, 0.7, 0.6], gap="small")
with col_flags1:
assisted_listening = st.checkbox(
"Assistive listening",
value=bool(saved_settings.get('assisted_listening_stream', False)),
help="tells the receiver that this is an assistive listening stream"
)
with col_flags2:
immediate_rendering = st.checkbox(
"Immediate rendering",
value=bool(saved_settings.get('immediate_rendering', False)),
help="tells the receiver to ignore presentation delay and render immediately if possible."
)
default_pdelay = int(saved_settings.get('presentation_delay_us', 40000) or 40000)
with col_pdelay:
default_pdelay_ms = max(10, min(200, default_pdelay // 1000))
presentation_delay_ms = st.number_input(
"Delay (ms)",
min_value=10, max_value=200, step=5, value=default_pdelay_ms,
help="Delay between capture and presentation for receivers."
)
with col_qos:
qos_options = list(QOS_PRESET_MAP.keys())
saved_qos = saved_settings.get('qos_preset', 'Fast')
default_qos_idx = qos_options.index(saved_qos) if saved_qos in qos_options else 0
qos_preset = st.selectbox(
"QoS", options=qos_options, index=default_qos_idx,
help="Fast: 2 retransmissions, lower latency. Robust: 4 retransmissions, better reliability."
)
stream_name = st.text_input(
"Channel Name",
value=default_name,
help="The primary name for your broadcast. Like the SSID of a WLAN, it identifies your stream for receivers."
)
program_info = st.text_input(
"Program Info",
value=default_program_info,
help="Additional details about the broadcast program, such as its content or purpose. Shown to receivers for more context."
)
language = st.text_input(
"Language (ISO 639-3)",
value=default_lang,
help="Three-letter language code (e.g., 'eng' for English, 'deu' for German). Used by receivers to display the language of the stream. See: https://en.wikipedia.org/wiki/List_of_ISO_639-3_codes"
)
if audio_mode in ("USB", "Network"):
if not is_streaming:
try:
endpoint = "/audio_inputs_pw_usb" if audio_mode == "USB" else "/audio_inputs_pw_network"
resp = requests.get(f"{BACKEND_URL}{endpoint}")
device_list = resp.json().get('inputs', [])
except Exception as e:
st.error(f"Failed to fetch devices: {e}")
device_list = []
if audio_mode == "USB":
device_list = [d for d in device_list if d.get('name') not in ('ch1', 'ch2')]
input_options = [f"{d['name']} [{d['id']}]" for d in device_list]
option_name_map = {f"{d['name']} [{d['id']}]": d['name'] for d in device_list}
device_names = [d['name'] for d in device_list]
default_input_name = saved_settings.get('input_device')
if default_input_name not in device_names and device_names:
default_input_name = device_names[0]
default_input_label = None
for label, name in option_name_map.items():
if name == default_input_name:
default_input_label = label
break
if not input_options:
warn_text = (
"No USB audio input devices found. Connect a USB input and click Refresh."
if audio_mode == "USB" else
"No AES67/Network inputs found."
)
st.warning(warn_text)
if st.button("Refresh", disabled=is_streaming):
try:
r = requests.post(f"{BACKEND_URL}/refresh_audio_devices", timeout=8)
@@ -469,21 +702,38 @@ else:
except Exception as e:
st.error(f"Failed to refresh devices: {e}")
st.rerun()
# Send only the device name to backend
input_device = option_name_map.get(selected_option)
input_device = None
else:
col1, col2 = st.columns([3, 1], vertical_alignment="bottom")
with col1:
selected_option = st.selectbox(
"Input Device",
input_options,
index=input_options.index(default_input_label) if default_input_label in input_options else 0
)
with col2:
if st.button("Refresh", disabled=is_streaming):
try:
r = requests.post(f"{BACKEND_URL}/refresh_audio_devices", timeout=8)
if not r.ok:
st.error(f"Failed to refresh: {r.text}")
except Exception as e:
st.error(f"Failed to refresh devices: {e}")
st.rerun()
input_device = option_name_map.get(selected_option)
else:
input_device = saved_settings.get('input_device')
current_label = input_device or "No device selected"
st.selectbox(
"Input Device",
[current_label],
index=0,
disabled=True,
help="Stop the stream to change the input device."
)
else:
# When streaming, keep showing the current selection but lock editing.
input_device = saved_settings.get('input_device')
current_label = input_device or "No device selected"
st.selectbox(
"Input Device",
[current_label],
index=0,
disabled=True,
help="Stop the stream to change the input device."
)
else:
input_device = None
input_device = None
start_stream, stop_stream = render_stream_controls(is_streaming, "Start Auracast", "Stop Auracast", running_mode)
if stop_stream:
@@ -499,48 +749,96 @@ else:
if start_stream:
# Always send stop to ensure backend is in a clean state, regardless of current status
r = requests.post(f"{BACKEND_URL}/stop_audio").json()
#if r['was_running']:
# st.success("Stream Stopped!")
# Small pause lets backend fully release audio devices before re-init
time.sleep(1)
# Prepare config using the model (do NOT send qos_config, only relevant fields)
q = QUALITY_MAP[quality]
config = auracast_config.AuracastConfigGroup(
auracast_sampling_rate_hz=q['rate'],
octets_per_frame=q['octets'],
transport='', # is set in backend
assisted_listening_stream=assisted_listening,
immediate_rendering=immediate_rendering,
presentation_delay_us=int(presentation_delay_ms * 1000),
qos_config=auracast_config.AuracastQoSConfig(
iso_int_multiple_10ms=1,
number_of_retransmissions=int(rtn),
max_transport_latency_ms=int(rtn)*10 + 3,
),
bigs = [
auracast_config.AuracastBigConfig(
code=(stream_passwort.strip() or None),
name=stream_name,
program_info=program_info,
language=language,
audio_source=(f"device:{input_device}"),
input_format=(f"int16le,{q['rate']},1"),
iso_que_len=1,
sampling_frequency=q['rate'],
octets_per_frame=q['octets'],
),
]
)
try:
r = requests.post(f"{BACKEND_URL}/init", json=config.model_dump())
if r.status_code == 200:
is_started = True
else:
st.error(f"Failed to initialize: {r.text}")
except Exception as e:
st.error(f"Error: {e}")
if audio_mode == "Analog":
# Build separate configs per radio, each with its own quality and QoS parameters.
is_started = False
def _build_group_from_radio(cfg: dict) -> auracast_config.AuracastConfigGroup | None:
if not cfg or not cfg.get('input_device'):
return None
q = QUALITY_MAP[cfg['quality']]
return auracast_config.AuracastConfigGroup(
auracast_sampling_rate_hz=q['rate'],
octets_per_frame=q['octets'],
transport='', # is set in backend
assisted_listening_stream=bool(cfg['assisted_listening']),
immediate_rendering=bool(cfg['immediate_rendering']),
presentation_delay_us=int(cfg['presentation_delay_ms'] * 1000),
qos_config=QOS_PRESET_MAP[cfg['qos_preset']],
bigs=[
auracast_config.AuracastBigConfig(
id=cfg.get('id', 123456),
code=(cfg['stream_passwort'].strip() or None),
name=cfg['name'],
program_info=cfg['program_info'],
language=cfg['language'],
audio_source=f"device:{cfg['input_device']}",
input_format=f"int16le,{q['rate']},1",
iso_que_len=1,
sampling_frequency=q['rate'],
octets_per_frame=q['octets'],
)
],
)
# Radio 1 (always active if a device is selected)
config1 = _build_group_from_radio(radio1_cfg)
# Radio 2 (optional)
config2 = _build_group_from_radio(radio2_cfg) if radio2_enabled else None
try:
if config1 is not None:
r1 = requests.post(f"{BACKEND_URL}/init", json=config1.model_dump())
if r1.status_code == 200:
is_started = True
else:
st.error(f"Failed to initialize Radio 1: {r1.text}")
else:
st.error("Radio 1 has no valid input device configured.")
if config2 is not None:
r2 = requests.post(f"{BACKEND_URL}/init2", json=config2.model_dump())
if r2.status_code != 200:
st.error(f"Failed to initialize Radio 2: {r2.text}")
except Exception as e:
st.error(f"Error while starting Analog radios: {e}")
else:
# USB/Network: single config as before, using shared controls
q = QUALITY_MAP[quality]
config = auracast_config.AuracastConfigGroup(
auracast_sampling_rate_hz=q['rate'],
octets_per_frame=q['octets'],
transport='', # is set in backend
assisted_listening_stream=assisted_listening,
immediate_rendering=immediate_rendering,
presentation_delay_us=int(presentation_delay_ms * 1000),
qos_config=QOS_PRESET_MAP[qos_preset],
bigs=[
auracast_config.AuracastBigConfig(
code=(stream_passwort.strip() or None),
name=stream_name,
program_info=program_info,
language=language,
audio_source=(f"device:{input_device}"),
input_format=(f"int16le,{q['rate']},1"),
iso_que_len=1,
sampling_frequency=q['rate'],
octets_per_frame=q['octets'],
),
],
)
try:
r = requests.post(f"{BACKEND_URL}/init", json=config.model_dump())
if r.status_code == 200:
is_started = True
else:
st.error(f"Failed to initialize: {r.text}")
except Exception as e:
st.error(f"Error: {e}")
# Centralized rerun based on start/stop outcomes
if is_started or is_stopped:
@@ -563,6 +861,36 @@ if is_started or is_stopped:
############################
with st.expander("System control", expanded=False):
st.subheader("System temperatures")
temp_col1, temp_col2, temp_col3 = st.columns([1, 1, 1])
with temp_col1:
refresh_temps = st.button("Refresh")
try:
case_temp = read_case_temp()
cpu_temp = read_cpu_temp()
with temp_col2:
st.write(f"CPU: {cpu_temp} °C")
with temp_col3:
st.write(f"Case: {case_temp} °C")
except Exception as e:
st.warning(f"Could not read temperatures: {e}")
st.subheader("CA Certificate")
st.caption("Download the CA certificate to trust this device's HTTPS connection.")
try:
cert_resp = requests.get(f"{BACKEND_URL}/cert", timeout=2)
if cert_resp.status_code == 200:
st.download_button(
label="Download CA Certificate",
data=cert_resp.content,
file_name="ca_cert.pem",
mime="application/x-pem-file",
)
else:
st.warning("CA certificate not available.")
except Exception as e:
st.warning(f"Could not fetch CA certificate: {e}")
st.subheader("Change password")
if is_pw_disabled():
st.info("Frontend password protection is disabled via DISABLE_FRONTEND_PW.")
@@ -588,6 +916,69 @@ with st.expander("System control", expanded=False):
except Exception as e:
st.error(f"Failed to update password: {e}")
st.subheader("Software Version")
# Show current version
try:
ver_resp = requests.get(f"{BACKEND_URL}/version", timeout=2)
if ver_resp.ok:
ver_data = ver_resp.json()
current_version = ver_data.get('version', 'unknown')
ver_type = ver_data.get('type', '')
ver_label = current_version if ver_type == 'tag' else f"{current_version} (dev)"
st.write(f"**Current version:** {ver_label}")
else:
st.write("**Current version:** unknown")
current_version = "unknown"
except Exception:
st.write("**Current version:** unknown")
current_version = "unknown"
# Initialize session state for update check
if 'available_update' not in st.session_state:
st.session_state['available_update'] = None
col_check, col_status = st.columns([1, 2])
with col_check:
if st.button("Check for updates"):
try:
check_resp = requests.get(f"{BACKEND_URL}/check_update", timeout=30)
if check_resp.ok:
check_data = check_resp.json()
if check_data.get('error'):
st.session_state['available_update'] = {'error': check_data['error']}
else:
st.session_state['available_update'] = check_data
else:
st.session_state['available_update'] = {'error': f"Failed: {check_resp.status_code}"}
except Exception as e:
st.session_state['available_update'] = {'error': str(e)}
st.rerun()
with col_status:
if st.session_state['available_update']:
upd = st.session_state['available_update']
if upd.get('error'):
st.warning(f"Check failed: {upd['error']}")
elif upd.get('update_available'):
st.info(f"Update available: **{upd['available']}**")
else:
st.success("You are on the latest version.")
# Update button (only show if update is available)
if st.session_state['available_update'] and st.session_state['available_update'].get('update_available'):
if st.button("Update now"):
try:
r = requests.post(f"{BACKEND_URL}/system_update", timeout=120)
if r.ok:
result = r.json()
tag = result.get('tag', 'unknown')
st.success(f"Update to {tag} initiated. The UI will restart shortly.")
st.session_state['available_update'] = None
else:
st.error(f"Failed to update: {r.status_code} {r.text}")
except Exception as e:
st.error(f"Error calling update: {e}")
st.subheader("Reboot")
if st.button("Reboot now", type="primary"):
try:

File diff suppressed because it is too large Load Diff

View File

@@ -34,4 +34,4 @@ echo "Using Avahi domain: $AVAHI_DOMAIN"
POETRY_BIN="/home/caster/.local/bin/poetry"
# Start Streamlit HTTPS server (port 443)
$POETRY_BIN run streamlit run multicast_frontend.py --server.port 443 --server.enableCORS false --server.enableXsrfProtection false --server.headless true --server.sslCertFile "$CERT" --server.sslKeyFile "$KEY" --browser.gatherUsageStats false
$POETRY_BIN run streamlit run multicast_frontend.py --server.port 443 --server.address 0.0.0.0 --server.enableCORS false --server.enableXsrfProtection false --server.headless true --server.sslCertFile "$CERT" --server.sslKeyFile "$KEY" --browser.gatherUsageStats false

View File

@@ -0,0 +1,18 @@
from smbus2 import SMBus
def read_case_temp(addr: int = 0x48, bus_num: int = 1) -> float:
    """Read the case temperature sensor over I2C and return degrees Celsius.

    Args:
        addr: I2C address of the sensor (default 0x48; change if your bus
            scan shows a different address).
        bus_num: I2C bus number (default 1, the Raspberry Pi user bus).

    Returns:
        Temperature in °C, rounded to two decimals.
    """
    # Two bytes read from register 0x00 hold the temperature sample.
    with SMBus(bus_num) as bus:
        msb, lsb = bus.read_i2c_block_data(addr, 0x00, 2)
    return decode_case_temp(msb, lsb)


def decode_case_temp(msb: int, lsb: int) -> float:
    """Convert a raw sensor register pair to °C.

    The layout (12 data bits left-aligned in 16, two's complement,
    0.0625 °C per LSB) matches TMP102/LM75-style sensors -- confirm
    against the actual part's datasheet.
    """
    raw = ((msb << 8) | lsb) >> 4
    if raw & 0x800:  # sign bit of the 12-bit value
        raw -= 1 << 12
    return round(raw * 0.0625, 2)
def read_cpu_temp(zone: int = 0) -> float:
    """Return the CPU temperature in °C from the kernel thermal sysfs.

    Args:
        zone: thermal zone index under /sys/class/thermal (default 0,
            which is the SoC sensor on a Raspberry Pi).

    Returns:
        Temperature in °C, rounded to two decimals (sysfs reports
        millidegrees, hence the division by 1000).
    """
    with open(f"/sys/class/thermal/thermal_zone{zone}/temp", "r") as f:
        return round(int(f.read()) / 1000, 2)
if __name__ == "__main__":
    # Manual smoke check: print one reading from each sensor.
    for label, reader in (("Case", read_case_temp), ("CPU", read_cpu_temp)):
        print(f"{label} temperature: ", reader(), "°C")

View File

@@ -2,7 +2,7 @@ import os
import asyncio
import logging as log
async def reset_nrf54l(slot: int = 0, timeout: float = 8.0):
async def reset_nrf54l(interface: int = 0, timeout: float = 8.0):
"""
Reset the nRF54L target using OpenOCD before starting broadcast.
@@ -24,7 +24,7 @@ async def reset_nrf54l(slot: int = 0, timeout: float = 8.0):
try:
# Resolve project directory and filenames
proj_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'openocd'))
names = ['raspberrypi-swd0.cfg', 'swd0.cfg'] if slot == 0 else ['raspberrypi-swd1.cfg', 'swd1.cfg']
names = ['raspberrypi-swd0.cfg', 'swd0.cfg'] if interface == 0 else ['raspberrypi-swd1.cfg', 'swd1.cfg']
cfg = None
for n in names:
p = os.path.join(proj_dir, n)
@@ -56,7 +56,7 @@ async def reset_nrf54l(slot: int = 0, timeout: float = 8.0):
ok = await _run(cmd)
if ok:
log.info("reset_nrf54l: reset succeeded (slot %d) using %s", slot, cfg)
log.info("reset_nrf54l: reset succeeded (interface %d) using %s", interface, cfg)
except FileNotFoundError:
log.error("reset_nrf54l: openocd not found; skipping reset")
@@ -71,7 +71,10 @@ if __name__ == '__main__':
format='%(asctime)s.%(msecs)03d %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
interface_to_reset = 0
log.info(f"Executing reset for interface {interface_to_reset}")
asyncio.run(reset_nrf54l(interface=interface_to_reset))
slot_to_reset = 1
log.info(f"Executing reset for slot {slot_to_reset}")
asyncio.run(reset_nrf54l(slot=slot_to_reset))
interface_to_reset = 1
log.info(f"Executing reset for interface {interface_to_reset}")
asyncio.run(reset_nrf54l(interface=interface_to_reset))

View File

@@ -232,13 +232,19 @@ def get_alsa_usb_inputs():
name = dev.get('name', '').lower()
# Filter for USB devices based on common patterns:
# - Contains 'usb' in the name
# - hw:X,Y pattern (ALSA hardware devices)
# - hw:X or hw:X,Y pattern present anywhere in name (ALSA hardware devices)
# - dsnoop/ch1/ch2 convenience entries from asound.conf
# Exclude: default, dmix, pulse, pipewire, sysdefault
if any(exclude in name for exclude in ['default', 'dmix', 'pulse', 'pipewire', 'sysdefault']):
continue
# Include if it has 'usb' in name or matches hw:X pattern
if 'usb' in name or re.match(r'hw:\d+', name):
# Include if it has 'usb' or contains an hw:* token, or matches common dsnoop/mono aliases
if (
'usb' in name or
re.search(r'hw:\d+(?:,\d+)?', name) or
name.startswith('dsnoop') or
name in ('ch1', 'ch2')
):
usb_inputs.append((idx, dev))
return usb_inputs

28
src/misc/asound.conf Normal file
View File

@@ -0,0 +1,28 @@
# Split the stereo I2S capture device into two mono capture PCMs via dsnoop.
# Both definitions share ipc_key 234884 and therefore attach to the same
# dsnoop instance; only the channel binding differs
# (ch1 = slave channel 0, ch2 = slave channel 1).
pcm.ch1 {
    type dsnoop
    ipc_key 234884
    slave {
        pcm "hw:CARD=i2s,DEV=0"
        channels 2
        rate 48000
        format S16_LE
        period_size 120
        buffer_size 240
    }
    # Expose slave channel 0 as this mono PCM.
    bindings.0 0
}

pcm.ch2 {
    type dsnoop
    ipc_key 234884
    slave {
        pcm "hw:CARD=i2s,DEV=0"
        channels 2
        rate 48000
        format S16_LE
        period_size 120
        buffer_size 240
    }
    # Expose slave channel 1 as this mono PCM.
    bindings.0 1
}

19
src/misc/build_pcm1862_dts.sh Executable file
View File

@@ -0,0 +1,19 @@
#!/usr/bin/env bash
# Build the PCM1862 I2S device-tree overlay and install it into the
# Raspberry Pi firmware overlays directory.
# NOTE(review): paths are relative -- run this from the repository root.
set -euo pipefail

DTS=./src/misc/pcm1862-i2s.dts
DTBO=pcm1862-i2s.dtbo
OUT=/boot/firmware/overlays

# build
# -@ keeps symbols so the overlay can reference labels such as &i2s.
dtc -@ -I dts -O dtb -o "$DTBO" "$DTS"

# install
sudo install -m 0644 "$DTBO" "$OUT/$DTBO"

# NOTE: also add
# dtparam=i2c_arm=on
# dtoverlay=pcm1862-i2s
# to /boot/firmware/config.txt
echo "Built and installed $DTBO to $OUT."
echo "Now either reboot to load the installed overlay"

View File

@@ -0,0 +1 @@
# Install the dsnoop channel-split ALSA configuration system-wide
# (run from the repository root).
sudo cp src/misc/asound.conf /etc/asound.conf

55
src/misc/pcm1862-i2s.dts Normal file
View File

@@ -0,0 +1,55 @@
/dts-v1/;
/plugin/;

/* Device-tree overlay: TI PCM1862 ADC attached to the Raspberry Pi I2S
 * controller, linked through simple-audio-card. */
/ {
    compatible = "brcm,bcm2835";

    /* Enable the I²S controller */
    fragment@0 {
        target = <&i2s>;
        __overlay__ {
            status = "okay";
        };
    };

    /* PCM1862 on I2C1 at 0x4a (change if your bus/address differ) */
    fragment@1 {
        target = <&i2c1>;
        __overlay__ {
            #address-cells = <1>;
            #size-cells = <0>;
            pcm1862: adc@4a {
                compatible = "ti,pcm1862";
                reg = <0x4a>;
                #sound-dai-cells = <0>;
                /* Rails are hard-powered on your board, so no regulators here */
            };
        };
    };

    /* Link bcm2835-i2s <-> pcm1862 via simple-audio-card */
    fragment@2 {
        target-path = "/";
        __overlay__ {
            pcm1862_sound: pcm1862-sound {
                compatible = "simple-audio-card";
                simple-audio-card,name = "pcm1862 on i2s";
                simple-audio-card,format = "i2s";
                /* Pi is master for BCLK/LRCLK */
                simple-audio-card,bitclock-master = <&dai_cpu>;
                simple-audio-card,frame-master = <&dai_cpu>;
                dai_cpu: simple-audio-card,cpu {
                    sound-dai = <&i2s>;
                    /* NOTE(review): 2 slots x 32-bit TDM -- confirm this
                     * matches the PCM1862 output format configuration. */
                    dai-tdm-slot-num = <2>;
                    dai-tdm-slot-width = <32>;
                };
                simple-audio-card,codec {
                    sound-dai = <&pcm1862>;
                };
            };
        };
    };
};

View File

@@ -1,7 +1,7 @@
adapter driver bcm2835gpio
transport select swd
adapter gpio swclk 17
adapter gpio swdio 18
adapter gpio swdio 26
#adapter gpio trst 26
#reset_config trst_only

View File

@@ -1,7 +1,7 @@
adapter driver bcm2835gpio
transport select swd
adapter gpio swclk 24
adapter gpio swdio 23
adapter gpio swclk 23
adapter gpio swdio 24
#adapter gpio trst 27
#reset_config trst_only

View File

@@ -0,0 +1,42 @@
"""
BAP/BSRC/SCC/BV-20-C: Config Broadcast, LC3 16_2_2
Configuration: 16kHz, 40 octets/frame, stereo (2 BISes), QoS _2 variant
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosRobust
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
# _2 variant uses different QoS (RTN=2, higher latency)
config.qos_config = AuracastQosRobust()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 16_2_2: 16kHz, 40 octets/frame
config.auracast_sampling_rate_hz = 16000
config.octets_per_frame = 40
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,42 @@
"""
BAP/BSRC/SCC/BV-22-C: Config Broadcast, LC3 24_2_2
Configuration: 24kHz, 60 octets/frame, stereo (2 BISes), QoS _2 variant
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosRobust
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
# _2 variant uses different QoS (RTN=2, higher latency)
config.qos_config = AuracastQosRobust()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 24_2_2: 24kHz, 60 octets/frame
config.auracast_sampling_rate_hz = 24000
config.octets_per_frame = 60
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,42 @@
"""
BAP/BSRC/SCC/BV-28-C: Config Broadcast, LC3 48_2_2
Configuration: 48kHz, 100 octets/frame, stereo (2 BISes), QoS _2 variant
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosRobust
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
# _2 variant uses different QoS (RTN=2, higher latency)
config.qos_config = AuracastQosRobust()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 48_2_2: 48kHz, 100 octets/frame
config.auracast_sampling_rate_hz = 48000
config.octets_per_frame = 100
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,42 @@
"""
BAP/BSRC/SCC/BV-30-C: Config Broadcast, LC3 48_4_2
Configuration: 48kHz, 120 octets/frame, stereo (2 BISes), QoS _2 variant
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosRobust
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
# _2 variant uses different QoS (RTN=2, higher latency)
config.qos_config = AuracastQosRobust()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 48_4_2: 48kHz, 120 octets/frame
config.auracast_sampling_rate_hz = 48000
config.octets_per_frame = 120
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,43 @@
"""
BAP/BSRC/SCC/BV-32-C: Config Broadcast, LC3 48_6_2
also works for BV35,36,37 - just restart
Configuration: 48kHz, 155 octets/frame, stereo (2 BISes), QoS _2 variant
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosRobust
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
# _2 variant uses different QoS (RTN=2, higher latency)
config.qos_config = AuracastQosRobust()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 48_6_2: 48kHz, 155 octets/frame
config.auracast_sampling_rate_hz = 48000
config.octets_per_frame = 155
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,45 @@
"""
BAP/BSRC/SCC/BV-38-C: Multi BIG Configuration
Configuration: Two BIGs (id=12 and id=13), stereo (2 BISes each)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# First BIG
big1 = AuracastBigConfig()
big1.random_address = "F1:F1:F2:F3:F4:F5"
big1.audio_source = "file:./testdata/announcement_en.wav"
big1.num_bis = 1
big1.id = 12
# Second BIG
big2 = AuracastBigConfig()
big2.random_address = "F1:F1:F2:F3:F4:F6"
big2.audio_source = "file:./testdata/announcement_en.wav"
big2.num_bis = 1
big2.id = 13
run_async(
broadcast(
config,
[big1, big2],
)
)

View File

@@ -0,0 +1,42 @@
"""
For BV36-C and BV 37-C to success just restart the stream while the testcase is running
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosRobust
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
# Ensure relative audio paths like in AuracastBigConfig work (./auracast/...) from src/auracast/
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
# Start from default global config
config = AuracastGlobalConfig()
# Use same QoS profile as multicast main
config.qos_config = AuracastQosRobust()
# Transport similar to multicast main; adjust if needed for your setup
# config.transport = "auto" # let multicast auto-detect
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts" # Raspberry Pi default
# Default BIG, only modify the random address as requested
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.id = 12
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,41 @@
"""
BAP/BSRC/SCC/BV-06-C and BAP/BSRC/STR/BV-06-C: Config Broadcast, LC3 24_2_1
Configuration: 24kHz, stereo (2 BISes)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 24_2_1: 24kHz
config.auracast_sampling_rate_hz = 24000
config.octets_per_frame = 60
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,41 @@
"""
BAP/BSRC/SCC/BV-12-C and BAP/BSRC/STR/BV-12-C: Config Broadcast, LC3 48_2_1
Configuration: 48kHz, 100 octets/frame, stereo (2 BISes)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 48_2_1: 48kHz, 100 octets/frame
config.auracast_sampling_rate_hz = 48000
config.octets_per_frame = 100
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,41 @@
"""
BAP/BSRC/SCC/BV-14-C and BAP/BSRC/STR/BV-14-C: Config Broadcast, LC3 48_4_1
Configuration: 48kHz, 120 octets/frame, stereo (2 BISes)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 48_4_1: 48kHz, 120 octets/frame
config.auracast_sampling_rate_hz = 48000
config.octets_per_frame = 120
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,41 @@
"""
BAP/BSRC/SCC/BV-16-C and BAP/BSRC/STR/BV-16-C: Config Broadcast, LC3 48_6_1
Configuration: 48kHz, 155 octets/frame, stereo (2 BISes)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 48_6_1: 48kHz, 155 octets/frame
config.auracast_sampling_rate_hz = 48000
config.octets_per_frame = 155
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,41 @@
"""
BAP/BSRC/STR/BV-21-C: BSRC, Multiple BISes, LC3 16_2
Configuration: 16kHz, 40 octets/frame, stereo (2 BISes)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 16_2: 16kHz, 40 octets/frame
config.auracast_sampling_rate_hz = 16000
config.octets_per_frame = 40
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_es_stereo.wav"
big.id = 12
big.num_bis = 2 # stereo (multiple BISes)
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,41 @@
"""
BAP/BSRC/STR/BV-23-C: BSRC, Multiple BISes, LC3 24_2
Configuration: 24kHz, 60 octets/frame, stereo (2 BISes)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 24_2: 24kHz, 60 octets/frame
config.auracast_sampling_rate_hz = 24000
config.octets_per_frame = 60
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_es_stereo.wav"
big.id = 12
big.num_bis = 2 # stereo (multiple BISes)
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,41 @@
"""
BAP/BSRC/STR/BV-29-C: BSRC, Multiple BISes, LC3 48_2
Configuration: 48kHz, 100 octets/frame, stereo (2 BISes)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 48_2: 48kHz, 100 octets/frame
config.auracast_sampling_rate_hz = 48000
config.octets_per_frame = 100
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_es_stereo.wav"
big.id = 12
big.num_bis = 2 # stereo (multiple BISes)
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,41 @@
"""
BAP/BSRC/STR/BV-31-C: BSRC, Multiple BISes, LC3 48_4
Configuration: 48kHz, 120 octets/frame, stereo (2 BISes)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 48_4: 48kHz, 120 octets/frame
config.auracast_sampling_rate_hz = 48000
config.octets_per_frame = 120
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_es_stereo.wav"
big.id = 12
big.num_bis = 2 # stereo (multiple BISes)
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,41 @@
"""
BAP/BSRC/STR/BV-33-C: BSRC, Multiple BISes, LC3 48_6
Configuration: 48kHz, 155 octets/frame, stereo (2 BISes)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 48_6: 48kHz, 155 octets/frame
config.auracast_sampling_rate_hz = 48000
config.octets_per_frame = 155
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_es_stereo.wav"
big.id = 12
big.num_bis = 2 # stereo (multiple BISes)
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,46 @@
"""
CAP/INI/BST/BV-01-C and CAP/INI/BST/BV-05-C:
- BV-01-C: Broadcast Audio Starting for Single Audio Stream
- BV-05-C: Broadcast Audio Starting for Single Audio Streams - Single CCID
Make sure to set TSPX_BST_CODEC_CONFIG to 16_2_1
Restart the stream when asked to terminate.
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosRobust
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
# Start from default global config
config = AuracastGlobalConfig()
# Use same QoS profile as multicast main
config.qos_config = AuracastQosRobust()
# Transport similar to multicast main; adjust if needed for your setup
# config.transport = "auto" # let multicast auto-detect
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts" # Raspberry Pi default
# Default BIG, only modify the random address as requested
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.id = 12
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,42 @@
"""
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosRobust
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
# Ensure relative audio paths like in AuracastBigConfig work (./auracast/...) from src/auracast/
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
# Start from default global config
config = AuracastGlobalConfig()
# Use same QoS profile as multicast main
config.qos_config = AuracastQosRobust()
# Transport similar to multicast main; adjust if needed for your setup
# config.transport = "auto" # let multicast auto-detect
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts" # Raspberry Pi default
# Stereo BIG with 2 BISes (FRONT_LEFT + FRONT_RIGHT)
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_es_stereo.wav"
big.id = 12
big.num_bis = 2 # stereo: 2 BISes
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,100 @@
"""
GAP/BROB/BCST/BV-01-C: Broadcaster role with non-connectable advertising.
Advertising with TSPX_advertising_data value (27 bytes):
- Flags: BR/EDR Not Supported
- 16-bit Service UUIDs: 0x1800, 0x1801
- Local Name: "PTS-GAP-06B8"
- Appearance: 0x0000
"""
import asyncio
import logging
import os
import bumble.device
import bumble.transport
from bumble import hci
from bumble.device import DeviceConfiguration, AdvertisingParameters, AdvertisingEventProperties
async def run_broadcaster():
    """Configure and start non-connectable advertising for GAP/BROB/BCST/BV-01-C.

    Opens the HCI transport, powers the controller on, then advertises a
    fixed 27-byte payload as legacy non-connectable, non-scannable PDUs
    until the task is cancelled (e.g. Ctrl+C in the caller).
    """
    # Transport - adjust as needed for your setup
    transport_str = "serial:/dev/ttyAMA3,1000000,rtscts"
    async with await bumble.transport.open_transport(transport_str) as (hci_source, hci_sink):
        # Device configuration
        device_config = DeviceConfiguration(
            name="PTS-GAP-06B8",
            address=hci.Address("F1:F1:F2:F3:F4:F5"),
        )
        device = bumble.device.Device.from_config_with_hci(
            device_config,
            hci_source,
            hci_sink,
        )
        await device.power_on()
        # Exact advertising data payload (27 bytes) as specified:
        # 0x02, 0x01, 0x04 - Flags: BR/EDR Not Supported
        # 0x05, 0x03, 0x00, 0x18, 0x01, 0x18 - 16-bit Service UUIDs: 0x1800, 0x1801
        # 0x0D, 0x09, 0x50, 0x54, 0x53, 0x2D, 0x47, - Complete Local Name: "PTS-GAP-06B8"
        # 0x41, 0x50, 0x2D, 0x30, 0x36, 0x42, 0x38
        # 0x03, 0x19, 0x00, 0x00 - Appearance: 0x0000
        adv_data = bytes([
            0x02, 0x01, 0x04,  # Flags: BR/EDR Not Supported
            0x05, 0x03, 0x00, 0x18, 0x01, 0x18,  # 16-bit Service UUIDs
            0x0D, 0x09, 0x50, 0x54, 0x53, 0x2D, 0x47, 0x41,  # Local Name: "PTS-GAP-06B8"
            0x50, 0x2D, 0x30, 0x36, 0x42, 0x38,
            0x03, 0x19, 0x00, 0x00  # Appearance
        ])
        logging.info("Advertising data (%d bytes): %s", len(adv_data), adv_data.hex())
        # Create advertising set with non-connectable parameters (ADV_NONCONN_IND equivalent)
        advertising_set = await device.create_advertising_set(
            advertising_parameters=AdvertisingParameters(
                advertising_event_properties=AdvertisingEventProperties(
                    is_connectable=False,  # Non-connectable (ADV_NONCONN_IND)
                    is_scannable=False,  # Not scannable
                    is_directed=False,
                    is_high_duty_cycle_directed_connectable=False,
                    is_legacy=True,  # Use legacy advertising PDUs
                    is_anonymous=False,
                ),
                # min == max pins the interval to a fixed 1.28 s cadence.
                primary_advertising_interval_min=0x0800,  # 1.28s
                primary_advertising_interval_max=0x0800,  # 1.28s
                primary_advertising_phy=hci.Phy.LE_1M,
            ),
            advertising_data=adv_data,
            auto_start=True,  # begin advertising as soon as the set exists
        )
        logging.info("Non-connectable advertising started (ADV_NONCONN_IND)")
        logging.info("Advertising set handle: %s", advertising_set.advertising_handle)
        # Keep advertising until interrupted
        logging.info("Press Ctrl+C to stop...")
        try:
            while True:
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            # Task cancelled (asyncio.run teardown): fall through to cleanup.
            pass
        finally:
            # Always stop the advertising set, even on cancellation/error.
            await advertising_set.stop()
            logging.info("Advertising stopped")
if __name__ == "__main__":
    # Verbosity comes from the LOG_LEVEL env var, defaulting to INFO.
    logging.basicConfig(
        level=os.environ.get("LOG_LEVEL", logging.INFO),
        format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
    )
    try:
        asyncio.run(run_broadcaster())
    except KeyboardInterrupt:
        # Ctrl+C: exit quietly after logging.
        logging.info("Interrupted by user")

View File

@@ -0,0 +1,102 @@
"""
GAP/CONN/NCON/BV-01-C: Non-Connectable Mode.
PTS Action: Select YES when asked "Does the IUT have an ability to send
non-connectable advertising report?"
Configuration (same as GAP/BROB/BCST/BV-01-C):
- Advertising_Type: 0x03 (ADV_NONCONN_IND)
- Flags AD Type (0x01): 0x04 (Not Discoverable, BR/EDR Not Supported)
- Legacy non-connectable advertising packet
"""
import asyncio
import logging
import os
import bumble.device
import bumble.transport
from bumble import hci
from bumble.device import DeviceConfiguration, AdvertisingParameters, AdvertisingEventProperties
async def run_non_connectable():
    """Configure and start non-connectable advertising for GAP/CONN/NCON/BV-01-C.

    Same flow and payload as the GAP/BROB/BCST/BV-01-C script: open the HCI
    transport, power on the controller, advertise a fixed 27-byte payload as
    legacy non-connectable, non-scannable PDUs until the task is cancelled.
    """
    # Transport - adjust as needed for your setup
    transport_str = "serial:/dev/ttyAMA3,1000000,rtscts"
    async with await bumble.transport.open_transport(transport_str) as (hci_source, hci_sink):
        # Device configuration
        device_config = DeviceConfiguration(
            name="PTS-GAP-06B8",
            address=hci.Address("F1:F1:F2:F3:F4:F5"),
        )
        device = bumble.device.Device.from_config_with_hci(
            device_config,
            hci_source,
            hci_sink,
        )
        await device.power_on()
        # Exact advertising data payload (27 bytes) as specified:
        # 0x02, 0x01, 0x04 - Flags: BR/EDR Not Supported
        # 0x05, 0x03, 0x00, 0x18, 0x01, 0x18 - 16-bit Service UUIDs: 0x1800, 0x1801
        # 0x0D, 0x09, 0x50, 0x54, 0x53, 0x2D, 0x47, - Complete Local Name: "PTS-GAP-06B8"
        # 0x41, 0x50, 0x2D, 0x30, 0x36, 0x42, 0x38
        # 0x03, 0x19, 0x00, 0x00 - Appearance: 0x0000
        adv_data = bytes([
            0x02, 0x01, 0x04,  # Flags: BR/EDR Not Supported
            0x05, 0x03, 0x00, 0x18, 0x01, 0x18,  # 16-bit Service UUIDs
            0x0D, 0x09, 0x50, 0x54, 0x53, 0x2D, 0x47, 0x41,  # Local Name: "PTS-GAP-06B8"
            0x50, 0x2D, 0x30, 0x36, 0x42, 0x38,
            0x03, 0x19, 0x00, 0x00  # Appearance
        ])
        logging.info("Advertising data (%d bytes): %s", len(adv_data), adv_data.hex())
        # Create advertising set with non-connectable parameters (ADV_NONCONN_IND equivalent)
        advertising_set = await device.create_advertising_set(
            advertising_parameters=AdvertisingParameters(
                advertising_event_properties=AdvertisingEventProperties(
                    is_connectable=False,  # Non-connectable (ADV_NONCONN_IND)
                    is_scannable=False,  # Not scannable
                    is_directed=False,
                    is_high_duty_cycle_directed_connectable=False,
                    is_legacy=True,  # Use legacy advertising PDUs
                    is_anonymous=False,
                ),
                # min == max pins the interval to a fixed 1.28 s cadence.
                primary_advertising_interval_min=0x0800,  # 1.28s
                primary_advertising_interval_max=0x0800,  # 1.28s
                primary_advertising_phy=hci.Phy.LE_1M,
            ),
            advertising_data=adv_data,
            auto_start=True,  # begin advertising as soon as the set exists
        )
        logging.info("Non-connectable advertising started (ADV_NONCONN_IND)")
        logging.info("Advertising set handle: %s", advertising_set.advertising_handle)
        # Keep advertising until interrupted
        logging.info("Press Ctrl+C to stop...")
        try:
            while True:
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            # Task cancelled (asyncio.run teardown): fall through to cleanup.
            pass
        finally:
            # Always stop the advertising set, even on cancellation/error.
            await advertising_set.stop()
            logging.info("Advertising stopped")
if __name__ == "__main__":
    # Verbosity comes from the LOG_LEVEL env var, defaulting to INFO.
    logging.basicConfig(
        level=os.environ.get("LOG_LEVEL", logging.INFO),
        format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
    )
    try:
        asyncio.run(run_non_connectable())
    except KeyboardInterrupt:
        # Ctrl+C: exit quietly after logging.
        logging.info("Interrupted by user")

View File

@@ -0,0 +1,45 @@
"""
PBP/PBS/PBM/BV-01-C: Transmit Program_Info Metadata
Configuration: 16kHz, unencrypted, stereo (2 BISes)
Program_Info metadata: 0x00112233445566778899AABBCCDDEEFF
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 16_2_1: 16kHz, stereo
config.auracast_sampling_rate_hz = 16000
config.octets_per_frame = 40
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
big.name = "Broadcast"
# Program_Info metadata: 00112233445566778899AABBCCDDEEFF
big.program_info = bytes.fromhex("00112233445566778899AABBCCDDEEFF").decode('latin-1')
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,45 @@
"""
PBP/PBS/STR/BV-01-C: Standard Quality Streaming Support, 16_2_1 - PBS
(TSPC_PBP_1_1 AND TSPC_PBP_7_1) OR TSPC_ALL
Configuration: 16kHz, unencrypted, stereo (2 BISes)
PBP Features: 0x02 (Standard Quality)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 16_2_1: 16kHz, stereo
config.auracast_sampling_rate_hz = 16000
config.octets_per_frame = 40
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.id = 12
big.num_bis = 1
big.name = "Broadcaster"
# Unencrypted (no code)
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,46 @@
"""
PBP/PBS/STR/BV-02-C: High Quality Streaming Support - PBS
(TSPC_PBP_1_1 AND TSPC_PBP_6_5) OR TSPC_ALL
Configuration: 48kHz, unencrypted, stereo (2 BISes)
PBP Features: 0x04 (High Quality)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# High Quality: 48kHz, 48_1_1 configuration
config.auracast_sampling_rate_hz = 48000
config.octets_per_frame = 75 # 48_1_1: 48kHz, 75 octets/frame
config.frame_duration_us = 7500 # 7.5ms frame duration for 48_1_1
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
big.name = "Broadcaster"
# Unencrypted (no code)
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,45 @@
"""
PBP/PBS/STR/BV-03-C: Encrypted Streaming Support, Standard Quality - PBS
(TSPC_PBP_1_1 AND TSPC_PBP_6_6 AND TSPC_PBP_6_4) OR TSPC_ALL
Configuration: 16kHz, encrypted, stereo (2 BISes)
PBP Features: 0x03 (Standard Quality + Encrypted)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# Standard Quality: 16kHz
config.auracast_sampling_rate_hz = 16000
config.octets_per_frame = 40
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
big.name = "Broadcaster"
big.code = "0x0102680553F1415AA265BBAFC6EA03B8" # Encrypted (hex format)
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,46 @@
"""
PBP/PBS/STR/BV-04-C: Encrypted Streaming Support, High Quality - PBS
(TSPC_PBP_1_1 AND TSPC_PBP_6_6 AND TSPC_PBP_6_5) OR TSPC_ALL
Configuration: 48kHz, encrypted, stereo (2 BISes)
PBP Features: 0x05 (High Quality + Encrypted)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# High Quality: 48kHz, 48_1_1 configuration
config.auracast_sampling_rate_hz = 48000
config.octets_per_frame = 75 # 48_1_1: 48kHz, 75 octets/frame
config.frame_duration_us = 7500 # 7.5ms frame duration for 48_1_1
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
big.name = "Broadcaster"
big.code = "0x0102680553F1415AA265BBAFC6EA03B8" # Encrypted (hex format)
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,46 @@
"""
PBP/PBS/STR/BV-05-C: Standard Quality Streaming Support, 16_2_2 - PBS
(TSPC_PBP_1_1 AND TSPC_PBP_7_3) OR TSPC_ALL
Configuration: 16kHz, unencrypted, stereo (2 BISes), QoS 16_2_2
PBP Features: 0x02 (Standard Quality)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosRobust
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
# 16_2_2 uses different QoS (RTN=2, higher latency)
config.qos_config = AuracastQosRobust()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 16_2_2: 16kHz
config.auracast_sampling_rate_hz = 16000
config.octets_per_frame = 40
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
big.name = "Broadcaster"
# Unencrypted (no code)
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,45 @@
"""
PBP/PBS/STR/BV-06-C: Standard Quality Streaming Support, 24_2_1 - PBS
(TSPC_PBP_1_1 AND TSPC_PBP_7_2) OR TSPC_ALL
Configuration: 24kHz, unencrypted, stereo (2 BISes)
PBP Features: 0x02 (Standard Quality)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosFast
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
config.qos_config = AuracastQosFast()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 24_2_1: 24kHz
config.auracast_sampling_rate_hz = 24000
config.octets_per_frame = 60
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
big.name = "Broadcaster"
# Unencrypted (no code)
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,46 @@
"""
PBP/PBS/STR/BV-07-C: Standard Quality Streaming Support, 24_2_2 - PBS
(TSPC_PBP_1_1 AND TSPC_PBP_7_4) OR TSPC_ALL
Configuration: 24kHz, unencrypted, stereo (2 BISes), QoS 24_2_2
PBP Features: 0x02 (Standard Quality)
"""
import logging
import os
from auracast.auracast_config import AuracastGlobalConfig, AuracastBigConfig, AuracastQosRobust
from auracast.multicast import broadcast, run_async
if __name__ == "__main__":
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", logging.INFO),
format="%(module)s.py:%(lineno)d %(levelname)s: %(message)s",
)
os.chdir(os.path.join(os.path.dirname(__file__), "../../auracast"))
config = AuracastGlobalConfig()
# 24_2_2 uses different QoS (RTN=2, higher latency)
config.qos_config = AuracastQosRobust()
config.transport = "serial:/dev/ttyAMA3,1000000,rtscts"
# 24_2_2: 24kHz
config.auracast_sampling_rate_hz = 24000
config.octets_per_frame = 60
big = AuracastBigConfig()
big.random_address = "F1:F1:F2:F3:F4:F5"
big.audio_source = "file:./testdata/announcement_en.wav"
big.num_bis = 1
big.id = 12
big.name = "Broadcaster"
# Unencrypted (no code)
run_async(
broadcast(
config,
[big],
)
)

View File

@@ -0,0 +1,33 @@
# Qualification procedure
# Flash a qualification dongle
- These steps use a standard nRF52840 Nordic USB dongle
- See https://bluekitchen-gmbh.com/bluetooth-pts-with-nordic-nrf52840-usb-dongle/ for Nordic nRF52 dev dongle
- Install PTS Firmware Upgrade Software
- Plug-in nRF52840 USB Dongle
- Start PTS Firmware Update Software
- If you click on 'OK', updating the bootloader will fail (the Nordic bootloader on the nRF52840 USB cannot be updated via DFU)
- Close the software
- Open an Explorer window and navigate to C:\Program Files (x86)\Bluetooth SIG\PTS Firmware Upgrade Software\tools and copy the file nrfutil.exe
- Navigate to AppData\Local\PTSFirmwareUpgradeSoftware within your user folder and paste the nrfutil.exe into this folder
- Note the file with the UUID128-like file name as you'll need it soon
- Open a PowerShell via File->Open Windows PowerShell as a regular user
- Reset the nRF52840 USB Dongle by pressing the smaller button (labeled 'RESET') to enter DFU mode
- A red LED should start flashing
- Run nrfutil.exe with the firmware package that has the UUID128-like name (the .bin file is actually a ZIP archive)
- Tip: use TAB completion to fill in the name, e.g.: .\nrfutil dfu usb-serial -pkg be4d3ab8-9c98-408a-8be4-18acf4b32d28.zip -p COM4
- Et voila, the nRF52840 USB Dongle can be used with PTS
# PTS IXIT prerequisites
In BAP set
- Broadcast_ID=12
- Broadcast_ID_2=13
In CAP set
- TSPX_BST_CODEC_CONFIG=16_2_1
Everywhere set
- use STREAMING_DATA_CONFIRMATION_METHOD=By Playing
# Notes
- Some test cases can be passed simply by restarting the stream.

View File

@@ -1,16 +0,0 @@
# Diagnostic script: dump PortAudio host APIs and devices via sounddevice.
import sounddevice as sd, pprint
from auracast.utils.sounddevice_utils import devices_by_backend

# NOTE(review): sd._libname is a private sounddevice attribute — may break
# across sounddevice versions.
print("PortAudio library:", sd._libname)
print("PortAudio version:", sd.get_portaudio_version())
print("\nHost APIs:")
pprint.pprint(sd.query_hostapis())
print("\nDevices:")
pprint.pprint(sd.query_devices())
# Example: only PulseAudio devices on Linux
print("\nOnly PulseAudio devices:")
for i, d in devices_by_backend("PulseAudio"):
    print(f"{i}: {d['name']} in={d['max_input_channels']} out={d['max_output_channels']}")

View File

@@ -0,0 +1,47 @@
# Diagnostic script: enumerate PortAudio host APIs and devices, plus the
# project's ALSA / USB / PipeWire input helpers.
import sounddevice as sd, pprint
# NOTE(review): get_alsa_inputs is imported but unused below.
from auracast.utils.sounddevice_utils import (
    devices_by_backend,
    get_alsa_inputs,
    get_alsa_usb_inputs,
    get_network_pw_inputs,
    refresh_pw_cache,
)

# NOTE(review): sd._libname is a private sounddevice attribute — may break
# across sounddevice versions.
print("PortAudio library:", sd._libname)
print("PortAudio version:", sd.get_portaudio_version())
print("\nHost APIs:")
apis = sd.query_hostapis()
pprint.pprint(apis)
print("\nAll Devices (with host API name):")
devs = sd.query_devices()
for i, d in enumerate(devs):
    # Map the device's hostapi index to its API name; '?' when missing/out of range.
    ha_name = apis[d['hostapi']]['name'] if isinstance(d.get('hostapi'), int) and d['hostapi'] < len(apis) else '?'
    if d.get('max_input_channels', 0) > 0:
        print(f"IN {i:>3}: {d['name']} api={ha_name} in={d['max_input_channels']}")
    elif d.get('max_output_channels', 0) > 0:
        print(f"OUT {i:>3}: {d['name']} api={ha_name} out={d['max_output_channels']}")
    else:
        print(f"DEV {i:>3}: {d['name']} api={ha_name} (no I/O)")
print("\nALSA input devices (PortAudio ALSA host):")
for i, d in devices_by_backend('ALSA'):
    if d.get('max_input_channels', 0) > 0:
        print(f"ALSA {i:>3}: {d['name']} in={d['max_input_channels']}")
print("\nALSA USB-filtered inputs:")
for i, d in get_alsa_usb_inputs():
    print(f"USB {i:>3}: {d['name']} in={d['max_input_channels']}")
print("\nRefreshing PipeWire caches...")
try:
    refresh_pw_cache()
except Exception:
    # Best-effort refresh: PipeWire may be unavailable; fall back to the stale cache.
    pass
print("PipeWire Network inputs (from cache):")
for i, d in get_network_pw_inputs():
    print(f"NET {i:>3}: {d['name']} in={d.get('max_input_channels', 0)}")

View File

@@ -0,0 +1,36 @@
import csv
import time
from datetime import datetime
from pathlib import Path
from auracast.utils.read_temp import read_case_temp, read_cpu_temp
def main() -> None:
    """Append CPU and case temperature readings to a timestamped CSV every 30 s.

    The log file is created next to this script, named after the start time,
    and sampling continues until the process is interrupted with Ctrl+C.
    """
    started = datetime.now()
    out_dir = Path(__file__).resolve().parent
    out_path = out_dir / started.strftime("temperature_log_%Y%m%d_%H%M%S.csv")

    with out_path.open("w", newline="") as handle:
        log = csv.writer(handle)
        log.writerow(["timestamp", "cpu_temp_c", "case_temp_c"])
        try:
            while True:
                stamp = datetime.now().isoformat(timespec="seconds")
                log.writerow([stamp, read_cpu_temp(), read_case_temp()])
                # Flush each sample so the file is usable even after a crash.
                handle.flush()
                time.sleep(30)
        except KeyboardInterrupt:
            # Graceful stop on Ctrl+C; the `with` block closes the file.
            pass


if __name__ == "__main__":
    main()

View File

@@ -1,8 +0,0 @@
# Read a temperature over I2C and print it in degrees Celsius.
from smbus2 import SMBus

addr = 0x48  # I2C device address; change if your scan shows different
with SMBus(1) as bus:
    # Read two bytes starting at register 0x00 (MSB first).
    msb, lsb = bus.read_i2c_block_data(addr, 0x00, 2)
    # The 12-bit reading occupies the top 12 bits of the 16-bit word.
    raw = ((msb << 8) | lsb) >> 4
    if raw & 0x800:  # sign bit for 12-bit
        raw -= 1 << 12  # two's-complement sign extension for negative temperatures
    # 0.0625 °C per LSB — assumes a TMP102/LM75-style sensor; confirm the part.
    print(f"{raw * 0.0625:.2f} °C")

View File

@@ -9,6 +9,9 @@ ExecStart=/home/caster/.local/bin/poetry run python src/auracast/server/multicas
Restart=on-failure
Environment=PYTHONUNBUFFERED=1
Environment=LOG_LEVEL=INFO
CPUSchedulingPolicy=fifo
CPUSchedulingPriority=99
LimitRTPRIO=99
[Install]
WantedBy=default.target