refactor: consolidate dual-broadcaster state management
- Removed StreamerWorker thread wrapper in favor of direct async operations with module-level multicaster instances - Split settings persistence into primary and secondary caches (SETTINGS_CACHE1/2) with dedicated load/save functions - Removed unused imports and simplified initialization logic by inlining worker coroutines into API endpoints
This commit is contained in:
@@ -5,11 +5,7 @@ TODO: in the future the multicaster objects should run in their own threads or e
|
||||
import os
|
||||
import logging as log
|
||||
import json
|
||||
import sys
|
||||
import threading
|
||||
from concurrent.futures import Future
|
||||
from datetime import datetime
|
||||
import time
|
||||
import asyncio
|
||||
from dotenv import load_dotenv
|
||||
|
||||
@@ -24,19 +20,21 @@ from auracast.utils.sounddevice_utils import (
|
||||
resolve_input_device_index,
|
||||
refresh_pw_cache,
|
||||
)
|
||||
from auracast.utils.reset_utils import reset_nrf54l
|
||||
|
||||
load_dotenv()
|
||||
# make sure pipewire sets latency
|
||||
STREAM_SETTINGS_FILE = os.path.join(os.path.dirname(__file__), 'stream_settings.json')
|
||||
# Primary and secondary persisted settings files
|
||||
STREAM_SETTINGS_FILE1 = os.path.join(os.path.dirname(__file__), 'stream_settings.json')
|
||||
STREAM_SETTINGS_FILE2 = os.path.join(os.path.dirname(__file__), 'stream_settings2.json')
|
||||
# Raspberry Pi UART transports
|
||||
TRANSPORT1 = os.getenv('TRANSPORT1', 'serial:/dev/ttyAMA3,1000000,rtscts') # transport for raspberry pi gpio header
|
||||
TRANSPORT2 = os.getenv('TRANSPORT2', 'serial:/dev/ttyAMA4,1000000,rtscts') # transport for raspberry pi gpio header
|
||||
|
||||
os.environ["PULSE_LATENCY_MSEC"] = "3"
|
||||
|
||||
# In-memory cache to avoid disk I/O on hot paths like /status
|
||||
SETTINGS_CACHE: dict = {}
|
||||
# In-memory caches to avoid disk I/O on hot paths like /status
|
||||
SETTINGS_CACHE1: dict = {}
|
||||
SETTINGS_CACHE2: dict = {}
|
||||
|
||||
|
||||
|
||||
@@ -55,39 +53,66 @@ def get_device_index_by_name(name: str):
|
||||
return None
|
||||
|
||||
|
||||
def _hydrate_settings_cache_from_disk() -> None:
|
||||
"""Populate SETTINGS_CACHE once from disk at startup.
|
||||
def _init_settings_cache_from_disk() -> None:
|
||||
"""Populate SETTINGS_CACHE1 and SETTINGS_CACHE2 once from disk at startup.
|
||||
|
||||
Safe to call multiple times; errors fall back to empty dict.
|
||||
If a file doesn't exist, initialize to an empty dict. Any JSON or I/O errors raise.
|
||||
"""
|
||||
global SETTINGS_CACHE
|
||||
try:
|
||||
if os.path.exists(STREAM_SETTINGS_FILE):
|
||||
with open(STREAM_SETTINGS_FILE, 'r', encoding='utf-8') as f:
|
||||
SETTINGS_CACHE = json.load(f)
|
||||
else:
|
||||
SETTINGS_CACHE = {}
|
||||
except Exception:
|
||||
SETTINGS_CACHE = {}
|
||||
global SETTINGS_CACHE1, SETTINGS_CACHE2
|
||||
if os.path.exists(STREAM_SETTINGS_FILE1):
|
||||
with open(STREAM_SETTINGS_FILE1, 'r', encoding='utf-8') as f:
|
||||
SETTINGS_CACHE1 = json.load(f)
|
||||
else:
|
||||
SETTINGS_CACHE1 = {}
|
||||
if os.path.exists(STREAM_SETTINGS_FILE2):
|
||||
with open(STREAM_SETTINGS_FILE2, 'r', encoding='utf-8') as f:
|
||||
SETTINGS_CACHE2 = json.load(f)
|
||||
else:
|
||||
SETTINGS_CACHE2 = {}
|
||||
|
||||
def load_stream_settings() -> dict:
|
||||
"""Return stream settings from in-memory cache.
|
||||
"""Return PRIMARY stream settings from in-memory cache.
|
||||
|
||||
The cache is hydrated once at startup and updated by save_stream_settings().
|
||||
No disk I/O occurs here.
|
||||
Hydrated once at startup and updated by save_stream_settings(). No disk I/O occurs here.
|
||||
"""
|
||||
global SETTINGS_CACHE
|
||||
return SETTINGS_CACHE
|
||||
global SETTINGS_CACHE1
|
||||
return SETTINGS_CACHE1
|
||||
|
||||
def load_stream_settings2() -> dict:
|
||||
"""Return SECONDARY stream settings from in-memory cache."""
|
||||
global SETTINGS_CACHE2
|
||||
return SETTINGS_CACHE2
|
||||
|
||||
def save_stream_settings(settings: dict):
|
||||
"""Update in-memory settings cache and persist to disk."""
|
||||
global SETTINGS_CACHE
|
||||
SETTINGS_CACHE = dict(settings)
|
||||
try:
|
||||
with open(STREAM_SETTINGS_FILE, 'w', encoding='utf-8') as f:
|
||||
json.dump(SETTINGS_CACHE, f, indent=2)
|
||||
except Exception as e:
|
||||
log.error('Unable to persist stream settings: %s', e)
|
||||
"""Update PRIMARY in-memory settings cache and persist to disk."""
|
||||
global SETTINGS_CACHE1
|
||||
SETTINGS_CACHE1 = dict(settings)
|
||||
os.makedirs(os.path.dirname(STREAM_SETTINGS_FILE1), exist_ok=True)
|
||||
with open(STREAM_SETTINGS_FILE1, 'w', encoding='utf-8') as f:
|
||||
json.dump(SETTINGS_CACHE1, f, indent=2)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
log.info("Saved primary settings to %s", STREAM_SETTINGS_FILE1)
|
||||
|
||||
def save_stream_settings2(settings: dict):
|
||||
"""Update SECONDARY in-memory settings cache and persist to disk."""
|
||||
global SETTINGS_CACHE2
|
||||
SETTINGS_CACHE2 = dict(settings)
|
||||
os.makedirs(os.path.dirname(STREAM_SETTINGS_FILE2), exist_ok=True)
|
||||
with open(STREAM_SETTINGS_FILE2, 'w', encoding='utf-8') as f:
|
||||
json.dump(SETTINGS_CACHE2, f, indent=2)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
log.info("Saved secondary settings to %s", STREAM_SETTINGS_FILE2)
|
||||
|
||||
def save_settings(persisted: dict, secondary: bool = False) -> None:
|
||||
"""Attach timestamp and persist using the appropriate cache/file."""
|
||||
persisted = dict(persisted)
|
||||
persisted['timestamp'] = datetime.utcnow().isoformat()
|
||||
if secondary:
|
||||
save_stream_settings2(persisted)
|
||||
else:
|
||||
save_stream_settings(persisted)
|
||||
|
||||
|
||||
app = FastAPI()
|
||||
@@ -104,121 +129,176 @@ app.add_middleware(
|
||||
# Initialize global configuration
|
||||
global_config_group = auracast_config.AuracastConfigGroup()
|
||||
|
||||
class StreamerWorker: # TODO: is wraping in this Worker stricly nececcarry ?
|
||||
"""Owns multicaster(s) on a dedicated asyncio loop in a background thread."""
|
||||
# Module-level state replacing StreamerWorker
|
||||
multicaster1: multicast_control.Multicaster | None = None
|
||||
multicaster2: multicast_control.Multicaster | None = None
|
||||
_stream_lock = asyncio.Lock() # serialize initialize/stop_audio on API side
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._thread: threading.Thread | None = None
|
||||
self._loop: asyncio.AbstractEventLoop | None = None
|
||||
# These live only on the worker loop
|
||||
self._multicaster1: multicast_control.Multicaster | None = None
|
||||
self._multicaster2: multicast_control.Multicaster | None = None
|
||||
self._started = threading.Event()
|
||||
|
||||
# ---------- Thread/loop management ----------
|
||||
def start(self) -> None:
|
||||
if self._thread and self._thread.is_alive():
|
||||
return
|
||||
self._thread = threading.Thread(target=self._run, name="StreamerWorker", daemon=True)
|
||||
self._thread.start()
|
||||
self._started.wait(timeout=5)
|
||||
|
||||
def _run(self) -> None:
|
||||
loop = asyncio.new_event_loop()
|
||||
self._loop = loop
|
||||
asyncio.set_event_loop(loop)
|
||||
self._started.set()
|
||||
async def _stop_all() -> bool:
|
||||
global multicaster1, multicaster2
|
||||
was_running = False
|
||||
if multicaster1 is not None:
|
||||
try:
|
||||
loop.run_forever()
|
||||
await multicaster1.stop_streaming()
|
||||
await multicaster1.shutdown()
|
||||
was_running = True
|
||||
finally:
|
||||
try:
|
||||
pending = asyncio.all_tasks(loop)
|
||||
for t in pending:
|
||||
t.cancel()
|
||||
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
||||
except Exception:
|
||||
pass
|
||||
loop.close()
|
||||
|
||||
def _ensure_loop(self) -> asyncio.AbstractEventLoop:
|
||||
if not self._loop:
|
||||
raise RuntimeError("StreamerWorker loop not started")
|
||||
return self._loop
|
||||
|
||||
async def call(self, coro_func, *args, **kwargs):
|
||||
"""Schedule a coroutine on the worker loop and await its result from the API loop."""
|
||||
loop = self._ensure_loop()
|
||||
fut: Future = asyncio.run_coroutine_threadsafe(coro_func(*args, **kwargs), loop)
|
||||
return await asyncio.wrap_future(fut)
|
||||
|
||||
# ---------- Worker-loop coroutines ----------
|
||||
async def _w_init_primary(self, conf: auracast_config.AuracastConfigGroup) -> dict:
|
||||
# Clean any previous
|
||||
if self._multicaster1 is not None:
|
||||
try:
|
||||
await self._multicaster1.shutdown()
|
||||
except Exception:
|
||||
pass
|
||||
self._multicaster1 = None
|
||||
|
||||
# overwrite some configurations
|
||||
conf.transport = TRANSPORT1
|
||||
# Enable adaptive frame dropping only for device-based inputs (not file/demo)
|
||||
multicaster1 = None
|
||||
if multicaster2 is not None:
|
||||
try:
|
||||
await multicaster2.stop_streaming()
|
||||
await multicaster2.shutdown()
|
||||
was_running = True
|
||||
finally:
|
||||
multicaster2 = None
|
||||
return was_running
|
||||
|
||||
async def _status_primary() -> dict:
|
||||
if multicaster1 is None:
|
||||
return {'is_initialized': False, 'is_streaming': False}
|
||||
return multicaster1.get_status()
|
||||
|
||||
async def _stream_lc3(audio_data: dict[str, str], bigs_template: list) -> None:
|
||||
if multicaster1 is None:
|
||||
raise HTTPException(status_code=500, detail='Auracast endpoint was never intialized')
|
||||
for big in bigs_template:
|
||||
if big.language not in audio_data:
|
||||
raise HTTPException(status_code=500, detail='language len missmatch')
|
||||
big.audio_source = audio_data[big.language].encode('latin-1')
|
||||
multicaster1.big_conf = bigs_template
|
||||
await multicaster1.start_streaming()
|
||||
|
||||
@app.post("/init")
|
||||
async def initialize(conf: auracast_config.AuracastConfigGroup):
|
||||
"""Initializes the primary broadcaster on the streamer thread."""
|
||||
global global_config_group
|
||||
async with _stream_lock:
|
||||
try:
|
||||
global_config_group = conf
|
||||
log.info('Initializing multicaster1 with config:\n %s', conf.model_dump_json(indent=2))
|
||||
# Inline of _init_primary
|
||||
global multicaster1
|
||||
if multicaster1 is not None:
|
||||
await multicaster1.shutdown()
|
||||
multicaster1 = None
|
||||
|
||||
conf.transport = TRANSPORT1
|
||||
conf.enable_adaptive_frame_dropping = any(
|
||||
isinstance(big.audio_source, str) and big.audio_source.startswith('device:')
|
||||
for big in conf.bigs
|
||||
)
|
||||
except Exception:
|
||||
conf.enable_adaptive_frame_dropping = False
|
||||
# Derive device name and input mode
|
||||
first_source = conf.bigs[0].audio_source if conf.bigs else ''
|
||||
input_device_name = None
|
||||
audio_mode_persist = 'Demo'
|
||||
if first_source.startswith('device:'):
|
||||
input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None
|
||||
try:
|
||||
|
||||
first_source = conf.bigs[0].audio_source if conf.bigs else ''
|
||||
input_device_name = None
|
||||
audio_mode_persist = 'Demo'
|
||||
if isinstance(first_source, str) and first_source.startswith('device:'):
|
||||
input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None
|
||||
alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()}
|
||||
except Exception:
|
||||
alsa_usb_names = set()
|
||||
try:
|
||||
net_names = {d.get('name') for _, d in get_network_pw_inputs()}
|
||||
except Exception:
|
||||
usb_names, net_names = set(), set()
|
||||
audio_mode_persist = 'Network' if (input_device_name in net_names) else 'USB'
|
||||
audio_mode_persist = 'Network' if (input_device_name in net_names) else 'USB'
|
||||
|
||||
# Map device name to index using centralized resolver
|
||||
if input_device_name and input_device_name.isdigit():
|
||||
device_index = int(input_device_name)
|
||||
else:
|
||||
device_index = resolve_input_device_index(input_device_name or '')
|
||||
if device_index is None:
|
||||
raise HTTPException(status_code=400, detail=f"Audio device '{input_device_name}' not found.")
|
||||
for big in conf.bigs:
|
||||
if big.audio_source.startswith('device:'):
|
||||
big.audio_source = f'device:{device_index}'
|
||||
devinfo = sd.query_devices(device_index)
|
||||
# Force capture at 48 kHz to avoid resampler latency and 44.1 kHz incompatibilities
|
||||
capture_rate = 48000
|
||||
max_in = int(devinfo.get('max_input_channels') or 1)
|
||||
channels = max(1, min(2, max_in))
|
||||
for big in conf.bigs:
|
||||
big.input_format = f"int16le,{capture_rate},{channels}"
|
||||
if input_device_name and input_device_name.isdigit():
|
||||
device_index = int(input_device_name)
|
||||
else:
|
||||
device_index = resolve_input_device_index(input_device_name or '')
|
||||
if device_index is None:
|
||||
raise HTTPException(status_code=400, detail=f"Audio device '{input_device_name}' not found.")
|
||||
for big in conf.bigs:
|
||||
if isinstance(big.audio_source, str) and big.audio_source.startswith('device:'):
|
||||
big.audio_source = f'device:{device_index}'
|
||||
devinfo = sd.query_devices(device_index)
|
||||
capture_rate = 48000
|
||||
max_in = int(devinfo.get('max_input_channels') or 1)
|
||||
channels = max(1, min(2, max_in))
|
||||
for big in conf.bigs:
|
||||
big.input_format = f"int16le,{capture_rate},{channels}"
|
||||
|
||||
# Coerce QoS: compute max_transport_latency from RTN if qos_config present
|
||||
if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None:
|
||||
conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3
|
||||
|
||||
multicaster1 = multicast_control.Multicaster(conf, conf.bigs)
|
||||
await multicaster1.init_broadcast()
|
||||
auto_started = False
|
||||
if any(isinstance(big.audio_source, str) and (big.audio_source.startswith("device:") or big.audio_source.startswith("file:")) for big in conf.bigs):
|
||||
await multicaster1.start_streaming()
|
||||
auto_started = True
|
||||
|
||||
demo_count = sum(1 for big in conf.bigs if isinstance(big.audio_source, str) and big.audio_source.startswith('file:'))
|
||||
demo_rate = int(conf.auracast_sampling_rate_hz or 0)
|
||||
demo_type = None
|
||||
if demo_count > 0 and demo_rate > 0:
|
||||
if demo_rate in (48000, 24000, 16000):
|
||||
demo_type = f"{demo_count} × {demo_rate//1000}kHz"
|
||||
else:
|
||||
demo_type = f"{demo_count} × {demo_rate}Hz"
|
||||
persisted = {
|
||||
'channel_names': [big.name for big in conf.bigs],
|
||||
'languages': [big.language for big in conf.bigs],
|
||||
'audio_mode': audio_mode_persist,
|
||||
'input_device': input_device_name,
|
||||
'program_info': [getattr(big, 'program_info', None) for big in conf.bigs],
|
||||
'gain': [getattr(big, 'input_gain', 1.0) for big in conf.bigs],
|
||||
'auracast_sampling_rate_hz': conf.auracast_sampling_rate_hz,
|
||||
'octets_per_frame': conf.octets_per_frame,
|
||||
'presentation_delay_us': getattr(conf, 'presentation_delay_us', None),
|
||||
'rtn': getattr(getattr(conf, 'qos_config', None), 'number_of_retransmissions', None),
|
||||
'immediate_rendering': getattr(conf, 'immediate_rendering', False),
|
||||
'assisted_listening_stream': getattr(conf, 'assisted_listening_stream', False),
|
||||
'stream_password': (conf.bigs[0].code if conf.bigs and getattr(conf.bigs[0], 'code', None) else None),
|
||||
'demo_total_streams': demo_count,
|
||||
'demo_stream_type': demo_type,
|
||||
'is_streaming': auto_started,
|
||||
}
|
||||
# Persist returned settings (avoid touching from worker thread)
|
||||
save_settings(persisted, secondary=False)
|
||||
except Exception as e:
|
||||
log.error("Exception in /init: %s", traceback.format_exc())
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
@app.post("/init2")
|
||||
async def initialize2(conf: auracast_config.AuracastConfigGroup):
|
||||
"""Initializes the secondary broadcaster on the streamer thread."""
|
||||
try:
|
||||
log.info('Initializing multicaster2 with config:\n %s', conf.model_dump_json(indent=2))
|
||||
# Inline of _init_secondary
|
||||
global multicaster2
|
||||
if multicaster2 is not None:
|
||||
await multicaster2.shutdown()
|
||||
multicaster2 = None
|
||||
|
||||
conf.transport = TRANSPORT2
|
||||
conf.enable_adaptive_frame_dropping = any(
|
||||
isinstance(big.audio_source, str) and big.audio_source.startswith('device:')
|
||||
for big in conf.bigs
|
||||
)
|
||||
for big in conf.bigs:
|
||||
if isinstance(big.audio_source, str) and big.audio_source.startswith('device:'):
|
||||
device_name = big.audio_source.split(':', 1)[1]
|
||||
net_names = {d.get('name') for _, d in get_network_pw_inputs()}
|
||||
alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()}
|
||||
device_index = resolve_input_device_index(device_name)
|
||||
if device_index is None:
|
||||
raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.")
|
||||
big.audio_source = f'device:{device_index}'
|
||||
if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None:
|
||||
conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3
|
||||
|
||||
# Create and init multicaster1
|
||||
self._multicaster1 = multicast_control.Multicaster(conf, conf.bigs)
|
||||
#await reset_nrf54l(1)
|
||||
await self._multicaster1.init_broadcast()
|
||||
auto_started = False
|
||||
if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs):
|
||||
await self._multicaster1.start_streaming()
|
||||
auto_started = True
|
||||
|
||||
# Return proposed settings to persist on API side
|
||||
multicaster2 = multicast_control.Multicaster(conf, conf.bigs)
|
||||
await multicaster2.init_broadcast()
|
||||
if any(isinstance(big.audio_source, str) and (big.audio_source.startswith("device:") or big.audio_source.startswith("file:")) for big in conf.bigs):
|
||||
await multicaster2.start_streaming()
|
||||
# Build and persist secondary settings analogous to primary
|
||||
first_source = conf.bigs[0].audio_source if conf.bigs else ''
|
||||
input_device_name = None
|
||||
audio_mode_persist = 'Demo'
|
||||
if isinstance(first_source, str) and first_source.startswith('device:'):
|
||||
input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None
|
||||
try:
|
||||
net_names = {d.get('name') for _, d in get_network_pw_inputs()}
|
||||
except Exception:
|
||||
net_names = set()
|
||||
audio_mode_persist = 'Network' if (input_device_name in net_names) else 'USB'
|
||||
demo_count = sum(1 for big in conf.bigs if isinstance(big.audio_source, str) and big.audio_source.startswith('file:'))
|
||||
demo_rate = int(conf.auracast_sampling_rate_hz or 0)
|
||||
demo_type = None
|
||||
@@ -227,7 +307,7 @@ class StreamerWorker: # TODO: is wraping in this Worker stricly nececcarry ?
|
||||
demo_type = f"{demo_count} × {demo_rate//1000}kHz"
|
||||
else:
|
||||
demo_type = f"{demo_count} × {demo_rate}Hz"
|
||||
return {
|
||||
persisted2 = {
|
||||
'channel_names': [big.name for big in conf.bigs],
|
||||
'languages': [big.language for big in conf.bigs],
|
||||
'audio_mode': audio_mode_persist,
|
||||
@@ -243,148 +323,33 @@ class StreamerWorker: # TODO: is wraping in this Worker stricly nececcarry ?
|
||||
'stream_password': (conf.bigs[0].code if conf.bigs and getattr(conf.bigs[0], 'code', None) else None),
|
||||
'demo_total_streams': demo_count,
|
||||
'demo_stream_type': demo_type,
|
||||
'is_streaming': auto_started,
|
||||
'is_streaming': any(isinstance(big.audio_source, str) and (big.audio_source.startswith("device:") or big.audio_source.startswith("file:")) for big in conf.bigs),
|
||||
}
|
||||
|
||||
async def _w_init_secondary(self, conf: auracast_config.AuracastConfigGroup) -> None:
|
||||
if self._multicaster2 is not None:
|
||||
try:
|
||||
await self._multicaster2.shutdown()
|
||||
except Exception:
|
||||
pass
|
||||
self._multicaster2 = None
|
||||
|
||||
conf.transport = TRANSPORT2
|
||||
# Enable adaptive frame dropping only for device-based inputs (not file/demo)
|
||||
try:
|
||||
conf.enable_adaptive_frame_dropping = any(
|
||||
isinstance(big.audio_source, str) and big.audio_source.startswith('device:')
|
||||
for big in conf.bigs
|
||||
)
|
||||
except Exception:
|
||||
conf.enable_adaptive_frame_dropping = False
|
||||
for big in conf.bigs:
|
||||
if big.audio_source.startswith('device:'):
|
||||
device_name = big.audio_source.split(':', 1)[1]
|
||||
# Resolve backend preference by membership
|
||||
try:
|
||||
net_names = {d.get('name') for _, d in get_network_pw_inputs()}
|
||||
except Exception:
|
||||
net_names = set()
|
||||
try:
|
||||
alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()}
|
||||
except Exception:
|
||||
alsa_usb_names = set()
|
||||
device_index = resolve_input_device_index(device_name)
|
||||
if device_index is None:
|
||||
raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.")
|
||||
big.audio_source = f'device:{device_index}'
|
||||
# Coerce QoS: compute max_transport_latency from RTN if qos_config present
|
||||
if getattr(conf, 'qos_config', None) and getattr(conf.qos_config, 'number_of_retransmissions', None) is not None:
|
||||
conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3
|
||||
|
||||
|
||||
self._multicaster2 = multicast_control.Multicaster(conf, conf.bigs)
|
||||
#await reset_nrf54l(0)
|
||||
await self._multicaster2.init_broadcast()
|
||||
if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs):
|
||||
await self._multicaster2.start_streaming()
|
||||
|
||||
async def _w_stop_all(self) -> bool:
|
||||
was_running = False
|
||||
if self._multicaster1 is not None:
|
||||
try:
|
||||
await self._multicaster1.stop_streaming()
|
||||
await self._multicaster1.shutdown()
|
||||
was_running = True
|
||||
finally:
|
||||
self._multicaster1 = None
|
||||
if self._multicaster2 is not None:
|
||||
try:
|
||||
await self._multicaster2.stop_streaming()
|
||||
await self._multicaster2.shutdown()
|
||||
was_running = True
|
||||
finally:
|
||||
self._multicaster2 = None
|
||||
return was_running
|
||||
|
||||
async def _w_status_primary(self) -> dict:
|
||||
if self._multicaster1 is None:
|
||||
return {'is_initialized': False, 'is_streaming': False}
|
||||
try:
|
||||
return self._multicaster1.get_status()
|
||||
except Exception:
|
||||
return {'is_initialized': True, 'is_streaming': False}
|
||||
|
||||
async def _w_stream_lc3(self, audio_data: dict[str, str], bigs_template: list) -> None:
|
||||
if self._multicaster1 is None:
|
||||
raise HTTPException(status_code=500, detail='Auracast endpoint was never intialized')
|
||||
# Update bigs audio_source with provided bytes and start
|
||||
for big in bigs_template:
|
||||
if big.language not in audio_data:
|
||||
raise HTTPException(status_code=500, detail='language len missmatch')
|
||||
big.audio_source = audio_data[big.language].encode('latin-1')
|
||||
self._multicaster1.big_conf = bigs_template
|
||||
await self._multicaster1.start_streaming()
|
||||
|
||||
|
||||
# Create the worker singleton and a route-level lock
|
||||
streamer = StreamerWorker()
|
||||
# multicaster1: multicast_control.Multicaster | None = None # kept for legacy references, do not use on API loop
|
||||
# multicaster2: multicast_control.Multicaster | None = None
|
||||
_stream_lock = asyncio.Lock() # serialize initialize/stop_audio on API side
|
||||
@app.post("/init")
|
||||
async def initialize(conf: auracast_config.AuracastConfigGroup):
|
||||
"""Initializes the primary broadcaster on the streamer thread."""
|
||||
global global_config_group
|
||||
async with _stream_lock:
|
||||
try:
|
||||
global_config_group = conf
|
||||
log.info('Initializing multicaster1 with config:\n %s', conf.model_dump_json(indent=2))
|
||||
persisted = await streamer.call(streamer._w_init_primary, conf)
|
||||
# Persist returned settings (avoid touching from worker thread)
|
||||
persisted['timestamp'] = datetime.utcnow().isoformat()
|
||||
save_stream_settings(persisted)
|
||||
except Exception as e:
|
||||
log.error("Exception in /init: %s", traceback.format_exc())
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
@app.post("/init2")
|
||||
async def initialize2(conf: auracast_config.AuracastConfigGroup):
|
||||
"""Initializes the secondary broadcaster on the streamer thread."""
|
||||
try:
|
||||
log.info('Initializing multicaster2 with config:\n %s', conf.model_dump_json(indent=2))
|
||||
await streamer.call(streamer._w_init_secondary, conf)
|
||||
try:
|
||||
is_demo = any(isinstance(big.audio_source, str) and big.audio_source.startswith('file:') for big in conf.bigs)
|
||||
if is_demo:
|
||||
settings = load_stream_settings() or {}
|
||||
primary_count = int(settings.get('demo_total_streams') or len(settings.get('channel_names') or []))
|
||||
secondary_count = len(conf.bigs or [])
|
||||
total = primary_count + secondary_count
|
||||
settings['demo_total_streams'] = total
|
||||
demo_rate = int(conf.auracast_sampling_rate_hz or 0)
|
||||
if demo_rate > 0:
|
||||
if demo_rate in (48000, 24000, 16000):
|
||||
settings['demo_stream_type'] = f"{total} × {demo_rate//1000}kHz"
|
||||
else:
|
||||
settings['demo_stream_type'] = f"{total} × {demo_rate}Hz"
|
||||
settings['timestamp'] = datetime.utcnow().isoformat()
|
||||
save_stream_settings(settings)
|
||||
except Exception:
|
||||
log.warning("Failed to persist demo_total_streams in /init2", exc_info=True)
|
||||
save_settings(persisted2, secondary=True)
|
||||
is_demo = any(isinstance(big.audio_source, str) and big.audio_source.startswith('file:') for big in conf.bigs)
|
||||
if is_demo:
|
||||
settings = load_stream_settings() or {}
|
||||
primary_count = int(settings.get('demo_total_streams') or len(settings.get('channel_names') or []))
|
||||
secondary_count = len(conf.bigs or [])
|
||||
total = primary_count + secondary_count
|
||||
settings['demo_total_streams'] = total
|
||||
demo_rate = int(conf.auracast_sampling_rate_hz or 0)
|
||||
if demo_rate > 0:
|
||||
if demo_rate in (48000, 24000, 16000):
|
||||
settings['demo_stream_type'] = f"{total} × {demo_rate//1000}kHz"
|
||||
else:
|
||||
settings['demo_stream_type'] = f"{total} × {demo_rate}Hz"
|
||||
settings['timestamp'] = datetime.utcnow().isoformat()
|
||||
save_stream_settings(settings)
|
||||
except Exception as e:
|
||||
log.error("Exception in /init2: %s", traceback.format_exc())
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
|
||||
|
||||
@app.post("/stop_audio")
|
||||
async def stop_audio():
|
||||
"""Stops streaming on both multicaster1 and multicaster2 (worker thread)."""
|
||||
try:
|
||||
was_running = await streamer.call(streamer._w_stop_all)
|
||||
was_running = await _stop_all()
|
||||
|
||||
# Persist is_streaming=False
|
||||
try:
|
||||
@@ -402,25 +367,22 @@ async def stop_audio():
|
||||
log.error("Exception in /stop_audio: %s", traceback.format_exc())
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.post("/stream_lc3")
|
||||
async def send_audio(audio_data: dict[str, str]):
|
||||
"""Sends a block of pre-coded LC3 audio via the worker."""
|
||||
try:
|
||||
await streamer.call(streamer._w_stream_lc3, audio_data, list(global_config_group.bigs))
|
||||
await _stream_lc3(audio_data, list(global_config_group.bigs))
|
||||
return {"status": "audio_sent"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.get("/status")
|
||||
async def get_status():
|
||||
"""Gets current status (worker) merged with persisted settings cache."""
|
||||
status = await streamer.call(streamer._w_status_primary)
|
||||
status = await _status_primary()
|
||||
status.update(load_stream_settings())
|
||||
return status
|
||||
|
||||
|
||||
async def _autostart_from_settings():
|
||||
"""Background task: auto-start last selected device-based input at server startup.
|
||||
|
||||
@@ -428,107 +390,101 @@ async def _autostart_from_settings():
|
||||
saved device name appears in either USB or Network lists, then builds a config
|
||||
and initializes streaming.
|
||||
"""
|
||||
try:
|
||||
settings = load_stream_settings() or {}
|
||||
audio_mode = settings.get('audio_mode')
|
||||
input_device_name = settings.get('input_device')
|
||||
rate = settings.get('auracast_sampling_rate_hz')
|
||||
octets = settings.get('octets_per_frame')
|
||||
pres_delay = settings.get('presentation_delay_us')
|
||||
saved_rtn = settings.get('rtn')
|
||||
immediate_rendering = settings.get('immediate_rendering', False)
|
||||
assisted_listening_stream = settings.get('assisted_listening_stream', False)
|
||||
channel_names = settings.get('channel_names') or ["Broadcast0"]
|
||||
program_info = settings.get('program_info') or channel_names
|
||||
languages = settings.get('languages') or ["deu"]
|
||||
stream_password = settings.get('stream_password')
|
||||
original_ts = settings.get('timestamp')
|
||||
previously_streaming = bool(settings.get('is_streaming'))
|
||||
settings = load_stream_settings() or {}
|
||||
audio_mode = settings.get('audio_mode')
|
||||
input_device_name = settings.get('input_device')
|
||||
rate = settings.get('auracast_sampling_rate_hz')
|
||||
octets = settings.get('octets_per_frame')
|
||||
pres_delay = settings.get('presentation_delay_us')
|
||||
saved_rtn = settings.get('rtn')
|
||||
immediate_rendering = settings.get('immediate_rendering', False)
|
||||
assisted_listening_stream = settings.get('assisted_listening_stream', False)
|
||||
channel_names = settings.get('channel_names') or ["Broadcast0"]
|
||||
program_info = settings.get('program_info') or channel_names
|
||||
languages = settings.get('languages') or ["deu"]
|
||||
stream_password = settings.get('stream_password')
|
||||
original_ts = settings.get('timestamp')
|
||||
previously_streaming = bool(settings.get('is_streaming'))
|
||||
|
||||
# Only auto-start if the previous state was streaming and it's a device-based input.
|
||||
if not previously_streaming:
|
||||
return
|
||||
if not input_device_name:
|
||||
return
|
||||
if rate is None or octets is None:
|
||||
# Not enough info to reconstruct stream reliably
|
||||
return
|
||||
# Only auto-start if the previous state was streaming and it's a device-based input.
|
||||
if not previously_streaming:
|
||||
return
|
||||
if not input_device_name:
|
||||
return
|
||||
if rate is None or octets is None:
|
||||
# Not enough info to reconstruct stream reliably
|
||||
return
|
||||
|
||||
# Avoid duplicate start if already streaming
|
||||
current = await streamer.call(streamer._w_status_primary)
|
||||
# Avoid duplicate start if already streaming
|
||||
current = await _status_primary()
|
||||
if current.get('is_streaming'):
|
||||
return
|
||||
|
||||
while True:
|
||||
# Do not interfere if user started a stream manually in the meantime
|
||||
current = await _status_primary()
|
||||
if current.get('is_streaming'):
|
||||
return
|
||||
|
||||
while True:
|
||||
# Do not interfere if user started a stream manually in the meantime
|
||||
current = await streamer.call(streamer._w_status_primary)
|
||||
if current.get('is_streaming'):
|
||||
# Abort if saved settings changed to a different target while we were polling
|
||||
current_settings = load_stream_settings() or {}
|
||||
if current_settings.get('timestamp') != original_ts:
|
||||
# Settings were updated (likely by user via /init)
|
||||
# If the target device or mode changed, stop autostart
|
||||
if (
|
||||
current_settings.get('input_device') != input_device_name or
|
||||
current_settings.get('audio_mode') != audio_mode
|
||||
):
|
||||
return
|
||||
# Abort if saved settings changed to a different target while we were polling
|
||||
current_settings = load_stream_settings() or {}
|
||||
if current_settings.get('timestamp') != original_ts:
|
||||
# Settings were updated (likely by user via /init)
|
||||
# If the target device or mode changed, stop autostart
|
||||
if (
|
||||
current_settings.get('input_device') != input_device_name or
|
||||
current_settings.get('audio_mode') != audio_mode
|
||||
):
|
||||
return
|
||||
# Check against the cached device lists
|
||||
usb = [d for _, d in get_alsa_usb_inputs()]
|
||||
net = [d for _, d in get_network_pw_inputs()]
|
||||
names = {d.get('name') for d in usb} | {d.get('name') for d in net}
|
||||
if input_device_name in names:
|
||||
# Build a minimal config based on saved fields
|
||||
bigs = [
|
||||
auracast_config.AuracastBigConfig(
|
||||
code=stream_password,
|
||||
name=channel_names[0] if channel_names else "Broadcast0",
|
||||
program_info=program_info[0] if isinstance(program_info, list) and program_info else program_info,
|
||||
language=languages[0] if languages else "deu",
|
||||
audio_source=f"device:{input_device_name}",
|
||||
# input_format is intentionally omitted to use the default
|
||||
iso_que_len=1,
|
||||
sampling_frequency=rate,
|
||||
octets_per_frame=octets,
|
||||
)
|
||||
]
|
||||
conf = auracast_config.AuracastConfigGroup(
|
||||
auracast_sampling_rate_hz=rate,
|
||||
# Check against the cached device lists
|
||||
usb = [d for _, d in get_alsa_usb_inputs()]
|
||||
net = [d for _, d in get_network_pw_inputs()]
|
||||
names = {d.get('name') for d in usb} | {d.get('name') for d in net}
|
||||
if input_device_name in names:
|
||||
# Build a minimal config based on saved fields
|
||||
bigs = [
|
||||
auracast_config.AuracastBigConfig(
|
||||
code=stream_password,
|
||||
name=channel_names[0] if channel_names else "Broadcast0",
|
||||
program_info=program_info[0] if isinstance(program_info, list) and program_info else program_info,
|
||||
language=languages[0] if languages else "deu",
|
||||
audio_source=f"device:{input_device_name}",
|
||||
# input_format is intentionally omitted to use the default
|
||||
iso_que_len=1,
|
||||
sampling_frequency=rate,
|
||||
octets_per_frame=octets,
|
||||
transport=TRANSPORT1,
|
||||
immediate_rendering=immediate_rendering,
|
||||
assisted_listening_stream=assisted_listening_stream,
|
||||
presentation_delay_us=pres_delay if pres_delay is not None else 40000,
|
||||
bigs=bigs,
|
||||
)
|
||||
# Attach QoS if saved_rtn present
|
||||
conf.qos_config = auracast_config.AuracastQoSConfig(
|
||||
iso_int_multiple_10ms=1,
|
||||
number_of_retransmissions=int(saved_rtn),
|
||||
max_transport_latency_ms=int(saved_rtn) * 10 + 3,
|
||||
)
|
||||
]
|
||||
conf = auracast_config.AuracastConfigGroup(
|
||||
auracast_sampling_rate_hz=rate,
|
||||
octets_per_frame=octets,
|
||||
transport=TRANSPORT1,
|
||||
immediate_rendering=immediate_rendering,
|
||||
assisted_listening_stream=assisted_listening_stream,
|
||||
presentation_delay_us=pres_delay if pres_delay is not None else 40000,
|
||||
bigs=bigs,
|
||||
)
|
||||
# Attach QoS if saved_rtn present
|
||||
conf.qos_config = auracast_config.AuracastQoSConfig(
|
||||
iso_int_multiple_10ms=1,
|
||||
number_of_retransmissions=int(saved_rtn),
|
||||
max_transport_latency_ms=int(saved_rtn) * 10 + 3,
|
||||
)
|
||||
|
||||
# Initialize and start
|
||||
await asyncio.sleep(2)
|
||||
await initialize(conf)
|
||||
return
|
||||
# Initialize and start
|
||||
await asyncio.sleep(2)
|
||||
except Exception:
|
||||
log.warning("Autostart task failed", exc_info=True)
|
||||
await initialize(conf)
|
||||
return
|
||||
await asyncio.sleep(2)
|
||||
|
||||
@app.on_event("startup")
|
||||
async def _startup_autostart_event():
|
||||
# Spawn the autostart task without blocking startup
|
||||
log.info("Refreshing PipeWire device cache.")
|
||||
# Hydrate settings cache once to avoid disk I/O during /status
|
||||
_hydrate_settings_cache_from_disk()
|
||||
_init_settings_cache_from_disk()
|
||||
refresh_pw_cache()
|
||||
# Start the streamer worker thread
|
||||
streamer.start()
|
||||
asyncio.create_task(_autostart_from_settings())
|
||||
|
||||
|
||||
@app.get("/audio_inputs_pw_usb")
|
||||
async def audio_inputs_pw_usb():
|
||||
"""List USB input devices using ALSA backend (USB is ALSA in our scheme)."""
|
||||
@@ -542,7 +498,6 @@ async def audio_inputs_pw_usb():
|
||||
log.error("Exception in /audio_inputs_pw_usb: %s", traceback.format_exc())
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.get("/audio_inputs_pw_network")
|
||||
async def audio_inputs_pw_network():
|
||||
"""List PipeWire Network/AES67 input nodes from cache."""
|
||||
@@ -556,40 +511,33 @@ async def audio_inputs_pw_network():
|
||||
log.error("Exception in /audio_inputs_pw_network: %s", traceback.format_exc())
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.post("/refresh_audio_devices")
|
||||
async def refresh_audio_devices():
|
||||
"""Triggers a re-scan of audio devices, but only if no stream is active."""
|
||||
streaming = False
|
||||
try:
|
||||
status = await streamer.call(streamer._w_status_primary)
|
||||
status = await _status_primary()
|
||||
streaming = bool(status.get('is_streaming'))
|
||||
except Exception:
|
||||
pass # Ignore errors, default to not refreshing
|
||||
except Exception as e:
|
||||
log.error("Exception in /refresh_audio_devices: %s", traceback.format_exc())
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
if streaming:
|
||||
log.warning("Ignoring refresh request: an audio stream is active.")
|
||||
raise HTTPException(status_code=409, detail="An audio stream is active. Stop the stream before refreshing devices.")
|
||||
|
||||
try:
|
||||
log.info("Refreshing PipeWire device cache.")
|
||||
refresh_pw_cache()
|
||||
return {"status": "ok"}
|
||||
except Exception as e:
|
||||
log.error("Exception during device refresh: %s", traceback.format_exc())
|
||||
raise HTTPException(status_code=500, detail=f"Failed to refresh devices: {e}")
|
||||
|
||||
log.info("Refreshing PipeWire device cache.")
|
||||
refresh_pw_cache()
|
||||
return {"status": "ok"}
|
||||
|
||||
@app.post("/shutdown")
|
||||
async def shutdown():
|
||||
"""Stops broadcasting and releases all audio/Bluetooth resources."""
|
||||
try:
|
||||
await streamer.call(streamer._w_stop_all)
|
||||
await _stop_all()
|
||||
return {"status": "stopped"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.post("/system_reboot")
|
||||
async def system_reboot():
|
||||
"""Stop audio and request a system reboot via sudo.
|
||||
@@ -599,13 +547,9 @@ async def system_reboot():
|
||||
try:
|
||||
# Best-effort: stop any active streaming cleanly WITHOUT persisting state
|
||||
try:
|
||||
try:
|
||||
await streamer.call(streamer._w_stop_all)
|
||||
except Exception:
|
||||
pass
|
||||
await _stop_all()
|
||||
except Exception:
|
||||
log.warning("Non-fatal: failed to stop streams before reboot", exc_info=True)
|
||||
|
||||
pass
|
||||
# Launch reboot without waiting for completion
|
||||
try:
|
||||
await asyncio.create_subprocess_exec("sudo", "reboot")
|
||||
|
||||
@@ -9,6 +9,9 @@ ExecStart=/home/caster/.local/bin/poetry run python src/auracast/server/multicas
|
||||
Restart=on-failure
|
||||
Environment=PYTHONUNBUFFERED=1
|
||||
Environment=LOG_LEVEL=INFO
|
||||
CPUSchedulingPolicy=fifo
|
||||
CPUSchedulingPriority=99
|
||||
LimitRTPRIO=99
|
||||
|
||||
[Install]
|
||||
WantedBy=default.target
|
||||
|
||||
Reference in New Issue
Block a user