bugfixes/prepare_for_first_unit (#8)

- update ui
- implement different features like restart
- bugfixes

Co-authored-by: pstruebi <struebin.patrick.com>
Reviewed-on: https://gitea.pstruebi.xyz/auracaster/bumble-auracast/pulls/8
This commit was merged in pull request #8.
This commit is contained in:
2025-10-06 12:16:39 +02:00
parent a9ea04ed87
commit 430fb1009d
15 changed files with 832 additions and 453 deletions

1
.gitignore vendored
View File

@@ -42,3 +42,4 @@ records/DISABLE_FRONTEND_PW
src/auracast/server/stream_settings.json
src/auracast/server/certs/per_device/
src/auracast/.env
src/auracast/server/certs/ca/ca_cert.srl

View File

@@ -75,6 +75,8 @@ stty -F /dev/ttyAMA3 -a | grep -o 'hupcl' || echo "-hupcl is set"
# Audio latency
if there is hearable audio error with aes67, tune sess.latency.msec in pipewire-aes67.conf
if latency is piling up something may be blocking the event loop in multicast_server.py - the event loop must never block at any time
---
After completing these steps, your device will be discoverable as `<hostname>.<domain>` (e.g., `box1.auracast.local`) on the local network via mDNS.

8
poetry.lock generated
View File

@@ -332,7 +332,7 @@ files = [
[[package]]
name = "bumble"
version = "0.0.216.dev1+g6eba81e3d"
version = "0.0.218.dev6+g32d448edf"
description = "Bluetooth Stack for Apps, Emulation, Test and Experimentation"
optional = false
python-versions = ">=3.9"
@@ -371,8 +371,8 @@ test = ["coverage (>=6.4)", "pytest (>=8.2)", "pytest-asyncio (>=0.23.5)", "pyte
[package.source]
type = "git"
url = "ssh://git@ssh.pstruebi.xyz:222/auracaster/bumble_mirror.git"
reference = "6eba81e3ddb8ac0e4c336ca244892a0a8d43ba1c"
resolved_reference = "6eba81e3ddb8ac0e4c336ca244892a0a8d43ba1c"
reference = "32d448edf3276f6b9056765a12879054d8a01fd8"
resolved_reference = "32d448edf3276f6b9056765a12879054d8a01fd8"
[[package]]
name = "cachetools"
@@ -2952,4 +2952,4 @@ test = ["pytest", "pytest-asyncio"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.11"
content-hash = "3afe565be2664b3d7f1cfdb1c5a73d931c14e97d3622aef24ba2f06f78e00e2b"
content-hash = "6b5300c349ed045e8fd3e617e6262bbd7e5c48c518e4c62cedf7c17da50ce8c0"

View File

@@ -4,7 +4,7 @@ version = "0.0.1"
requires-python = ">=3.11"
dependencies = [
"bumble @ git+ssh://git@ssh.pstruebi.xyz:222/auracaster/bumble_mirror.git@6eba81e3ddb8ac0e4c336ca244892a0a8d43ba1c",
"bumble @ git+ssh://git@ssh.pstruebi.xyz:222/auracaster/bumble_mirror.git@32d448edf3276f6b9056765a12879054d8a01fd8",
"lc3py @ git+ssh://git@ssh.pstruebi.xyz:222/auracaster/liblc3.git@ce2e41faf8c06d038df9f32504c61109a14130be",
"aioconsole",
"fastapi==0.115.11",

View File

@@ -313,8 +313,8 @@ async def init_broadcast(
bigs[f'big{i}']['iso_queue'] = iso_queue
logging.debug(f'big{i} parameters are:')
logging.debug('%s', pprint.pformat(vars(big)))
logging.info(f'big{i} parameters are:')
logging.info('%s', pprint.pformat(vars(big)))
logging.info(f'Finished setup of big{i}.')
await asyncio.sleep(i+1) # Wait for advertising to set up
@@ -375,18 +375,51 @@ class Streamer():
if self.task is not None:
self.task.cancel()
# Let cancellation propagate to the stream() coroutine
await asyncio.sleep(0.01)
self.task = None
# Close audio inputs (await to ensure ALSA devices are released)
close_tasks = []
async_closers = []
sync_closers = []
for big in self.bigs.values():
ai = big.get("audio_input")
if ai and hasattr(ai, "close"):
close_tasks.append(ai.close())
# Remove reference so a fresh one is created next time
big.pop("audio_input", None)
if close_tasks:
await asyncio.gather(*close_tasks, return_exceptions=True)
if not ai:
continue
# First close any frames generator backed by the input to stop reads
frames_gen = big.get("frames_gen")
if frames_gen and hasattr(frames_gen, "aclose"):
try:
await frames_gen.aclose()
except Exception:
pass
big.pop("frames_gen", None)
if hasattr(ai, "aclose") and callable(getattr(ai, "aclose")):
async_closers.append(ai.aclose())
elif hasattr(ai, "close") and callable(getattr(ai, "close")):
sync_closers.append(ai.close)
# Remove reference so a fresh one is created next time
big.pop("audio_input", None)
if async_closers:
await asyncio.gather(*async_closers, return_exceptions=True)
for fn in sync_closers:
try:
fn()
except Exception:
pass
# Reset PortAudio to drop lingering PipeWire capture nodes
try:
import sounddevice as _sd
if hasattr(_sd, "_terminate"):
_sd._terminate()
await asyncio.sleep(0.05)
if hasattr(_sd, "_initialize"):
_sd._initialize()
except Exception:
pass
async def stream(self):
@@ -414,6 +447,8 @@ class Streamer():
big['audio_input'] = audio_source
big['encoder'] = encoder
big['precoded'] = False
# Prepare frames generator for graceful shutdown
big['frames_gen'] = big['audio_input'].frames(lc3_frame_samples)
elif audio_source == 'webrtc':
big['audio_input'] = WebRTCAudioInput()
@@ -429,6 +464,8 @@ class Streamer():
big['lc3_bytes_per_frame'] = global_config.octets_per_frame
big['encoder'] = encoder
big['precoded'] = False
# Prepare frames generator for graceful shutdown
big['frames_gen'] = big['audio_input'].frames(lc3_frame_samples)
# precoded lc3 from ram
elif isinstance(big_config[i].audio_source, bytes):
@@ -599,7 +636,12 @@ class Streamer():
stream_finished[i] = True
continue
else: # code lc3 on the fly
pcm_frame = await anext(big['audio_input'].frames(big['lc3_frame_samples']), None)
# Use stored frames generator when available so we can aclose() it on stop
frames_gen = big.get('frames_gen')
if frames_gen is None:
frames_gen = big['audio_input'].frames(big['lc3_frame_samples'])
big['frames_gen'] = frames_gen
pcm_frame = await anext(frames_gen, None)
if pcm_frame is None: # Not all streams may stop at the same time
stream_finished[i] = True

View File

@@ -91,6 +91,8 @@ class Multicaster:
for big in self.bigs.values():
if big.get('advertising_set'):
await big['advertising_set'].stop()
# Explicitly power off the device to ensure a clean state before closing the transport
await self.device.power_off()
await self.device_acm.__aexit__(None, None, None) # Manually triggering teardown

View File

@@ -146,11 +146,12 @@ if __name__ == "__main__":
presentation_delay_us=40000,
qos_config=auracast_config.AuracastQosHigh(),
auracast_sampling_rate_hz = LC3_SRATE,
octets_per_frame = OCTETS_PER_FRAME, # 32kbps@16kHz
octets_per_frame = OCTETS_PER_FRAME,
transport=TRANSPORT1
)
#config.debug = True
logging.info(config.model_dump_json(indent=2))
multicast.run_async(
multicast.broadcast(
config,

View File

@@ -1 +1 @@
5078804E6FBCF893D5537715FD928E46AD576ECA
5078804E6FBCF893D5537715FD928E46AD576ECB

View File

@@ -99,11 +99,19 @@ try:
except Exception:
saved_settings = {}
# Define is_streaming early from the fetched status for use throughout the UI
is_streaming = bool(saved_settings.get("is_streaming", False))
st.title("Auracast Audio Mode Control")
# Audio mode selection with persisted default
# Note: backend persists 'USB' for any device:<name> source (including AES67). We default to 'USB' in that case.
options = ["Demo", "USB", "AES67", "Webapp"]
options = [
"Demo",
"USB",
"AES67",
# "Webapp"
]
saved_audio_mode = saved_settings.get("audio_mode", "Demo")
if saved_audio_mode not in options:
# Map legacy/unknown modes to closest
@@ -146,7 +154,7 @@ if audio_mode == "Demo":
type=("password"),
help="Optional: Set a broadcast code to protect your stream. Leave empty for an open (uncoded) broadcast."
)
col_flags1, col_flags2, col_placeholder = st.columns([1, 1, 2])
col_flags1, col_flags2, col_pdelay, col_rtn = st.columns([1, 1, 1, 1], gap="small")
with col_flags1:
assisted_listening = st.checkbox(
"Assistive listening",
@@ -157,15 +165,29 @@ if audio_mode == "Demo":
"Immediate rendering",
value=bool(saved_settings.get('immediate_rendering', False))
)
# QoS/presentation controls inline with flags
default_pdelay = int(saved_settings.get('presentation_delay_us', 40000) or 40000)
with col_pdelay:
presentation_delay_us = st.number_input(
"Presentation delay (µs)",
min_value=10000, max_value=200000, step=1000, value=default_pdelay,
help="Delay between capture and presentation for receivers."
)
default_rtn = int(saved_settings.get('rtn', 4) or 4)
with col_rtn:
rtn = st.selectbox(
"Retransmissions (RTN)", options=[0,1,2,3,4], index=[0,1,2,3,4].index(default_rtn),
help="Number of ISO retransmissions (higher improves robustness at cost of airtime)."
)
#st.info(f"Demo mode selected: {demo_selected} (Streams: {demo_stream_map[demo_selected]['streams']}, Rate: {demo_stream_map[demo_selected]['rate']} Hz)")
# Start/Stop buttons for demo mode
if 'demo_stream_started' not in st.session_state:
st.session_state['demo_stream_started'] = False
col1, col2 = st.columns(2)
with col1:
start_demo = st.button("Start Demo Stream")
start_demo = st.button("Start Demo Stream", disabled=is_streaming)
with col2:
stop_demo = st.button("Stop Demo Stream")
stop_demo = st.button("Stop Demo Stream", disabled=not is_streaming)
if start_demo:
# Always stop any running stream for clean state
try:
@@ -207,9 +229,15 @@ if audio_mode == "Demo":
config1 = auracast_config.AuracastConfigGroup(
auracast_sampling_rate_hz=q['rate'],
octets_per_frame=q['octets'],
transport='', # is set in backend
                transport='', # is set in backend
assisted_listening_stream=assisted_listening,
immediate_rendering=immediate_rendering,
presentation_delay_us=presentation_delay_us,
qos_config=auracast_config.AuracastQoSConfig(
iso_int_multiple_10ms=1,
number_of_retransmissions=int(rtn),
max_transport_latency_ms=int(rtn)*10 + 3,
),
bigs=bigs1
)
config2 = None
@@ -220,6 +248,12 @@ if audio_mode == "Demo":
transport='', # is set in backend
assisted_listening_stream=assisted_listening,
immediate_rendering=immediate_rendering,
presentation_delay_us=presentation_delay_us,
qos_config=auracast_config.AuracastQoSConfig(
iso_int_multiple_10ms=1,
number_of_retransmissions=int(rtn),
max_transport_latency_ms=int(rtn)*10 + 3,
),
bigs=bigs2
)
# Call /init and /init2
@@ -298,8 +332,8 @@ else:
type="password",
help="Optional: Set a broadcast code to protect your stream. Leave empty for an open (uncoded) broadcast."
)
# Flags: Assistive Listening and Immediate Rendering (one row)
col_flags1, col_flags2, col_placeholder = st.columns([1, 1, 2])
# Flags and QoS row (compact, four columns)
col_flags1, col_flags2, col_pdelay, col_rtn = st.columns([1, 1, 1, 1], gap="small")
with col_flags1:
assisted_listening = st.checkbox(
"Assistive listening",
@@ -310,6 +344,20 @@ else:
"Immediate rendering",
value=bool(saved_settings.get('immediate_rendering', False))
)
# QoS/presentation controls inline with flags
default_pdelay = int(saved_settings.get('presentation_delay_us', 40000) or 40000)
with col_pdelay:
presentation_delay_us = st.number_input(
"Presentation delay (µs)",
min_value=10000, max_value=200000, step=1000, value=default_pdelay,
help="Delay between capture and presentation for receivers."
)
default_rtn = int(saved_settings.get('rtn', 4) or 4)
with col_rtn:
rtn = st.selectbox(
"Retransmissions (RTN)", options=[0,1,2,3,4], index=[0,1,2,3,4].index(default_rtn),
help="Number of ISO retransmissions (higher improves robustness at cost of airtime)."
)
# Gain slider for Webapp mode
if audio_mode == "Webapp":
mic_gain = st.slider("Microphone Gain", 0.0, 2.0, 1.0, 0.1, help="Adjust microphone volume sent to Auracast")
@@ -318,86 +366,91 @@ else:
# Input device selection for USB or AES67 mode
if audio_mode in ("USB", "AES67"):
try:
endpoint = "/audio_inputs_pw_usb" if audio_mode == "USB" else "/audio_inputs_pw_network"
resp = requests.get(f"{BACKEND_URL}{endpoint}")
device_list = resp.json().get('inputs', [])
except Exception as e:
st.error(f"Failed to fetch devices: {e}")
device_list = []
if not is_streaming:
# Only query device lists when NOT streaming to avoid extra backend calls
try:
endpoint = "/audio_inputs_pw_usb" if audio_mode == "USB" else "/audio_inputs_pw_network"
resp = requests.get(f"{BACKEND_URL}{endpoint}")
device_list = resp.json().get('inputs', [])
except Exception as e:
st.error(f"Failed to fetch devices: {e}")
device_list = []
# Display "name [id]" but use name as value
input_options = [f"{d['name']} [{d['id']}]" for d in device_list]
option_name_map = {f"{d['name']} [{d['id']}]": d['name'] for d in device_list}
device_names = [d['name'] for d in device_list]
# Display "name [id]" but use name as value
input_options = [f"{d['name']} [{d['id']}]" for d in device_list]
option_name_map = {f"{d['name']} [{d['id']}]": d['name'] for d in device_list}
device_names = [d['name'] for d in device_list]
# Determine default input by name (from persisted server state)
default_input_name = saved_settings.get('input_device')
if default_input_name not in device_names and device_names:
default_input_name = device_names[0]
default_input_label = None
for label, name in option_name_map.items():
if name == default_input_name:
default_input_label = label
break
if not input_options:
warn_text = (
"No USB audio input devices found. Connect a USB input and click Refresh."
if audio_mode == "USB" else
"No AES67/Network inputs found."
)
st.warning(warn_text)
if st.button("Refresh"):
# For completeness, refresh the general audio cache as well
try:
r = requests.post(f"{BACKEND_URL}/refresh_audio_inputs", json={"force": True}, timeout=5)
if r.ok:
jr = r.json()
if jr.get('stopped_stream'):
st.info("An active stream was stopped to perform a full device refresh.")
except Exception as e:
st.error(f"Failed to refresh devices: {e}")
st.rerun()
input_device = None
else:
col1, col2 = st.columns([3, 1], vertical_alignment="bottom")
with col1:
selected_option = st.selectbox(
"Input Device",
input_options,
index=input_options.index(default_input_label) if default_input_label in input_options else 0
# Determine default input by name (from persisted server state)
default_input_name = saved_settings.get('input_device')
if default_input_name not in device_names and device_names:
default_input_name = device_names[0]
default_input_label = None
for label, name in option_name_map.items():
if name == default_input_name:
default_input_label = label
break
if not input_options:
warn_text = (
"No USB audio input devices found. Connect a USB input and click Refresh."
if audio_mode == "USB" else
"No AES67/Network inputs found."
)
with col2:
if st.button("Refresh"):
st.warning(warn_text)
if st.button("Refresh", disabled=is_streaming):
try:
r = requests.post(f"{BACKEND_URL}/refresh_audio_inputs", json={"force": True}, timeout=5)
if r.ok:
jr = r.json()
if jr.get('stopped_stream'):
st.info("An active stream was stopped to perform a full device refresh.")
r = requests.post(f"{BACKEND_URL}/refresh_audio_devices", timeout=8)
if not r.ok:
st.error(f"Failed to refresh: {r.text}")
except Exception as e:
st.error(f"Failed to refresh devices: {e}")
st.rerun()
# Send only the device name to backend
input_device = option_name_map.get(selected_option)
input_device = None
else:
col1, col2 = st.columns([3, 1], vertical_alignment="bottom")
with col1:
selected_option = st.selectbox(
"Input Device",
input_options,
index=input_options.index(default_input_label) if default_input_label in input_options else 0
)
with col2:
if st.button("Refresh", disabled=is_streaming):
try:
r = requests.post(f"{BACKEND_URL}/refresh_audio_devices", timeout=8)
if not r.ok:
st.error(f"Failed to refresh: {r.text}")
except Exception as e:
st.error(f"Failed to refresh devices: {e}")
st.rerun()
# Send only the device name to backend
input_device = option_name_map.get(selected_option)
else:
# When streaming, keep showing the current selection but lock editing.
input_device = saved_settings.get('input_device')
current_label = input_device or "No device selected"
st.selectbox(
"Input Device",
[current_label],
index=0,
disabled=True,
help="Stop the stream to change the input device."
)
else:
input_device = None
# Buttons and status on a single row (4 columns: start, stop, spacer, status)
c_start, c_stop, c_spacer, c_status = st.columns([1, 1, 1, 2], gap="small", vertical_alignment="center")
with c_start:
start_stream = st.button("Start Auracast")
start_stream = st.button("Start Auracast", disabled=is_streaming)
with c_stop:
stop_stream = st.button("Stop Auracast")
stop_stream = st.button("Stop Auracast", disabled=not is_streaming)
# c_spacer intentionally left empty to push status to the far right
with c_status:
# Fetch current status from backend and render using Streamlit widgets (no HTML)
try:
status_resp = requests.get(f"{BACKEND_URL}/status", timeout=0.8)
status_json = status_resp.json() if status_resp.ok else {}
except Exception:
status_json = {}
is_streaming = bool(status_json.get("is_streaming", False))
# The is_streaming variable is now defined at the top of the script.
# We only need to re-fetch here if we want the absolute latest status for the display,
# but for UI consistency, we can just use the value from the top of the script run.
st.write("🟢 Streaming" if is_streaming else "🔴 Stopped")
# If gain slider moved while streaming, send update to JS without restarting
@@ -438,8 +491,8 @@ else:
if start_stream:
# Always send stop to ensure backend is in a clean state, regardless of current status
r = requests.post(f"{BACKEND_URL}/stop_audio").json()
if r['was_running']:
st.success("Stream Stopped!")
#if r['was_running']:
# st.success("Stream Stopped!")
# Small pause lets backend fully release audio devices before re-init
time.sleep(1)
@@ -451,6 +504,12 @@ else:
transport='', # is set in backend
assisted_listening_stream=assisted_listening,
immediate_rendering=immediate_rendering,
presentation_delay_us=presentation_delay_us,
qos_config=auracast_config.AuracastQoSConfig(
iso_int_multiple_10ms=1,
number_of_retransmissions=int(rtn),
max_transport_latency_ms=int(rtn)*10 + 3,
),
bigs = [
auracast_config.AuracastBigConfig(
code=(stream_passwort.strip() or None),
@@ -551,11 +610,12 @@ else:
############################
# System expander (collapsed)
############################
with st.expander("System", expanded=False):
with st.expander("System control", expanded=False):
st.subheader("Change password")
if is_pw_disabled():
st.info("Frontend password protection is disabled via DISABLE_FRONTEND_PW.")
else:
st.subheader("Change password")
with st.form("change_pw_form"):
cur = st.text_input("Current password", type="password")
new1 = st.text_input("New password", type="password")
@@ -577,6 +637,17 @@ with st.expander("System", expanded=False):
except Exception as e:
st.error(f"Failed to update password: {e}")
st.subheader("Reboot")
if st.button("Reboot now", type="primary"):
try:
r = requests.post(f"{BACKEND_URL}/system_reboot", timeout=1)
if r.ok:
st.success("Reboot initiated. The UI will become unreachable shortly.")
else:
st.error(f"Failed to reboot: {r.status_code} {r.text}")
except Exception as e:
st.error(f"Error calling reboot: {e}")
log.basicConfig(
level=os.environ.get('LOG_LEVEL', log.DEBUG),
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'

View File

@@ -1,10 +1,16 @@
import glob
""" the main server where our multicaster objects live.
TODO: in the future the multicaster objects should run in their own threads or even make a second server since everything that's blocking the main event loop leads to increased latency.
"""
import os
import logging as log
import uuid
import json
import sys
import threading
from concurrent.futures import Future
from datetime import datetime
import time
import asyncio
import numpy as np
from dotenv import load_dotenv
@@ -20,9 +26,11 @@ import sounddevice as sd # type: ignore
from typing import Set
import traceback
from auracast.utils.sounddevice_utils import (
list_usb_pw_inputs,
list_network_pw_inputs,
get_usb_pw_inputs,
get_network_pw_inputs,
refresh_pw_cache,
)
from auracast.utils.reset_utils import reset_nrf54l
load_dotenv()
# make sure pipewire sets latency
@@ -34,6 +42,11 @@ TRANSPORT2 = os.getenv('TRANSPORT2', 'serial:/dev/ttyAMA4,1000000,rtscts') # tr
PTIME = 40 # seems to have no effect at all
pcs: Set[RTCPeerConnection] = set() # keep refs so they dont GC early
os.environ["PULSE_LATENCY_MSEC"] = "3"
# In-memory cache to avoid disk I/O on hot paths like /status
SETTINGS_CACHE: dict = {}
class Offer(BaseModel):
sdp: str
type: str
@@ -53,21 +66,37 @@ def get_device_index_by_name(name: str):
return None
def load_stream_settings() -> dict:
"""Load persisted stream settings if available."""
if os.path.exists(STREAM_SETTINGS_FILE):
try:
def _hydrate_settings_cache_from_disk() -> None:
"""Populate SETTINGS_CACHE once from disk at startup.
Safe to call multiple times; errors fall back to empty dict.
"""
global SETTINGS_CACHE
try:
if os.path.exists(STREAM_SETTINGS_FILE):
with open(STREAM_SETTINGS_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception:
return {}
return {}
SETTINGS_CACHE = json.load(f)
else:
SETTINGS_CACHE = {}
except Exception:
SETTINGS_CACHE = {}
def load_stream_settings() -> dict:
"""Return stream settings from in-memory cache.
The cache is hydrated once at startup and updated by save_stream_settings().
No disk I/O occurs here.
"""
global SETTINGS_CACHE
return SETTINGS_CACHE
def save_stream_settings(settings: dict):
"""Save stream settings to disk."""
"""Update in-memory settings cache and persist to disk."""
global SETTINGS_CACHE
SETTINGS_CACHE = dict(settings)
try:
with open(STREAM_SETTINGS_FILE, 'w', encoding='utf-8') as f:
json.dump(settings, f, indent=2)
json.dump(SETTINGS_CACHE, f, indent=2)
except Exception as e:
log.error('Unable to persist stream settings: %s', e)
@@ -86,55 +115,106 @@ app.add_middleware(
# Initialize global configuration
global_config_group = auracast_config.AuracastConfigGroup()
# Create multicast controller
multicaster1: multicast_control.Multicaster | None = None
multicaster2: multicast_control.Multicaster | None = None
class StreamerWorker:
"""Owns multicaster(s) on a dedicated asyncio loop in a background thread."""
@app.post("/init")
async def initialize(conf: auracast_config.AuracastConfigGroup):
"""Initializes the primary broadcaster (multicaster1)."""
global global_config_group
global multicaster1
try:
def __init__(self) -> None:
self._thread: threading.Thread | None = None
self._loop: asyncio.AbstractEventLoop | None = None
# These live only on the worker loop
self._multicaster1: multicast_control.Multicaster | None = None
self._multicaster2: multicast_control.Multicaster | None = None
self._started = threading.Event()
# ---------- Thread/loop management ----------
def start(self) -> None:
if self._thread and self._thread.is_alive():
return
self._thread = threading.Thread(target=self._run, name="StreamerWorker", daemon=True)
self._thread.start()
self._started.wait(timeout=5)
def _run(self) -> None:
loop = asyncio.new_event_loop()
self._loop = loop
asyncio.set_event_loop(loop)
self._started.set()
try:
loop.run_forever()
finally:
try:
pending = asyncio.all_tasks(loop)
for t in pending:
t.cancel()
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
except Exception:
pass
loop.close()
def _ensure_loop(self) -> asyncio.AbstractEventLoop:
if not self._loop:
raise RuntimeError("StreamerWorker loop not started")
return self._loop
async def call(self, coro_func, *args, **kwargs):
"""Schedule a coroutine on the worker loop and await its result from the API loop."""
loop = self._ensure_loop()
fut: Future = asyncio.run_coroutine_threadsafe(coro_func(*args, **kwargs), loop)
return await asyncio.wrap_future(fut)
# ---------- Worker-loop coroutines ----------
async def _w_init_primary(self, conf: auracast_config.AuracastConfigGroup) -> dict:
# Clean any previous
if self._multicaster1 is not None:
try:
await self._multicaster1.shutdown()
except Exception:
pass
self._multicaster1 = None
conf.transport = TRANSPORT1
# Derive audio_mode and input_device from first BIG audio_source
# Derive device name and input mode
first_source = conf.bigs[0].audio_source if conf.bigs else ''
input_device_name = None
audio_mode_persist = 'Demo'
if first_source.startswith('device:'):
input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None
# Determine if the device is a USB or Network(AES67) PipeWire input
try:
usb_names = {d.get('name') for _, d in list_usb_pw_inputs(refresh=False)}
net_names = {d.get('name') for _, d in list_network_pw_inputs(refresh=False)}
usb_names = {d.get('name') for _, d in get_usb_pw_inputs()}
net_names = {d.get('name') for _, d in get_network_pw_inputs()}
except Exception:
usb_names, net_names = set(), set()
if input_device_name in net_names:
audio_mode_persist = 'AES67'
os.environ.setdefault("PULSE_LATENCY_MSEC", "6")
else:
audio_mode_persist = 'USB'
os.environ.setdefault("PULSE_LATENCY_MSEC", "3")
audio_mode_persist = 'AES67' if (input_device_name in net_names) else 'USB'
# Map device name to current index for use with sounddevice
device_index = get_device_index_by_name(input_device_name) if input_device_name else None
# Patch config to use index for sounddevice (but persist name)
if device_index is not None:
for big in conf.bigs:
if big.audio_source.startswith('device:'):
big.audio_source = f'device:{device_index}'
else:
log.error(f"Device name '{input_device_name}' not found in current device list.")
# Map device name to index and configure input_format
device_index = int(input_device_name) if (input_device_name and input_device_name.isdigit()) else get_device_index_by_name(input_device_name or '')
if device_index is None:
raise HTTPException(status_code=400, detail=f"Audio device '{input_device_name}' not found.")
elif first_source == 'webrtc':
audio_mode_persist = 'Webapp'
input_device_name = None
elif first_source.startswith('file:'):
audio_mode_persist = 'Demo'
input_device_name = None
else:
audio_mode_persist = 'Network'
input_device_name = None
save_stream_settings({
for big in conf.bigs:
if big.audio_source.startswith('device:'):
big.audio_source = f'device:{device_index}'
devinfo = sd.query_devices(device_index)
capture_rate = int(devinfo.get('default_samplerate') or 48000)
max_in = int(devinfo.get('max_input_channels') or 1)
channels = max(1, min(2, max_in))
for big in conf.bigs:
big.input_format = f"int16le,{capture_rate},{channels}"
# Coerce QoS: compute max_transport_latency from RTN if qos_config present
if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None:
conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3
# Create and init multicaster1
self._multicaster1 = multicast_control.Multicaster(conf, conf.bigs)
await reset_nrf54l(1)
await self._multicaster1.init_broadcast()
auto_started = False
if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs):
await self._multicaster1.start_streaming()
auto_started = True
# Return proposed settings to persist on API side
return {
'channel_names': [big.name for big in conf.bigs],
'languages': [big.language for big in conf.bigs],
'audio_mode': audio_mode_persist,
@@ -143,106 +223,156 @@ async def initialize(conf: auracast_config.AuracastConfigGroup):
'gain': [getattr(big, 'input_gain', 1.0) for big in conf.bigs],
'auracast_sampling_rate_hz': conf.auracast_sampling_rate_hz,
'octets_per_frame': conf.octets_per_frame,
'presentation_delay_us': getattr(conf, 'presentation_delay_us', None),
'rtn': getattr(getattr(conf, 'qos_config', None), 'number_of_retransmissions', None),
'immediate_rendering': getattr(conf, 'immediate_rendering', False),
'assisted_listening_stream': getattr(conf, 'assisted_listening_stream', False),
'stream_password': (conf.bigs[0].code if conf.bigs and getattr(conf.bigs[0], 'code', None) else None),
'timestamp': datetime.utcnow().isoformat()
})
global_config_group = conf
if multicaster1 is not None:
try:
await multicaster1.shutdown()
except Exception:
log.warning("Failed to shutdown previous multicaster", exc_info=True)
log.info('Initializing multicaster1 with config:\n %s', conf.model_dump_json(indent=2))
multicaster1 = multicast_control.Multicaster(conf, conf.bigs)
await multicaster1.init_broadcast()
if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs):
log.info("Auto-starting streaming on multicaster1")
await multicaster1.start_streaming()
except Exception as e:
log.error("Exception in /init: %s", traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
'is_streaming': auto_started,
}
async def _w_init_secondary(self, conf: auracast_config.AuracastConfigGroup) -> None:
if self._multicaster2 is not None:
try:
await self._multicaster2.shutdown()
except Exception:
pass
self._multicaster2 = None
@app.post("/init2")
async def initialize2(conf: auracast_config.AuracastConfigGroup):
"""Initializes the secondary broadcaster (multicaster2). Does NOT persist stream settings."""
global multicaster2
try:
conf.transport = TRANSPORT2
# Patch device name to index for sounddevice
for big in conf.bigs:
if big.audio_source.startswith('device:'):
device_name = big.audio_source.split(':', 1)[1]
device_index = get_device_index_by_name(device_name)
if device_index is not None:
big.audio_source = f'device:{device_index}'
else:
log.error(f"Device name '{device_name}' not found in current device list.")
if device_index is None:
raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.")
log.info('Initializing multicaster2 with config:\n %s', conf.model_dump_json(indent=2))
multicaster2 = multicast_control.Multicaster(conf, conf.bigs)
await multicaster2.init_broadcast()
big.audio_source = f'device:{device_index}'
# Coerce QoS: compute max_transport_latency from RTN if qos_config present
if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None:
conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3
self._multicaster2 = multicast_control.Multicaster(conf, conf.bigs)
await reset_nrf54l(0)
await self._multicaster2.init_broadcast()
if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs):
log.info("Auto-starting streaming on multicaster2")
await multicaster2.start_streaming()
await self._multicaster2.start_streaming()
async def _w_stop_all(self) -> bool:
was_running = False
if self._multicaster1 is not None:
try:
await self._multicaster1.stop_streaming()
await self._multicaster1.shutdown()
was_running = True
finally:
self._multicaster1 = None
if self._multicaster2 is not None:
try:
await self._multicaster2.stop_streaming()
await self._multicaster2.shutdown()
was_running = True
finally:
self._multicaster2 = None
return was_running
async def _w_status_primary(self) -> dict:
if self._multicaster1 is None:
return {'is_initialized': False, 'is_streaming': False}
try:
return self._multicaster1.get_status()
except Exception:
return {'is_initialized': True, 'is_streaming': False}
async def _w_stream_lc3(self, audio_data: dict[str, str], bigs_template: list) -> None:
if self._multicaster1 is None:
raise HTTPException(status_code=500, detail='Auracast endpoint was never intialized')
# Update bigs audio_source with provided bytes and start
for big in bigs_template:
if big.language not in audio_data:
raise HTTPException(status_code=500, detail='language len missmatch')
big.audio_source = audio_data[big.language].encode('latin-1')
self._multicaster1.big_conf = bigs_template
await self._multicaster1.start_streaming()
# Create the worker singleton and a route-level lock
streamer = StreamerWorker()
# multicaster1: multicast_control.Multicaster | None = None # kept for legacy references, do not use on API loop
# multicaster2: multicast_control.Multicaster | None = None
_stream_lock = asyncio.Lock() # serialize initialize/stop_audio on API side
@app.post("/init")
async def initialize(conf: auracast_config.AuracastConfigGroup):
    """Initializes the primary broadcaster on the streamer thread.

    Runs under the route-level lock so /init and /stop_audio cannot
    interleave; the heavy init itself runs on the worker thread via
    ``streamer.call``. Raises HTTPException(500) on any failure.
    """
    global global_config_group
    async with _stream_lock:
        try:
            global_config_group = conf
            log.info('Initializing multicaster1 with config:\n %s', conf.model_dump_json(indent=2))
            persisted = await streamer.call(streamer._w_init_primary, conf)
            # Persist returned settings (avoid touching from worker thread)
            # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
            # consider datetime.now(timezone.utc) — confirm readers of the
            # settings file tolerate a tz-aware ISO string before changing.
            persisted['timestamp'] = datetime.utcnow().isoformat()
            save_stream_settings(persisted)
        except Exception as e:
            log.error("Exception in /init: %s", traceback.format_exc())
            raise HTTPException(status_code=500, detail=str(e))
@app.post("/init2")
async def initialize2(conf: auracast_config.AuracastConfigGroup):
    """Initializes the secondary broadcaster on the streamer thread.

    NOTE(review): unlike /init this neither takes the route-level lock nor
    persists settings — presumably intentional for the secondary broadcaster;
    confirm whether it should also be serialized against /stop_audio.
    """
    try:
        log.info('Initializing multicaster2 with config:\n %s', conf.model_dump_json(indent=2))
        await streamer.call(streamer._w_init_secondary, conf)
    except Exception as e:
        log.error("Exception in /init2: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/stream_lc3")
async def send_audio(audio_data: dict[str, str]):
    """Sends a block of pre-coded LC3 audio."""
    # NOTE(review): stale pre-worker implementation. A second handler for
    # "/stream_lc3" defined later in this file re-registers the route and
    # shadows this function name, and this version reads the module-level
    # `multicaster1` which is commented out above — calling it would raise
    # NameError. Presumably dead code from the worker migration; confirm
    # and remove.
    if multicaster1 is None:
        raise HTTPException(status_code=500, detail='Auracast endpoint was never intialized')
    try:
        for big in global_config_group.bigs:
            # NOTE(review): `assert` is stripped under -O and must not be used
            # for request validation; the HTTPException built as the assert
            # message is never actually raised as an HTTP response.
            assert big.language in audio_data, HTTPException(status_code=500, detail='language len missmatch')
            log.info('Received a send audio request for %s', big.language)
            big.audio_source = audio_data[big.language].encode('latin-1') # TODO: use base64 encoding
        multicaster1.big_conf = global_config_group.bigs
        await multicaster1.start_streaming()
        return {"status": "audio_sent"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/stop_audio")
async def stop_audio():
    """Stops streaming on both multicaster1 and multicaster2 (worker thread).

    Closes live WebRTC peer connections first so their track loops finish
    cleanly, tears both multicasters down on the worker thread, then
    persists is_streaming=False so a restart does not auto-resume.
    """
    # Fixed merge artifact: the old direct-global implementation was
    # interleaved here, its early `return` made the persistence code
    # unreachable, and the docstring was duplicated.
    try:
        # Close any active PeerConnections
        close_tasks = [pc.close() for pc in list(pcs)]
        pcs.clear()
        if close_tasks:
            await asyncio.gather(*close_tasks, return_exceptions=True)
        # Shut down both multicasters on the worker thread.
        was_running = await streamer.call(streamer._w_stop_all)
        # Persist is_streaming=False
        try:
            settings = load_stream_settings() or {}
            if settings.get('is_streaming'):
                settings['is_streaming'] = False
                settings['timestamp'] = datetime.utcnow().isoformat()
                save_stream_settings(settings)
        except Exception:
            log.warning("Failed to persist is_streaming=False during stop_audio", exc_info=True)
        # Brief settle time before the controller accepts new requests.
        await asyncio.sleep(0.2)
        return {"status": "stopped", "was_running": was_running}
    except Exception as e:
        log.error("Exception in /stop_audio: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/stream_lc3")
async def send_audio(audio_data: dict[str, str]):
    """Sends a block of pre-coded LC3 audio via the worker."""
    try:
        bigs = list(global_config_group.bigs)
        await streamer.call(streamer._w_stream_lc3, audio_data, bigs)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"status": "audio_sent"}
@app.get("/status")
async def get_status():
    """Gets current status (worker) merged with persisted settings cache."""
    # Fixed merge artifact: removed the dead pre-worker branch that read the
    # commented-out `multicaster1` global and the duplicated docstring.
    status = await streamer.call(streamer._w_status_primary)
    # Guard with `or {}`: load_stream_settings() can return None (it is
    # guarded the same way elsewhere) and dict.update(None) raises.
    status.update(load_stream_settings() or {})
    return status
@@ -255,12 +385,13 @@ async def _autostart_from_settings():
and initializes streaming.
"""
try:
settings = load_stream_settings() or {}
audio_mode = settings.get('audio_mode')
input_device_name = settings.get('input_device')
rate = settings.get('auracast_sampling_rate_hz')
octets = settings.get('octets_per_frame')
pres_delay = settings.get('presentation_delay_us')
saved_rtn = settings.get('rtn')
immediate_rendering = settings.get('immediate_rendering', False)
assisted_listening_stream = settings.get('assisted_listening_stream', False)
channel_names = settings.get('channel_names') or ["Broadcast0"]
@@ -268,18 +399,11 @@ async def _autostart_from_settings():
languages = settings.get('languages') or ["deu"]
stream_password = settings.get('stream_password')
original_ts = settings.get('timestamp')
previously_streaming = bool(settings.get('is_streaming'))
try:
usb_names = {d.get('name') for _, d in list_usb_pw_inputs(refresh=False)}
net_names = {d.get('name') for _, d in list_network_pw_inputs(refresh=False)}
except Exception:
usb_names, net_names = set(), set()
if input_device_name in net_names:
os.environ.setdefault("PULSE_LATENCY_MSEC", "6")
else:
os.environ.setdefault("PULSE_LATENCY_MSEC", "3")
# Only auto-start device-based inputs; Webapp and Demo require external sources/UI
# Only auto-start if the previous state was streaming and it's a device-based input.
if not previously_streaming:
return
if not input_device_name:
return
if rate is None or octets is None:
@@ -287,12 +411,14 @@ async def _autostart_from_settings():
return
# Avoid duplicate start if already streaming
if multicaster1 and multicaster1.get_status().get('is_streaming'):
current = await streamer.call(streamer._w_status_primary)
if current.get('is_streaming'):
return
while True:
# Do not interfere if user started a stream manually in the meantime
if multicaster1 and multicaster1.get_status().get('is_streaming'):
current = await streamer.call(streamer._w_status_primary)
if current.get('is_streaming'):
return
# Abort if saved settings changed to a different target while we were polling
current_settings = load_stream_settings() or {}
@@ -304,9 +430,9 @@ async def _autostart_from_settings():
current_settings.get('audio_mode') != audio_mode
):
return
# Avoid refreshing PortAudio while we poll
usb = [d for _, d in list_usb_pw_inputs(refresh=False)]
net = [d for _, d in list_network_pw_inputs(refresh=False)]
# Check against the cached device lists
usb = [d for _, d in get_usb_pw_inputs()]
net = [d for _, d in get_network_pw_inputs()]
names = {d.get('name') for d in usb} | {d.get('name') for d in net}
if input_device_name in names:
# Build a minimal config based on saved fields
@@ -317,7 +443,7 @@ async def _autostart_from_settings():
program_info=program_info[0] if isinstance(program_info, list) and program_info else program_info,
language=languages[0] if languages else "deu",
audio_source=f"device:{input_device_name}",
input_format=f"int16le,{rate},1",
# input_format is intentionally omitted to use the default
iso_que_len=1,
sampling_frequency=rate,
octets_per_frame=octets,
@@ -329,71 +455,43 @@ async def _autostart_from_settings():
transport=TRANSPORT1,
immediate_rendering=immediate_rendering,
assisted_listening_stream=assisted_listening_stream,
presentation_delay_us=pres_delay if pres_delay is not None else 40000,
bigs=bigs,
)
# Attach QoS if saved_rtn present
conf.qos_config = auracast_config.AuracastQoSConfig(
iso_int_multiple_10ms=1,
number_of_retransmissions=int(saved_rtn),
max_transport_latency_ms=int(saved_rtn) * 10 + 3,
)
# Initialize and start
await asyncio.sleep(2)
await initialize(conf)
return
await asyncio.sleep(2)
except Exception:
log.warning("Autostart task failed", exc_info=True)
#TODO: enable and test this
@app.on_event("startup")
async def _startup_autostart_event():
    """Startup hook: warm caches, start the worker thread, spawn autostart.

    NOTE(review): ``@app.on_event`` is deprecated in recent FastAPI versions
    in favor of lifespan handlers — confirm the pinned version before
    migrating.
    """
    # Spawn the autostart task without blocking startup
    log.info("Refreshing PipeWire device cache.")
    # Hydrate settings cache once to avoid disk I/O during /status
    _hydrate_settings_cache_from_disk()
    refresh_pw_cache()
    # Start the streamer worker thread
    streamer.start()
    # Fire-and-forget: the autostart task polls on the event loop without
    # delaying application startup.
    asyncio.create_task(_autostart_from_settings())
@app.post("/refresh_audio_inputs")
async def refresh_audio_inputs(force: bool = False):
    """Triggers a re-scan of audio devices.
    If force is True and a stream is active, the stream(s) will be stopped to allow
    a full re-initialization of the sounddevice backend. The response will include
    'stopped_stream': True if any running stream was stopped.
    """
    # NOTE(review): legacy endpoint superseded by /refresh_audio_devices. It
    # reads the module-level multicaster1/multicaster2 globals, which are
    # commented out above, so any call with force=True would raise NameError.
    # Presumably left over from the worker-thread migration — confirm and
    # remove.
    stopped = False
    if force:
        try:
            # Stop active streams before forcing sounddevice re-init
            if multicaster1 is not None and multicaster1.get_status().get('is_streaming'):
                await multicaster1.stop_streaming()
                stopped = True
            if multicaster2 is not None and multicaster2.get_status().get('is_streaming'):
                await multicaster2.stop_streaming()
                stopped = True
        except Exception:
            log.warning("Failed to stop stream(s) before force refresh", exc_info=True)
    # Reinitialize sounddevice backend if requested
    try:
        if sys.platform == 'linux' and force:
            log.info("Force re-initializing sounddevice backend")
            # NOTE(review): sd._terminate/_initialize are private PortAudio
            # hooks; behavior may change between sounddevice releases.
            sd._terminate()
            sd._initialize()
    except Exception:
        log.error("Exception while force-refreshing audio devices:", exc_info=True)
    return {"status": "ok", "inputs": [], "stopped_stream": stopped}
@app.get("/audio_inputs_pw_usb")
async def audio_inputs_pw_usb():
"""List PipeWire USB input nodes mapped to sounddevice indices.
Returns a list of dicts: [{id, name, max_input_channels}].
"""
"""List PipeWire USB input nodes from cache."""
try:
# Do not refresh PortAudio if we are currently streaming to avoid termination
streaming = False
try:
if multicaster1 is not None:
status = multicaster1.get_status()
streaming = bool(status.get('is_streaming'))
except Exception:
streaming = False
devices = [
{"id": idx, "name": dev.get("name"), "max_input_channels": dev.get("max_input_channels", 0)}
for idx, dev in list_usb_pw_inputs(refresh=not streaming)
for idx, dev in get_usb_pw_inputs()
]
return {"inputs": devices}
except Exception as e:
@@ -403,22 +501,11 @@ async def audio_inputs_pw_usb():
@app.get("/audio_inputs_pw_network")
async def audio_inputs_pw_network():
"""List PipeWire Network/AES67 input nodes mapped to sounddevice indices.
Returns a list of dicts: [{id, name, max_input_channels}].
"""
"""List PipeWire Network/AES67 input nodes from cache."""
try:
# Do not refresh PortAudio if we are currently streaming to avoid termination
streaming = False
try:
if multicaster1 is not None:
status = multicaster1.get_status()
streaming = bool(status.get('is_streaming'))
except Exception:
streaming = False
devices = [
{"id": idx, "name": dev.get("name"), "max_input_channels": dev.get("max_input_channels", 0)}
for idx, dev in list_network_pw_inputs(refresh=not streaming)
for idx, dev in get_network_pw_inputs()
]
return {"inputs": devices}
except Exception as e:
@@ -426,140 +513,209 @@ async def audio_inputs_pw_network():
raise HTTPException(status_code=500, detail=str(e))
# NOTE(review): the removed live /offer (WebRTC) endpoint was interleaved
# with this handler as a diff merge artifact; a commented-out copy of the
# /offer implementation is kept further below in this file.
@app.post("/refresh_audio_devices")
async def refresh_audio_devices():
    """Triggers a re-scan of audio devices, but only if no stream is active.

    Returns {"status": "ok"} on success; 409 while a stream is running,
    500 if the PipeWire cache refresh itself fails.
    """
    streaming = False
    try:
        status = await streamer.call(streamer._w_status_primary)
        streaming = bool(status.get('is_streaming'))
    except Exception:
        pass  # Ignore status errors; streaming stays False and refresh proceeds
    if streaming:
        log.warning("Ignoring refresh request: an audio stream is active.")
        raise HTTPException(status_code=409, detail="An audio stream is active. Stop the stream before refreshing devices.")
    try:
        log.info("Refreshing PipeWire device cache.")
        refresh_pw_cache()
        return {"status": "ok"}
    except Exception as e:
        log.error("Exception during device refresh: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Failed to refresh devices: {e}")
# async def offer(offer: Offer):
# @app.post("/offer") #webrtc endpoint
# log.info("/offer endpoint called")
# Flatten in case it's (1, N) or (N,)
samples = frame_array.reshape(-1)
# # If a previous PeerConnection is still alive, close it so we only ever keep one active.
# if pcs:
# log.info("Closing %d existing PeerConnection(s) before creating a new one", len(pcs))
# close_tasks = [p.close() for p in list(pcs)]
# await asyncio.gather(*close_tasks, return_exceptions=True)
# pcs.clear()
if frame.layout.name == 'stereo':
# Interleaved stereo: [L0, R0, L1, R1, ...]
mono_array = samples[::2] # Take left channel
else:
mono_array = samples
# pc = RTCPeerConnection() # No STUN needed for localhost
# pcs.add(pc)
# id_ = uuid.uuid4().hex[:8]
# log.info(f"{id_}: new PeerConnection")
# Get current WebRTC audio input (streamer may have been restarted)
big0 = list(multicaster1.bigs.values())[0]
audio_input = big0.get('audio_input')
# Wait until the streamer has instantiated the WebRTCAudioInput
if audio_input is None or getattr(audio_input, 'closed', False):
continue
# Feed mono PCM samples to the global WebRTC audio input
await audio_input.put_samples(mono_array.astype(np.int16))
# # create directory for records - only for testing
# os.makedirs("./records", exist_ok=True)
# Save to WAV file - only for testing
# if not hasattr(pc, 'wav_writer'):
# import wave
# wav_path = f"./records/auracast_{id_}.wav"
# pc.wav_writer = wave.open(wav_path, "wb")
# pc.wav_writer.setnchannels(1) # mono
# pc.wav_writer.setsampwidth(2) # 16-bit PCM
# pc.wav_writer.setframerate(frame.sample_rate)
# # Do NOT start the streamer yet we'll start it lazily once we actually
# # receive the first audio frame, ensuring WebRTCAudioInput is ready and
# # avoiding race-conditions on restarts.
# @pc.on("track")
# async def on_track(track: MediaStreamTrack):
# log.info(f"{id_}: track {track.kind} received")
# try:
# first = True
# while True:
# frame: av.audio.frame.AudioFrame = await track.recv() # RTP audio frame (already decrypted)
# if first:
# log.info(f"{id_}: frame layout={frame.layout}")
# log.info(f"{id_}: frame format={frame.format}")
# log.info(
# f"{id_}: frame sample_rate={frame.sample_rate}, samples_per_channel={frame.samples}, planes={frame.planes}"
# )
# # Lazily start the streamer now that we know a track exists.
# if multicaster1.streamer is None:
# await multicaster1.start_streaming()
# # Yield control so the Streamer coroutine has a chance to
# # create the WebRTCAudioInput before we push samples.
# await asyncio.sleep(0)
# # Persist is_streaming=True for Webapp mode
# try:
# settings = load_stream_settings() or {}
# settings['is_streaming'] = True
# settings['timestamp'] = datetime.utcnow().isoformat()
# save_stream_settings(settings)
# except Exception:
# log.warning("Failed to persist is_streaming=True on WebRTC start", exc_info=True)
# first = False
# # in stereo case this is interleaved data format
# frame_array = frame.to_ndarray()
# log.info(f"array.shape{frame_array.shape}")
# log.info(f"array.dtype{frame_array.dtype}")
# log.info(f"frame.to_ndarray(){frame_array}")
# pcm_data = mono_array.astype(np.int16).tobytes()
# pc.wav_writer.writeframes(pcm_data)
# samples = frame_array.reshape(-1)
# log.info(f"samples.shape: {samples.shape}")
# if frame.layout.name == 'stereo':
# # Interleaved stereo: [L0, R0, L1, R1, ...]
# mono_array = samples[::2] # Take left channel
# else:
# mono_array = samples
# log.info(f"mono_array.shape: {mono_array.shape}")
except Exception as e:
log.error(f"{id_}: Exception in on_track: {e}")
finally:
# Always close the wav file when the track ends or on error
if hasattr(pc, 'wav_writer'):
try:
pc.wav_writer.close()
except Exception:
pass
del pc.wav_writer
# frame_array = frame.to_ndarray()
# --- SDP negotiation ---
log.info(f"{id_}: setting remote description")
await pc.setRemoteDescription(RTCSessionDescription(**offer.model_dump()))
# # Flatten in case it's (1, N) or (N,)
# samples = frame_array.reshape(-1)
log.info(f"{id_}: creating answer")
answer = await pc.createAnswer()
sdp = answer.sdp
# Insert a=ptime using the global PTIME variable
ptime_line = f"a=ptime:{PTIME}"
if "a=sendrecv" in sdp:
sdp = sdp.replace("a=sendrecv", f"a=sendrecv\n{ptime_line}")
else:
sdp += f"\n{ptime_line}"
new_answer = RTCSessionDescription(sdp=sdp, type=answer.type)
await pc.setLocalDescription(new_answer)
log.info(f"{id_}: sending answer with {ptime_line}")
return {"sdp": pc.localDescription.sdp,
"type": pc.localDescription.type}
# if frame.layout.name == 'stereo':
# # Interleaved stereo: [L0, R0, L1, R1, ...]
# mono_array = samples[::2] # Take left channel
# else:
# mono_array = samples
# # Get current WebRTC audio input (streamer may have been restarted)
# big0 = list(multicaster1.bigs.values())[0]
# audio_input = big0.get('audio_input')
# # Wait until the streamer has instantiated the WebRTCAudioInput
# if audio_input is None or getattr(audio_input, 'closed', False):
# continue
# # Feed mono PCM samples to the global WebRTC audio input
# await audio_input.put_samples(mono_array.astype(np.int16))
# # Save to WAV file - only for testing
# # if not hasattr(pc, 'wav_writer'):
# # import wave
# # wav_path = f"./records/auracast_{id_}.wav"
# # pc.wav_writer = wave.open(wav_path, "wb")
# # pc.wav_writer.setnchannels(1) # mono
# # pc.wav_writer.setsampwidth(2) # 16-bit PCM
# # pc.wav_writer.setframerate(frame.sample_rate)
# # pcm_data = mono_array.astype(np.int16).tobytes()
# # pc.wav_writer.writeframes(pcm_data)
# except Exception as e:
# log.error(f"{id_}: Exception in on_track: {e}")
# finally:
# # Always close the wav file when the track ends or on error
# if hasattr(pc, 'wav_writer'):
# try:
# pc.wav_writer.close()
# except Exception:
# pass
# del pc.wav_writer
# # --- SDP negotiation ---
# log.info(f"{id_}: setting remote description")
# await pc.setRemoteDescription(RTCSessionDescription(**offer.model_dump()))
# log.info(f"{id_}: creating answer")
# answer = await pc.createAnswer()
# sdp = answer.sdp
# # Insert a=ptime using the global PTIME variable
# ptime_line = f"a=ptime:{PTIME}"
# if "a=sendrecv" in sdp:
# sdp = sdp.replace("a=sendrecv", f"a=sendrecv\n{ptime_line}")
# else:
# sdp += f"\n{ptime_line}"
# new_answer = RTCSessionDescription(sdp=sdp, type=answer.type)
# await pc.setLocalDescription(new_answer)
# log.info(f"{id_}: sending answer with {ptime_line}")
# return {"sdp": pc.localDescription.sdp,
# "type": pc.localDescription.type}
@app.post("/shutdown")
async def shutdown():
    """Stops broadcasting and releases all audio/Bluetooth resources."""
    # Fixed merge artifact: a stale `await multicaster1.shutdown()` (referring
    # to the commented-out legacy global) preceded the worker call and would
    # have raised NameError before any teardown happened.
    try:
        await streamer.call(streamer._w_stop_all)
        return {"status": "stopped"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/system_reboot")
async def system_reboot():
    """Stop audio and request a system reboot via sudo.
    Requires the service user to have passwordless sudo permissions to run 'reboot'.
    """
    try:
        # Best-effort teardown: stop streams cleanly but deliberately leave
        # stream_settings.json untouched so streaming state survives reboot.
        try:
            # Tear down any live WebRTC peer connections first.
            pending = [peer.close() for peer in list(pcs)]
            pcs.clear()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)
            try:
                # Worker-side stop; failures here are non-fatal.
                await streamer.call(streamer._w_stop_all)
            except Exception:
                pass
        except Exception:
            log.warning("Non-fatal: failed to stop streams before reboot", exc_info=True)
        # Fire the reboot command without awaiting its completion.
        try:
            await asyncio.create_subprocess_exec("sudo", "reboot")
        except Exception as e:
            log.error("Failed to invoke reboot: %s", e, exc_info=True)
            raise HTTPException(status_code=500, detail=f"Failed to invoke reboot: {e}")
        return {"status": "rebooting"}
    except HTTPException:
        raise
    except Exception as e:
        log.error("Exception in /system_reboot: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
if __name__ == '__main__':
import os
os.chdir(os.path.dirname(__file__))
@@ -569,4 +725,4 @@ if __name__ == '__main__':
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)
# Bind to localhost only for security: prevents network access, only frontend on same machine can connect
uvicorn.run(app, host="127.0.0.1", port=5000)
uvicorn.run(app, host="127.0.0.1", port=5000, access_log=False)

View File

@@ -0,0 +1,77 @@
import os
import asyncio
import logging as log
async def reset_nrf54l(slot: int = 0, timeout: float = 8.0):
    """
    Reset the nRF54L target using OpenOCD before starting broadcast.

    Looks for interface config files in the project at `src/openocd/` relative
    to this module only. Accepts filename variants per slot:
      - raspberrypi-swd<slot>.cfg or swd<slot>.cfg
    (generalized from the original hard-coded slot 0/1 pair; identical for
    slots 0 and 1, and now correct for any other slot number).

    Executes the equivalent of:
      openocd \
        -f ./raspberrypi-${INTERFACE}.cfg \
        -f target/nordic/nrf54l.cfg \
        -c "init" \
        -c "reset run" \
        -c "shutdown"

    Best-effort: if OpenOCD is unavailable, logs a warning and continues.

    Args:
        slot: SWD interface slot number (selects the interface cfg file).
        timeout: Seconds to wait for openocd before killing it.
    """
    try:
        # Resolve project directory and candidate interface cfg filenames
        proj_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'openocd'))
        names = [f'raspberrypi-swd{slot}.cfg', f'swd{slot}.cfg']
        cfg = None
        for n in names:
            p = os.path.join(proj_dir, n)
            if os.path.exists(p):
                cfg = p
                break
        if not cfg:
            log.warning("reset_nrf54l: no interface CFG found in project dir %s; skipping reset", proj_dir)
            return
        # Build openocd command (no sudo required as per project setup).
        cmd = ['openocd', '-f', cfg, '-f', 'target/nordic/nrf54l.cfg',
               '-c', 'init', '-c', 'reset run', '-c', 'shutdown']

        async def _run(args):
            """Run *args* with combined stdout/stderr; True iff rc == 0."""
            proc = await asyncio.create_subprocess_exec(
                *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
            )
            try:
                out, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
            except asyncio.TimeoutError:
                log.error("reset_nrf54l: %s timed out; killing", args[0])
                proc.kill()
                # Reap the killed child — the original returned without
                # waiting, leaving a zombie / pending subprocess transport.
                await proc.wait()
                return False
            rc = proc.returncode
            if rc != 0:
                log.error("reset_nrf54l: %s exited with code %s; output: %s", args[0], rc, (out or b'').decode(errors='ignore'))
                return False
            return True

        ok = await _run(cmd)
        if ok:
            log.info("reset_nrf54l: reset succeeded (slot %d) using %s", slot, cfg)
    except FileNotFoundError:
        # The openocd binary itself is missing.
        log.error("reset_nrf54l: openocd not found; skipping reset")
    except Exception:
        log.error("reset_nrf54l failed", exc_info=True)
if __name__ == '__main__':
    # Basic logging setup
    # LOG_LEVEL may be a level name string ("DEBUG") or absent (numeric
    # default log.INFO); basicConfig accepts both forms.
    log.basicConfig(
        level=os.environ.get('LOG_LEVEL', log.INFO),
        format='%(asctime)s.%(msecs)03d %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    # Manual test entry point: resets the given SWD slot once and exits.
    slot_to_reset = 1
    log.info(f"Executing reset for slot {slot_to_reset}")
    asyncio.run(reset_nrf54l(slot=slot_to_reset))

View File

@@ -49,64 +49,40 @@ def _sd_matches_from_names(pa_idx, names):
out.append((i, d))
return out
def list_usb_pw_inputs(refresh: bool = True):
"""
Return [(device_index, device_dict), ...] for PipeWire **input** nodes
backed by **USB** devices (excludes monitor sources).
# Module-level caches for device lists
_usb_inputs_cache = []
_network_inputs_cache = []
Parameters:
- refresh (bool): If True (default), force PortAudio to re-enumerate devices
before mapping. Set to False to avoid disrupting active streams.
def get_usb_pw_inputs():
    """Return cached list of USB PipeWire inputs.

    Shape: [(device_index, device_dict), ...]. The cache is rebound by
    refresh_pw_cache(); callers should treat the returned list as read-only.
    """
    return _usb_inputs_cache
def get_network_pw_inputs():
    """Return cached list of Network/AES67 PipeWire inputs.

    Shape: [(device_index, device_dict), ...]. The cache is rebound by
    refresh_pw_cache(); callers should treat the returned list as read-only.
    """
    return _network_inputs_cache
def refresh_pw_cache():
"""
# Refresh PortAudio so we see newly added nodes before mapping
if refresh:
_sd_refresh()
Performs a full device scan and updates the internal caches for both USB
and Network audio devices. This is a heavy operation and should not be
called frequently or during active streams.
"""
global _usb_inputs_cache, _network_inputs_cache
# Force PortAudio to re-enumerate devices
_sd_refresh()
pa_idx = _pa_like_hostapi_index()
pw = _pw_dump()
# Map device.id -> device.bus ("usb"/"pci"/"platform"/"network"/...)
# --- Pass 1: Map device.id to device.bus ---
device_bus = {}
for obj in pw:
if obj.get("type") == "PipeWire:Interface:Device":
props = (obj.get("info") or {}).get("props") or {}
device_bus[obj["id"]] = (props.get("device.bus") or "").lower()
# Collect names/descriptions of USB input nodes
# --- Pass 2: Identify all USB and Network nodes ---
usb_input_names = set()
for obj in pw:
if obj.get("type") != "PipeWire:Interface:Node":
continue
props = (obj.get("info") or {}).get("props") or {}
media = (props.get("media.class") or "").lower()
if "source" not in media and "stream/input" not in media:
continue
# skip monitor sources ("Monitor of ..." or *.monitor)
nname = (props.get("node.name") or "").lower()
ndesc = (props.get("node.description") or "").lower()
if ".monitor" in nname or "monitor" in ndesc:
continue
bus = (props.get("device.bus") or device_bus.get(props.get("device.id")) or "").lower()
if bus == "usb":
usb_input_names.add(props.get("node.description") or props.get("node.name"))
# Map to sounddevice devices on PipeWire host API
return _sd_matches_from_names(pa_idx, usb_input_names)
def list_network_pw_inputs(refresh: bool = True):
"""
Return [(device_index, device_dict), ...] for PipeWire **input** nodes that
look like network/AES67/RTP sources (excludes monitor sources).
Parameters:
- refresh (bool): If True (default), force PortAudio to re-enumerate devices
before mapping. Set to False to avoid disrupting active streams.
"""
# Refresh PortAudio so we see newly added nodes before mapping
if refresh:
_sd_refresh()
pa_idx = _pa_like_hostapi_index()
pw = _pw_dump()
network_input_names = set()
for obj in pw:
if obj.get("type") != "PipeWire:Interface:Node":
@@ -115,26 +91,29 @@ def list_network_pw_inputs(refresh: bool = True):
media = (props.get("media.class") or "").lower()
if "source" not in media and "stream/input" not in media:
continue
nname = (props.get("node.name") or "")
ndesc = (props.get("node.description") or "")
# skip monitor sources
# Skip all monitor sources
if ".monitor" in nname.lower() or "monitor" in ndesc.lower():
continue
# Heuristics for network/AES67/RTP
# Check for USB
bus = (props.get("device.bus") or device_bus.get(props.get("device.id")) or "").lower()
if bus == "usb":
usb_input_names.add(ndesc or nname)
continue # A device is either USB or Network, not both
# Heuristics for Network/AES67/RTP
text = (nname + " " + ndesc).lower()
media_name = (props.get("media.name") or "").lower()
node_group = (props.get("node.group") or "").lower()
# Presence flags/keys that strongly indicate network RTP/AES67 sources
node_network_flag = bool(props.get("node.network"))
has_rtp_keys = any(k in props for k in (
"rtp.session", "rtp.source.ip", "rtp.source.port", "rtp.fmtp", "rtp.rate"
))
has_sess_keys = any(k in props for k in (
"sess.name", "sess.media", "sess.latency.msec"
))
has_rtp_keys = any(k in props for k in ("rtp.session", "rtp.source.ip"))
has_sess_keys = any(k in props for k in ("sess.name", "sess.media"))
is_network = (
(props.get("device.bus") or "").lower() == "network" or
bus == "network" or
node_network_flag or
"rtp" in media_name or
any(k in text for k in ("rtp", "sap", "aes67", "network", "raop", "airplay")) or
@@ -145,7 +124,13 @@ def list_network_pw_inputs(refresh: bool = True):
if is_network:
network_input_names.add(ndesc or nname)
return _sd_matches_from_names(pa_idx, network_input_names)
# --- Final Step: Update caches ---
_usb_inputs_cache = _sd_matches_from_names(pa_idx, usb_input_names)
_network_inputs_cache = _sd_matches_from_names(pa_idx, network_input_names)
# Populate cache on initial module load
refresh_pw_cache()
# Example usage:
# for i, d in list_usb_pw_inputs():

View File

@@ -0,0 +1,8 @@
adapter driver bcm2835gpio
transport select swd
adapter gpio swclk 17
adapter gpio swdio 18
#adapter gpio trst 26
#reset_config trst_only
adapter speed 1000

View File

@@ -0,0 +1,8 @@
adapter driver bcm2835gpio
transport select swd
adapter gpio swclk 24
adapter gpio swdio 23
#adapter gpio trst 27
#reset_config trst_only
adapter speed 1000

26
src/scripts/hit_status.sh Normal file
View File

@@ -0,0 +1,26 @@
#!/usr/bin/env bash
# Usage: ./hit_status.sh [COUNT] [SLEEP_SECONDS]
# Always targets http://127.0.0.1:5000/status
# Defaults: COUNT=100 SLEEP_SECONDS=0
# Example: ./hit_status.sh 100 0.05
set -euo pipefail

URL="http://127.0.0.1:5000/status"
COUNT="${1:-100}"
SLEEP_SECS="${2:-0}"

# Reject non-integer repeat counts early.
case "$COUNT" in
    ''|*[!0-9]*)
        echo "COUNT must be an integer, got: $COUNT" >&2
        exit 1
        ;;
esac

for ((i = 1; i <= COUNT; i++)); do
    echo "[$i/$COUNT] GET $URL"
    curl -sS "$URL" > /dev/null || echo "Request $i failed"
    # Sleep if non-zero (supports floats, no bc needed)
    if [[ "$SLEEP_SECS" != "0" && "$SLEEP_SECS" != "0.0" && "$SLEEP_SECS" != "" ]]; then
        sleep "$SLEEP_SECS"
    fi
done