Files
bumble-auracast/src/auracast/server/multicast_server.py
T
pstruebi 6a37695f0b feat: load i2c-dev kernel module on startup
Ensure i2c-dev kernel module is loaded before initializing I2C communication to guarantee /dev/i2c-* device access. Add error handling and logging for module loading process.
2025-12-18 16:17:07 +01:00

913 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
""" the main server where our multicaster objects live.
TODO: in the future the multicaster objects should run in their own threads, or even in a second server, since everything that blocks the main event loop leads to increased latency.
"""
import os
import logging as log
import json
from datetime import datetime
import asyncio
import random
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from auracast import multicast_control, auracast_config
import sounddevice as sd # type: ignore
import traceback
from auracast.utils.sounddevice_utils import (
get_network_pw_inputs,
get_alsa_usb_inputs,
resolve_input_device_index,
refresh_pw_cache,
)
load_dotenv()
# Primary and secondary persisted settings files
STREAM_SETTINGS_FILE1 = os.path.join(os.path.dirname(__file__), 'stream_settings.json')
STREAM_SETTINGS_FILE2 = os.path.join(os.path.dirname(__file__), 'stream_settings2.json')
# CA certificate returned by the /cert endpoint
CA_CERT_PATH = os.path.join(os.path.dirname(__file__), 'certs', 'ca', 'ca_cert.pem')
# Raspberry Pi UART transports (overridable through the TRANSPORT1/TRANSPORT2 environment variables)
TRANSPORT1 = os.getenv('TRANSPORT1', 'serial:/dev/ttyAMA3,1000000,rtscts') # transport for raspberry pi gpio header
TRANSPORT2 = os.getenv('TRANSPORT2', 'serial:/dev/ttyAMA4,1000000,rtscts') # transport for raspberry pi gpio header
# make sure pipewire sets latency
os.environ["PULSE_LATENCY_MSEC"] = "3"
# Defaults from the AuracastBigConfig model, used to detect whether random_address/id
# were explicitly set or are still at their model default values.
_DEFAULT_BIG = auracast_config.AuracastBigConfig()
DEFAULT_BIG_ID = _DEFAULT_BIG.id
DEFAULT_RANDOM_ADDRESS = _DEFAULT_BIG.random_address
# In-memory caches to avoid disk I/O on hot paths like /status
# (hydrated by _init_settings_cache_from_disk, replaced by save_stream_settings*/2)
SETTINGS_CACHE1: dict = {}
SETTINGS_CACHE2: dict = {}
def get_device_index_by_name(name: str):
"""Return the device index for a given device name, or None if not found.
Queries the current sounddevice list directly (no cache).
"""
try:
devs = sd.query_devices()
for idx, d in enumerate(devs):
if d.get("name") == name and d.get("max_input_channels", 0) > 0:
return idx
except Exception:
pass
return None
def _init_settings_cache_from_disk() -> None:
"""Populate SETTINGS_CACHE1 and SETTINGS_CACHE2 once from disk at startup.
If a file doesn't exist, initialize to an empty dict. Any JSON or I/O errors raise.
"""
global SETTINGS_CACHE1, SETTINGS_CACHE2
if os.path.exists(STREAM_SETTINGS_FILE1):
with open(STREAM_SETTINGS_FILE1, 'r', encoding='utf-8') as f:
SETTINGS_CACHE1 = json.load(f)
else:
SETTINGS_CACHE1 = {}
if os.path.exists(STREAM_SETTINGS_FILE2):
with open(STREAM_SETTINGS_FILE2, 'r', encoding='utf-8') as f:
SETTINGS_CACHE2 = json.load(f)
else:
SETTINGS_CACHE2 = {}
def load_stream_settings() -> dict:
    """Return the PRIMARY stream settings held in SETTINGS_CACHE1.

    The cache object itself is returned (not a copy); this function performs
    no disk I/O.
    """
    # Reading a module global needs no `global` declaration
    return SETTINGS_CACHE1
def load_stream_settings2() -> dict:
    """Return the SECONDARY stream settings held in SETTINGS_CACHE2 (no copy, no disk I/O)."""
    # Reading a module global needs no `global` declaration
    return SETTINGS_CACHE2
def save_stream_settings(settings: dict):
    """Update the PRIMARY in-memory settings cache and persist it to disk.

    The cache is replaced by a shallow copy of ``settings``. The JSON is
    written to a sibling temporary file, fsynced, and then moved over the
    real file with ``os.replace`` so that an interrupted write never leaves
    a truncated settings file behind (startup hydration raises on bad JSON).

    Raises:
        OSError / TypeError: propagated from the file system or ``json.dump``.
    """
    global SETTINGS_CACHE1
    SETTINGS_CACHE1 = dict(settings)
    os.makedirs(os.path.dirname(STREAM_SETTINGS_FILE1), exist_ok=True)
    tmp_path = STREAM_SETTINGS_FILE1 + '.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(SETTINGS_CACHE1, f, indent=2)
        f.flush()
        os.fsync(f.fileno())
    # Atomic swap: readers see either the old or the new complete file
    os.replace(tmp_path, STREAM_SETTINGS_FILE1)
    log.info("Saved primary settings to %s", STREAM_SETTINGS_FILE1)
def save_stream_settings2(settings: dict):
    """Update the SECONDARY in-memory settings cache and persist it to disk.

    The cache is replaced by a shallow copy of ``settings``. The JSON is
    written to a sibling temporary file, fsynced, and then moved over the
    real file with ``os.replace`` so that an interrupted write never leaves
    a truncated settings file behind (startup hydration raises on bad JSON).

    Raises:
        OSError / TypeError: propagated from the file system or ``json.dump``.
    """
    global SETTINGS_CACHE2
    SETTINGS_CACHE2 = dict(settings)
    os.makedirs(os.path.dirname(STREAM_SETTINGS_FILE2), exist_ok=True)
    tmp_path = STREAM_SETTINGS_FILE2 + '.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(SETTINGS_CACHE2, f, indent=2)
        f.flush()
        os.fsync(f.fileno())
    # Atomic swap: readers see either the old or the new complete file
    os.replace(tmp_path, STREAM_SETTINGS_FILE2)
    log.info("Saved secondary settings to %s", STREAM_SETTINGS_FILE2)
def save_settings(persisted: dict, secondary: bool = False) -> None:
    """Stamp a copy of ``persisted`` with the current UTC time and save it.

    Args:
        persisted: settings to store; the caller's dict is not modified.
        secondary: when true the secondary cache/file is used, otherwise the primary.
    """
    stamped = {**persisted, 'timestamp': datetime.utcnow().isoformat()}
    writer = save_stream_settings2 if secondary else save_stream_settings
    writer(stamped)
def gen_random_add() -> str:
    """Return six random bytes formatted as colon-separated upper-case hex (``XX:XX:XX:XX:XX:XX``)."""
    octets = []
    for _ in range(6):
        octets.append(f"{random.randint(0, 255):02X}")
    return ':'.join(octets)
app = FastAPI()
# Allow CORS for frontend on localhost
# NOTE(review): every origin is currently accepted while credentials are allowed;
# consider restricting allow_origins as the inline comment suggests.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # You can restrict this to ["http://localhost:8501"] if you want
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Initialize global configuration
# (replaced by /init and the primary autostart; read by /stream_lc3)
global_config_group = auracast_config.AuracastConfigGroup()
# Module-level state replacing StreamerWorker
# None means "not initialized"; assigned by the init paths and cleared by _stop_all()
multicaster1: multicast_control.Multicaster | None = None
multicaster2: multicast_control.Multicaster | None = None
# NOTE(review): /init, /init2 and the autostart paths acquire this lock;
# /stop_audio and _stop_all() currently do not.
_stream_lock = asyncio.Lock() # serialize initialize/stop_audio on API side
async def _run_capture(cmd: list[str]) -> tuple[int | None, str, str]:
    """Run ``cmd`` and return ``(returncode, stdout, stderr)`` with both streams decoded and stripped."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    return (
        proc.returncode,
        (stdout or b"").decode(errors="ignore").strip(),
        (stderr or b"").decode(errors="ignore").strip(),
    )


async def _init_i2c_on_startup() -> None:
    """Load the i2c-dev module, then write and verify a fixed register table over I2C.

    Each register is written with ``i2cset`` and read back with ``i2cget``
    (bus argument ``1``, device address ``0x4a``). Every failure is logged
    and skipped; this function never raises for a failing command.
    """
    # Ensure i2c-dev kernel module is loaded (required for /dev/i2c-* access)
    try:
        rc, _, err = await _run_capture(["sudo", "modprobe", "i2c-dev"])
        if rc != 0:
            log.warning("modprobe i2c-dev failed (rc=%s): %s", rc, err)
        else:
            log.info("i2c-dev module loaded successfully")
    except Exception as e:
        log.warning("Exception running modprobe i2c-dev: %s", e, exc_info=True)

    # Table of (register, expected_value)
    dev_add = "0x4a"
    reg_table = [
        ("0x00", "0x00"),
        ("0x06", "0x10"),
        ("0x07", "0x10"),
    ]
    for reg, expected in reg_table:
        write_cmd = ["i2cset", "-f", "-y", "1", dev_add, reg, expected]
        try:
            rc, _, err = await _run_capture(write_cmd)
        except Exception as e:
            log.warning("Exception running i2cset (%s): %s", " ".join(write_cmd), e, exc_info=True)
            continue
        if rc != 0:
            log.warning("i2cset failed (%s): rc=%s stderr=%s", " ".join(write_cmd), rc, err)
            # If the write failed, skip verification for this register
            continue

        # Verify configured register with i2cget
        read_cmd = ["i2cget", "-f", "-y", "1", dev_add, reg]
        try:
            rc, value, err = await _run_capture(read_cmd)
        except Exception as e:
            log.warning("Exception running i2cget (%s): %s", " ".join(read_cmd), e, exc_info=True)
            continue
        if rc != 0:
            log.warning("i2cget failed (%s): rc=%s stderr=%s", " ".join(read_cmd), rc, err)
            continue
        # The address in the messages is taken from dev_add so it cannot drift
        # from the address actually used in the commands.
        if value != expected:
            log.error(
                "I2C register verify failed: addr=%s reg=%s expected=%s got=%s",
                dev_add,
                reg,
                expected,
                value,
            )
        else:
            log.info("I2C register verified: addr=%s reg=%s value=%s", dev_add, reg, value)
async def _set_adc_level_on_startup() -> None:
    """Set the 'ADC' mixer control of ALSA card 2 to 60% via amixer.

    Runs: amixer -c 2 set ADC 60%
    A non-zero exit code or any exception is logged as a warning; nothing is raised.
    """
    amixer_args = ("amixer", "-c", "2", "set", "ADC", "60%")
    pipe = asyncio.subprocess.PIPE
    try:
        proc = await asyncio.create_subprocess_exec(*amixer_args, stdout=pipe, stderr=pipe)
        out, err = await proc.communicate()
        if proc.returncode == 0:
            log.info(
                "amixer ADC level set successfully: %s",
                (out or b"").decode(errors="ignore").strip(),
            )
        else:
            log.warning(
                "amixer ADC level command failed (rc=%s): %s",
                proc.returncode,
                (err or b"").decode(errors="ignore").strip(),
            )
    except Exception as exc:
        log.warning("Exception running amixer ADC level command: %s", exc, exc_info=True)
async def _stop_all() -> bool:
    """Stop and shut down both multicasters and clear the module-level references.

    Both multicasters are always attempted: a failure while stopping the
    primary no longer leaves the secondary running. The references are reset
    to ``None`` even when stopping fails.

    Returns:
        True if at least one multicaster was stopped and shut down cleanly.

    Raises:
        Exception: the first error raised while stopping, re-raised after
        both multicasters have been handled.
    """
    global multicaster1, multicaster2
    was_running = False
    first_error: Exception | None = None

    if multicaster1 is not None:
        try:
            await multicaster1.stop_streaming()
            await multicaster1.shutdown()
            was_running = True
        except Exception as e:
            log.error("Failed to stop primary multicaster: %s", e, exc_info=True)
            first_error = e
        finally:
            multicaster1 = None

    if multicaster2 is not None:
        try:
            await multicaster2.stop_streaming()
            await multicaster2.shutdown()
            was_running = True
        except Exception as e:
            log.error("Failed to stop secondary multicaster: %s", e, exc_info=True)
            if first_error is None:
                first_error = e
        finally:
            multicaster2 = None

    if first_error is not None:
        # Callers (/stop_audio, /shutdown) still see the failure, as before
        raise first_error
    return was_running
async def _status_primary() -> dict:
if multicaster1 is None:
return {'is_initialized': False, 'is_streaming': False}
return multicaster1.get_status()
async def _status_secondary() -> dict:
"""Return runtime status for the SECONDARY multicaster.
Mirrors _status_primary but for multicaster2 so that /status can expose
both primary and secondary state to the frontend.
"""
if multicaster2 is None:
return {'is_initialized': False, 'is_streaming': False}
return multicaster2.get_status()
async def _stream_lc3(audio_data: dict[str, str], bigs_template: list) -> None:
    """Attach per-language audio payloads to the BIG templates and start streaming on the primary.

    Args:
        audio_data: maps a BIG's language to its audio payload as text; each
            payload is encoded with latin-1 before being assigned.
        bigs_template: BIG configs; their ``audio_source`` is overwritten in place.

    Raises:
        HTTPException: 500 when the primary multicaster is not initialized or
            a BIG's language has no entry in ``audio_data``.
    """
    primary = multicaster1
    if primary is None:
        raise HTTPException(status_code=500, detail='Auracast endpoint was never intialized')
    for entry in bigs_template:
        lang = entry.language
        if lang not in audio_data:
            raise HTTPException(status_code=500, detail='language len missmatch')
        entry.audio_source = audio_data[lang].encode('latin-1')
    primary.big_conf = bigs_template
    await primary.start_streaming()
async def init_radio(transport: str, conf: auracast_config.AuracastConfigGroup, current_mc: multicast_control.Multicaster | None) -> tuple[multicast_control.Multicaster, dict]:
    """Create and initialize a Multicaster for ``transport`` from ``conf``.

    ``conf`` is modified in place (transport, resolved device sources, input
    format, QoS latency, random addresses). Streaming is started right away
    when any BIG uses a ``device:`` or ``file:`` audio source.

    Args:
        transport: transport string written to ``conf.transport``.
        conf: configuration group to initialize from.
        current_mc: an existing multicaster to shut down first, or None.

    Returns:
        ``(mc, persisted)``: the new Multicaster and a plain dict of the
        settings to persist (see ``save_settings``).

    Raises:
        HTTPException: 400 when the named audio device cannot be resolved;
            500 (with ``str(e)`` as detail) for any other exception.
    """
    try:
        log.info('Initializing multicaster with transport %s and config:\n %s', transport, conf.model_dump_json(indent=2))
        # Release the previous multicaster before creating a new one
        if current_mc is not None:
            await current_mc.shutdown()
            current_mc = None
        conf.transport = transport
        # The first BIG's source decides the persisted audio mode and device name
        first_source = conf.bigs[0].audio_source if conf.bigs else ''
        input_device_name = None
        audio_mode_persist = 'Demo'
        if isinstance(first_source, str) and first_source.startswith('device:'):
            input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None
            # NOTE(review): alsa_usb_names is not read afterwards; kept because the call itself may matter
            alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()}
            net_names = {d.get('name') for _, d in get_network_pw_inputs()}
            if input_device_name in ('ch1', 'ch2'):
                # Explicitly treat ch1/ch2 as Analog input mode
                audio_mode_persist = 'Analog'
            else:
                # Anything that is not a known network input is reported as USB
                audio_mode_persist = 'Network' if (input_device_name in net_names) else 'USB'
            # A purely numeric name is used directly as the device index
            if input_device_name and input_device_name.isdigit():
                device_index = int(input_device_name)
            else:
                device_index = resolve_input_device_index(input_device_name or '')
                if device_index is None:
                    raise HTTPException(status_code=400, detail=f"Audio device '{input_device_name}' not found.")
            # Every device-based BIG is pointed at the resolved index
            for big in conf.bigs:
                if isinstance(big.audio_source, str) and big.audio_source.startswith('device:'):
                    big.audio_source = f'device:{device_index}'
            devinfo = sd.query_devices(device_index)
            max_in = int(devinfo.get('max_input_channels') or 1)
            # Clamp the channel count to 1..2
            channels = max(1, min(2, max_in))
            # presumably "sample format,rate,channels" — TODO confirm against the input_format consumer
            for big in conf.bigs:
                big.input_format = f"int16le,{48000},{channels}"
        # Latency budget is derived from the retransmission count: rtn * 10 + 3
        conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3
        # Only generate a new random_address if the BIG is still at the model default.
        for big in conf.bigs:
            if not getattr(big, 'random_address', None) or big.random_address == DEFAULT_RANDOM_ADDRESS:
                big.random_address = gen_random_add()
        # Log the final, fully-updated configuration just before creating the Multicaster
        log.info('Final multicaster config (transport=%s):\n %s', transport, conf.model_dump_json(indent=2))
        mc = multicast_control.Multicaster(conf, conf.bigs)
        await mc.init_broadcast()
        # Device and file sources start streaming immediately; other sources wait for an explicit start
        auto_started = False
        if any(isinstance(big.audio_source, str) and (big.audio_source.startswith("device:") or big.audio_source.startswith("file:")) for big in conf.bigs):
            await mc.start_streaming()
            auto_started = True
        # Summary of file-based ("demo") streams for the persisted settings
        demo_count = sum(1 for big in conf.bigs if isinstance(big.audio_source, str) and big.audio_source.startswith('file:'))
        demo_rate = int(conf.auracast_sampling_rate_hz or 0)
        demo_type = None
        if demo_count > 0 and demo_rate > 0:
            if demo_rate in (48000, 24000, 16000):
                demo_type = f"{demo_count} × {demo_rate//1000}kHz"
            else:
                demo_type = f"{demo_count} × {demo_rate}Hz"
        # 'input_device' keeps the originally requested name, not the resolved index
        persisted = {
            'channel_names': [big.name for big in conf.bigs],
            'languages': [big.language for big in conf.bigs],
            'audio_mode': audio_mode_persist,
            'input_device': input_device_name,
            'program_info': [getattr(big, 'program_info', None) for big in conf.bigs],
            'gain': [getattr(big, 'input_gain', 1.0) for big in conf.bigs],
            'auracast_sampling_rate_hz': conf.auracast_sampling_rate_hz,
            'octets_per_frame': conf.octets_per_frame,
            'presentation_delay_us': getattr(conf, 'presentation_delay_us', None),
            'rtn': getattr(getattr(conf, 'qos_config', None), 'number_of_retransmissions', None),
            'immediate_rendering': getattr(conf, 'immediate_rendering', False),
            'assisted_listening_stream': getattr(conf, 'assisted_listening_stream', False),
            'stream_password': (conf.bigs[0].code if conf.bigs and getattr(conf.bigs[0], 'code', None) else None),
            'big_ids': [getattr(big, 'id', DEFAULT_BIG_ID) for big in conf.bigs],
            'big_random_addresses': [getattr(big, 'random_address', DEFAULT_RANDOM_ADDRESS) for big in conf.bigs],
            'demo_total_streams': demo_count,
            'demo_stream_type': demo_type,
            'is_streaming': auto_started,
            'demo_sources': [str(b.audio_source) for b in conf.bigs if isinstance(b.audio_source, str) and b.audio_source.startswith('file:')],
        }
        return mc, persisted
    except HTTPException:
        # Keep deliberate HTTP errors (e.g. the 400 above) intact
        raise
    except Exception as e:
        log.error("Exception in init_radio: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/init")
async def initialize(conf: auracast_config.AuracastConfigGroup):
    """Initialize the primary broadcaster from ``conf`` and persist the resulting settings.

    Runs under the stream lock; replaces multicaster1 and global_config_group.
    """
    global multicaster1, global_config_group
    async with _stream_lock:
        new_mc, snapshot = await init_radio(TRANSPORT1, conf, multicaster1)
        multicaster1, global_config_group = new_mc, conf
        save_settings(snapshot, secondary=False)
@app.post("/init2")
async def initialize2(conf: auracast_config.AuracastConfigGroup):
    """Initialize the secondary broadcaster from ``conf`` and persist the resulting settings.

    Runs under the stream lock; replaces multicaster2.
    """
    global multicaster2
    async with _stream_lock:
        new_mc, snapshot = await init_radio(TRANSPORT2, conf, multicaster2)
        multicaster2 = new_mc
        save_settings(snapshot, secondary=True)
@app.post("/stop_audio")
async def stop_audio():
    """Stop both multicasters and record is_streaming=False in the persisted settings.

    Returns:
        ``{"status": "stopped", "was_running": <bool>}``.

    Raises:
        HTTPException: 500 when stopping fails. A failure while persisting
        the flag is only logged.
    """
    try:
        was_running = await _stop_all()
        # Persist is_streaming=False for both primary and secondary
        stores = (
            (load_stream_settings, save_stream_settings),
            (load_stream_settings2, save_stream_settings2),
        )
        try:
            for load, save in stores:
                current = load() or {}
                if not current.get('is_streaming'):
                    continue
                current['is_streaming'] = False
                current['timestamp'] = datetime.utcnow().isoformat()
                save(current)
        except Exception:
            log.warning("Failed to persist is_streaming=False during stop_audio", exc_info=True)
        await asyncio.sleep(0.2)
        return {"status": "stopped", "was_running": was_running}
    except Exception as exc:
        log.error("Exception in /stop_audio: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/stream_lc3")
async def send_audio(audio_data: dict[str, str]):
    """Stream a block of pre-coded LC3 audio on the primary multicaster.

    Args:
        audio_data: maps each configured BIG language to its audio payload.

    Returns:
        ``{"status": "audio_sent"}`` on success.

    Raises:
        HTTPException: errors raised deliberately by ``_stream_lc3`` pass
        through unchanged; any other exception becomes a 500 with
        ``str(e)`` as detail.
    """
    try:
        await _stream_lc3(audio_data, list(global_config_group.bigs))
        return {"status": "audio_sent"}
    except HTTPException:
        # Re-wrapping would replace the original detail with str(exc), which is
        # not the plain detail text; same pattern as init_radio/system_reboot.
        raise
    except Exception as e:
        log.error("Exception in /stream_lc3: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/cert")
async def download_ca_cert():
    """Serve the CA certificate file used for TLS verification.

    Raises:
        HTTPException: 404 when the certificate file does not exist.
    """
    if os.path.exists(CA_CERT_PATH):
        return FileResponse(CA_CERT_PATH, filename="ca_cert.pem", media_type="application/x-pem-file")
    raise HTTPException(status_code=404, detail="CA certificate not found")
@app.get("/status")
async def get_status():
    """Return runtime status merged with the persisted settings for both radios.

    Primary keys stay at the top level; the secondary is nested under
    ``"secondary"`` and its streaming flag is mirrored in
    ``"secondary_is_streaming"``. On key collisions the persisted value
    overrides the runtime value.
    """
    # Preserve existing top-level shape for primary for compatibility
    status: dict = {
        **(await _status_primary()),
        **(load_stream_settings() or {}),
    }
    # Secondary block with its own runtime + persisted settings
    secondary: dict = {
        **(await _status_secondary()),
        **(load_stream_settings2() or {}),
    }
    status["secondary"] = secondary
    status["secondary_is_streaming"] = bool(secondary.get("is_streaming", False))
    return status
def _build_autostart_config(settings: dict, transport: str, sources: list) -> auracast_config.AuracastConfigGroup:
    """Rebuild a config group from persisted ``settings`` with one BIG per entry in ``sources``."""
    rate = settings.get('auracast_sampling_rate_hz')
    octets = settings.get('octets_per_frame')
    pres_delay = settings.get('presentation_delay_us')
    saved_rtn = settings.get('rtn')
    channel_names = settings.get('channel_names') or ["Broadcast0"]
    program_info = settings.get('program_info') or channel_names
    languages = settings.get('languages') or ["deu"]
    big_ids = settings.get('big_ids') or []
    big_addrs = settings.get('big_random_addresses') or []
    # 'rtn' may be missing or null in persisted settings; fall back to one
    # retransmission instead of failing on int(None).
    rtn = int(saved_rtn) if saved_rtn is not None else 1

    bigs = []
    for i, src in enumerate(sources):
        # Per-stream values fall back to the first entry (or a default) when the lists are shorter
        if isinstance(program_info, list) and i < len(program_info):
            pinfo = program_info[i]
        elif isinstance(program_info, list) and program_info:
            pinfo = program_info[0]
        else:
            pinfo = program_info
        bigs.append(
            auracast_config.AuracastBigConfig(
                id=big_ids[i] if i < len(big_ids) else DEFAULT_BIG_ID,
                random_address=big_addrs[i] if i < len(big_addrs) else DEFAULT_RANDOM_ADDRESS,
                code=settings.get('stream_password'),
                name=channel_names[i] if i < len(channel_names) else f"Broadcast{i}",
                program_info=pinfo,
                language=languages[i] if i < len(languages) else (languages[0] if languages else "deu"),
                audio_source=src,
                iso_que_len=1,
                sampling_frequency=rate,
                octets_per_frame=octets,
            )
        )
    conf = auracast_config.AuracastConfigGroup(
        auracast_sampling_rate_hz=rate,
        octets_per_frame=octets,
        transport=transport,
        immediate_rendering=settings.get('immediate_rendering', False),
        assisted_listening_stream=settings.get('assisted_listening_stream', False),
        presentation_delay_us=pres_delay if pres_delay is not None else 40000,
        bigs=bigs,
    )
    conf.qos_config = auracast_config.AuracastQoSConfig(
        iso_int_multiple_10ms=1,
        number_of_retransmissions=rtn,
        max_transport_latency_ms=rtn * 10 + 3,
    )
    return conf


async def _autostart_init(conf: auracast_config.AuracastConfigGroup, *, secondary: bool, tag: str, kind: str) -> None:
    """Wait 2 s, then initialize the selected radio under the stream lock and persist the result."""
    global multicaster1, multicaster2, global_config_group
    log.info("%s Scheduling %s init_radio in 2s", tag, kind)
    await asyncio.sleep(2)
    async with _stream_lock:
        log.info("%s Calling init_radio for %s autostart", tag, kind)
        if secondary:
            mc, persisted = await init_radio(TRANSPORT2, conf, multicaster2)
            multicaster2 = mc
        else:
            mc, persisted = await init_radio(TRANSPORT1, conf, multicaster1)
            multicaster1 = mc
            global_config_group = conf
        save_settings(persisted, secondary=secondary)
        log.info(
            "%s %s autostart completed; settings persisted with is_streaming=%s",
            tag,
            kind.capitalize(),
            persisted.get('is_streaming'),
        )


async def _autostart_stream(settings: dict, *, secondary: bool) -> None:
    """Restore one stream (primary or secondary) from its persisted settings.

    Does nothing unless the settings say the stream was running. Demo (file)
    streams are started right away; device streams poll every 2 s until the
    saved input device shows up, and give up when a stream is started
    externally or the saved audio mode / input device changes meanwhile.
    """
    tag = "[AUTOSTART][SECONDARY]" if secondary else "[AUTOSTART][PRIMARY]"
    transport = TRANSPORT2 if secondary else TRANSPORT1
    load_current = load_stream_settings2 if secondary else load_stream_settings

    audio_mode = settings.get('audio_mode')
    input_device_name = settings.get('input_device')
    rate = settings.get('auracast_sampling_rate_hz')
    octets = settings.get('octets_per_frame')
    original_ts = settings.get('timestamp')
    previously_streaming = bool(settings.get('is_streaming'))
    log.info(
        "%s loaded settings: previously_streaming=%s audio_mode=%s rate=%s octets=%s pres_delay=%s rtn=%s immediate_rendering=%s assisted_listening_stream=%s demo_sources=%s",
        tag,
        previously_streaming,
        audio_mode,
        rate,
        octets,
        settings.get('presentation_delay_us'),
        settings.get('rtn'),
        settings.get('immediate_rendering', False),
        settings.get('assisted_listening_stream', False),
        (settings.get('demo_sources') or []),
    )
    if not previously_streaming:
        log.info("%s Skipping autostart: is_streaming flag was False in persisted settings", tag)
        return

    if audio_mode == 'Demo':
        demo_sources = settings.get('demo_sources') or []
        if not demo_sources or rate is None or octets is None:
            log.warning(
                "%s Demo autostart aborted: demo_sources_present=%s rate=%s octets=%s",
                tag,
                bool(demo_sources),
                rate,
                octets,
            )
            return
        conf = _build_autostart_config(settings, transport, demo_sources)
        log.info(
            "%s Building demo config for %d streams, rate=%s, octets=%s",
            tag,
            len(conf.bigs),
            rate,
            octets,
        )
        await _autostart_init(conf, secondary=secondary, tag=tag, kind="demo")
        return

    if not input_device_name or rate is None or octets is None:
        log.info(
            "%s Skipping device-based autostart: input_device=%s rate=%s octets=%s",
            tag,
            input_device_name,
            rate,
            octets,
        )
        return

    async def stream_is_running() -> bool:
        # A failing status query must not end the wait loop
        try:
            status = await (_status_secondary() if secondary else _status_primary())
            return bool(status.get('is_streaming'))
        except Exception:
            log.warning("%s Could not query streaming status", tag, exc_info=True)
            return False

    if await stream_is_running():
        log.info("%s Skipping device-based autostart: stream already running", tag)
        return

    while True:
        if await stream_is_running():
            log.info("%s Aborting wait loop: stream started externally", tag)
            return
        # A changed timestamp means the settings were saved again since startup
        current_settings = load_current() or {}
        if current_settings.get('timestamp') != original_ts:
            if (
                current_settings.get('input_device') != input_device_name or
                current_settings.get('audio_mode') != audio_mode
            ):
                log.info("%s Aborting wait loop: settings changed (audio_mode/input_device)", tag)
                return
        names = (
            {d.get('name') for _, d in get_alsa_usb_inputs()}
            | {d.get('name') for _, d in get_network_pw_inputs()}
        )
        if input_device_name in names:
            log.info("%s Device '%s' detected, starting autostart", tag, input_device_name)
            conf = _build_autostart_config(settings, transport, [f"device:{input_device_name}"])
            await _autostart_init(conf, secondary=secondary, tag=tag, kind="device")
            return
        await asyncio.sleep(2)


async def _autostart_from_settings():
    """Restore the primary and then the secondary stream from the persisted settings.

    The two streams are handled one after the other, so the secondary is only
    attempted once the primary has finished (including its device wait loop).
    An exception in one stream is logged and does not prevent the other.
    """
    settings1 = load_stream_settings() or {}
    settings2 = load_stream_settings2() or {}
    log.info("[AUTOSTART] Starting autostart check: primary_ts=%s secondary_ts=%s", settings1.get('timestamp'), settings2.get('timestamp'))
    for settings, secondary in ((settings1, False), (settings2, True)):
        try:
            await _autostart_stream(settings, secondary=secondary)
        except Exception:
            log.error(
                "[AUTOSTART] %s autostart failed",
                "Secondary" if secondary else "Primary",
                exc_info=True,
            )
# Strong references to fire-and-forget tasks: the event loop itself only keeps
# weak references, so an unreferenced task may be garbage-collected mid-run.
_BACKGROUND_TASKS: set = set()


@app.on_event("startup")
async def _startup_autostart_event():
    """Prepare caches and hardware on startup, then schedule the autostart in the background.

    Settings hydration, I2C setup, the ADC level and the PipeWire cache
    refresh run before startup completes; the autostart itself runs as a
    separate task so it does not block startup.
    """
    log.info("[STARTUP] Auracast multicast server startup: initializing settings cache, I2C, and PipeWire cache")
    # Hydrate settings cache once to avoid disk I/O during /status
    _init_settings_cache_from_disk()
    await _init_i2c_on_startup()
    # Ensure ADC mixer level is set at startup
    await _set_adc_level_on_startup()
    refresh_pw_cache()
    log.info("[STARTUP] Scheduling autostart task")
    # Spawn the autostart task without blocking startup
    task = asyncio.create_task(_autostart_from_settings())
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
@app.get("/audio_inputs_pw_usb")
async def audio_inputs_pw_usb():
    """Return the USB input devices reported by ``get_alsa_usb_inputs``.

    Returns:
        ``{"inputs": [{"id", "name", "max_input_channels"}, ...]}``.

    Raises:
        HTTPException: 500 when the listing fails.
    """
    try:
        inputs = []
        for dev_id, info in get_alsa_usb_inputs():
            inputs.append({
                "id": dev_id,
                "name": info.get("name"),
                "max_input_channels": info.get("max_input_channels", 0),
            })
        return {"inputs": inputs}
    except Exception as exc:
        log.error("Exception in /audio_inputs_pw_usb: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/audio_inputs_pw_network")
async def audio_inputs_pw_network():
    """Return the network input nodes reported by ``get_network_pw_inputs``.

    Returns:
        ``{"inputs": [{"id", "name", "max_input_channels"}, ...]}``.

    Raises:
        HTTPException: 500 when the listing fails.
    """
    try:
        inputs = []
        for dev_id, info in get_network_pw_inputs():
            inputs.append({
                "id": dev_id,
                "name": info.get("name"),
                "max_input_channels": info.get("max_input_channels", 0),
            })
        return {"inputs": inputs}
    except Exception as exc:
        log.error("Exception in /audio_inputs_pw_network: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/refresh_audio_devices")
async def refresh_audio_devices():
    """Re-scan the audio devices, but only if no stream is active.

    Both the primary and the secondary multicaster are checked, so a running
    secondary stream also blocks the refresh.

    Returns:
        ``{"status": "ok"}`` after the PipeWire cache was refreshed.

    Raises:
        HTTPException: 409 while a stream is active; 500 when the status
        query fails.
    """
    try:
        primary = await _status_primary()
        secondary = await _status_secondary()
        streaming = bool(primary.get('is_streaming')) or bool(secondary.get('is_streaming'))
    except Exception as e:
        log.error("Exception in /refresh_audio_devices: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))
    if streaming:
        log.warning("Ignoring refresh request: an audio stream is active.")
        raise HTTPException(status_code=409, detail="An audio stream is active. Stop the stream before refreshing devices.")
    log.info("Refreshing PipeWire device cache.")
    refresh_pw_cache()
    return {"status": "ok"}
@app.post("/shutdown")
async def shutdown():
    """Stop both multicasters via ``_stop_all``.

    Returns:
        ``{"status": "stopped"}``.

    Raises:
        HTTPException: 500 with the error text when stopping fails.
    """
    try:
        await _stop_all()
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    else:
        return {"status": "stopped"}
@app.post("/system_reboot")
async def system_reboot():
    """Stop audio, then launch ``sudo reboot`` without waiting for it to finish.

    The service user needs passwordless sudo permission for 'reboot'.
    Persisted settings are not touched here.

    Returns:
        ``{"status": "rebooting"}`` once the reboot command was launched.

    Raises:
        HTTPException: 500 when the reboot command cannot be started.
    """
    try:
        # Best-effort: a failing stop must not block the reboot
        try:
            await _stop_all()
        except Exception:
            pass
        # Fire the command and return immediately
        try:
            await asyncio.create_subprocess_exec("sudo", "reboot")
        except Exception as launch_err:
            log.error("Failed to invoke reboot: %s", launch_err, exc_info=True)
            raise HTTPException(status_code=500, detail=f"Failed to invoke reboot: {launch_err}")
        else:
            return {"status": "rebooting"}
    except HTTPException:
        raise
    except Exception as exc:
        log.error("Exception in /system_reboot: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail=str(exc))
if __name__ == '__main__':
    import os
    # Run from the module's own directory
    os.chdir(os.path.dirname(__file__))
    import uvicorn

    # for debug log level export LOG_LEVEL=DEBUG
    log_level = os.environ.get('LOG_LEVEL', log.INFO)
    log_format = '%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
    log.basicConfig(level=log_level, format=log_format)
    # Bind to localhost only for security: prevents network access, only frontend on same machine can connect
    uvicorn.run(app, host="127.0.0.1", port=5000, access_log=False)