Files
bumble-auracast/src/auracast/server/multicast_server.py
pstruebi 7bdf6f8417 feature/blue_led (#23)
Co-authored-by: pstruebi <office@summitwave.eu>
Co-authored-by: pober <paul.obernesser@summitwave.eu>
Co-authored-by: Pbopbo <p.obernesser@freenet.de>
Reviewed-on: #23
Co-authored-by: pstruebi <struebin.patrick@gmail.com>
Co-committed-by: pstruebi <struebin.patrick@gmail.com>
2026-04-07 14:34:11 +00:00

1749 lines
74 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
""" the main server where our multicaster objects live.
TODO: in the future the multicaster objects should run in their own threads, or even in a second server, since everything that blocks the main event loop leads to increased latency.
"""
import os
import logging as log
import json
from datetime import datetime
import asyncio
import random
import subprocess
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from auracast import multicast_control, auracast_config
import sounddevice as sd # type: ignore
import traceback
from auracast.utils.sounddevice_utils import (
get_network_pw_inputs,
get_alsa_usb_inputs,
resolve_input_device_index,
refresh_pw_cache,
devices_by_backend,
)
# Load variables from a .env file before the os.getenv() lookups further down.
load_dotenv()
# Blue LED on GPIO pin 12 (BCM numbering) turns on while transmitting.
LED_PIN = 12
# RPi.GPIO is treated as optional: if the import or the pin setup raises,
# _GPIO_AVAILABLE is False and _led_on()/_led_off() do nothing.
try:
    import RPi.GPIO as _GPIO
    _GPIO.setmode(_GPIO.BCM)
    _GPIO.setup(LED_PIN, _GPIO.OUT)
    _GPIO_AVAILABLE = True
except Exception:
    _GPIO_AVAILABLE = False
    _GPIO = None # type: ignore
# User preference for the LED; overwritten by _load_led_settings() and
# by the /set_led_enabled endpoint.
_LED_ENABLED: bool = True # toggled via /set_led_enabled
# JSON file (next to this module) in which the LED preference is persisted.
_LED_SETTINGS_FILE = os.path.join(os.path.dirname(__file__), 'led_settings.json')
def _load_led_settings() -> None:
    """Read the persisted LED preference into ``_LED_ENABLED``.

    A missing settings file leaves the current value untouched. Any error
    while reading or interpreting the file resets the flag to ``True``.
    """
    global _LED_ENABLED
    try:
        if not os.path.exists(_LED_SETTINGS_FILE):
            return
        with open(_LED_SETTINGS_FILE, 'r', encoding='utf-8') as fh:
            stored = json.load(fh)
        _LED_ENABLED = bool(stored.get('led_enabled', True))
    except Exception:
        # Unreadable or malformed file: fall back to the default (enabled).
        _LED_ENABLED = True
def _save_led_settings() -> None:
    """Persist the current ``_LED_ENABLED`` flag to ``_LED_SETTINGS_FILE``.

    Persistence stays best-effort (no exception escapes), but a failure is
    now logged instead of being dropped silently, so a preference that does
    not survive a restart can be traced back to the failed write.
    """
    try:
        os.makedirs(os.path.dirname(_LED_SETTINGS_FILE), exist_ok=True)
        with open(_LED_SETTINGS_FILE, 'w', encoding='utf-8') as f:
            json.dump({'led_enabled': _LED_ENABLED}, f)
    except Exception:
        log.warning("Failed to persist LED settings to %s", _LED_SETTINGS_FILE, exc_info=True)
def _led_on():
    """Drive the LED pin LOW when GPIO is available and the LED is enabled.

    Errors raised by the GPIO call are ignored.
    """
    if not (_GPIO_AVAILABLE and _LED_ENABLED):
        return
    try:
        # LOW is the "on" level here; presumably the LED is wired active-low.
        _GPIO.output(LED_PIN, _GPIO.LOW)
    except Exception:
        pass
def _led_off():
    """Drive the LED pin HIGH whenever GPIO is available.

    Unlike ``_led_on`` this does not consult ``_LED_ENABLED``, so the LED can
    always be switched off. Errors raised by the GPIO call are ignored.
    """
    if not _GPIO_AVAILABLE:
        return
    try:
        _GPIO.output(LED_PIN, _GPIO.HIGH)
    except Exception:
        pass
# Primary and secondary persisted settings files (stored next to this module).
STREAM_SETTINGS_FILE1 = os.path.join(os.path.dirname(__file__), 'stream_settings.json')
STREAM_SETTINGS_FILE2 = os.path.join(os.path.dirname(__file__), 'stream_settings2.json')
# CA certificate served by the /cert endpoint.
CA_CERT_PATH = os.path.join(os.path.dirname(__file__), 'certs', 'ca', 'ca_cert.pem')
# Raspberry Pi UART transports; each can be overridden through the
# TRANSPORT1 / TRANSPORT2 environment variables.
TRANSPORT1 = os.getenv('TRANSPORT1', 'serial:/dev/ttyAMA3,1000000,rtscts') # transport for raspberry pi gpio header
TRANSPORT2 = os.getenv('TRANSPORT2', 'serial:/dev/ttyAMA4,1000000,rtscts') # transport for raspberry pi gpio header
# Make sure pipewire sets latency: export PULSE_LATENCY_MSEC=3 for this process.
os.environ["PULSE_LATENCY_MSEC"] = "3"
# Defaults from the AuracastBigConfig model, used to detect whether random_address/id
# were explicitly set or are still at their model default values.
_DEFAULT_BIG = auracast_config.AuracastBigConfig()
DEFAULT_BIG_ID = _DEFAULT_BIG.id
DEFAULT_RANDOM_ADDRESS = _DEFAULT_BIG.random_address
# QoS presets mapping - must match frontend
QOS_PRESET_MAP = {
    "Fast": auracast_config.AuracastQosFast(),
    "Robust": auracast_config.AuracastQosRobust(),
}
# In-memory caches to avoid disk I/O on hot paths like /status.
# Filled by _init_settings_cache_from_disk() and replaced by save_stream_settings*().
SETTINGS_CACHE1: dict = {}
SETTINGS_CACHE2: dict = {}
def get_device_index_by_name(name: str):
    """Look up the index of the first input-capable device called *name*.

    The sounddevice list is queried on every call (nothing is cached).
    Returns ``None`` when no device matches or when the query fails.
    """
    try:
        return next(
            (
                position
                for position, dev in enumerate(sd.query_devices())
                if dev.get("name") == name and dev.get("max_input_channels", 0) > 0
            ),
            None,
        )
    except Exception:
        return None
def _init_settings_cache_from_disk() -> None:
    """Fill SETTINGS_CACHE1 and SETTINGS_CACHE2 from their JSON files.

    A missing file yields an empty dict for that cache. JSON and I/O errors
    are not handled here and propagate to the caller.
    """
    global SETTINGS_CACHE1, SETTINGS_CACHE2

    def _read_json_or_empty(path: str):
        # Missing file -> empty settings; anything else is parsed as JSON.
        if not os.path.exists(path):
            return {}
        with open(path, 'r', encoding='utf-8') as fh:
            return json.load(fh)

    SETTINGS_CACHE1 = _read_json_or_empty(STREAM_SETTINGS_FILE1)
    SETTINGS_CACHE2 = _read_json_or_empty(STREAM_SETTINGS_FILE2)
def load_stream_settings() -> dict:
    """Return the PRIMARY stream settings held in memory.

    The returned object is the cache itself (not a copy); no disk access
    happens here.
    """
    return SETTINGS_CACHE1
def load_stream_settings2() -> dict:
    """Return the SECONDARY stream settings held in memory.

    The returned object is the cache itself (not a copy); no disk access
    happens here.
    """
    return SETTINGS_CACHE2
def save_stream_settings(settings: dict):
    """Update PRIMARY in-memory settings cache and persist to disk.

    The cache is replaced with a shallow copy of *settings* first. The JSON
    is then written to a temporary sibling file, flushed and fsynced, and
    moved over the real file with ``os.replace``. A crash or power loss
    during the write therefore leaves the previous settings file intact
    rather than a truncated one, which would make
    ``_init_settings_cache_from_disk`` raise at the next start.

    Raises:
        OSError / TypeError / ValueError: propagated from directory creation,
            file I/O, or JSON serialisation.
    """
    global SETTINGS_CACHE1
    SETTINGS_CACHE1 = dict(settings)
    os.makedirs(os.path.dirname(STREAM_SETTINGS_FILE1), exist_ok=True)
    # Same directory as the target so the rename stays on one filesystem.
    tmp_path = STREAM_SETTINGS_FILE1 + '.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(SETTINGS_CACHE1, f, indent=2)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp_path, STREAM_SETTINGS_FILE1)
    log.info("Saved primary settings to %s", STREAM_SETTINGS_FILE1)
def save_stream_settings2(settings: dict):
    """Update SECONDARY in-memory settings cache and persist to disk.

    The cache is replaced with a shallow copy of *settings* first. The JSON
    is then written to a temporary sibling file, flushed and fsynced, and
    moved over the real file with ``os.replace``. A crash or power loss
    during the write therefore leaves the previous settings file intact
    rather than a truncated one, which would make
    ``_init_settings_cache_from_disk`` raise at the next start.

    Raises:
        OSError / TypeError / ValueError: propagated from directory creation,
            file I/O, or JSON serialisation.
    """
    global SETTINGS_CACHE2
    SETTINGS_CACHE2 = dict(settings)
    os.makedirs(os.path.dirname(STREAM_SETTINGS_FILE2), exist_ok=True)
    # Same directory as the target so the rename stays on one filesystem.
    tmp_path = STREAM_SETTINGS_FILE2 + '.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(SETTINGS_CACHE2, f, indent=2)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp_path, STREAM_SETTINGS_FILE2)
    log.info("Saved secondary settings to %s", STREAM_SETTINGS_FILE2)
def save_settings(persisted: dict, secondary: bool = False) -> None:
    """Attach a UTC timestamp and persist using the appropriate cache/file.

    Args:
        persisted: Settings to store; the caller's dict is not modified
            (a shallow copy receives the timestamp).
        secondary: When True, store via ``save_stream_settings2``; otherwise
            via ``save_stream_settings``.
    """
    # Only `datetime` is imported at file level; import `timezone` locally.
    from datetime import timezone

    persisted = dict(persisted)
    # datetime.utcnow() is deprecated (Python 3.12+). Take an aware "now" and
    # drop tzinfo so the stored ISO string keeps its previous naive-UTC format.
    persisted['timestamp'] = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    if secondary:
        save_stream_settings2(persisted)
    else:
        save_stream_settings(persisted)
def gen_random_add() -> str:
    """Generate a random Bluetooth LE *static* device address.

    Returns:
        Six upper-case hex octets joined by ``:`` (e.g. ``"C3:1A:9F:00:42:7E"``).

    The two most significant bits of a static random address must both be 1.
    A fully random first octet leaves those bits arbitrary, so the address
    could be read as a resolvable or non-resolvable private address, or use
    the reserved ``0b10`` pattern. They are therefore forced to ``0b11``.
    Assumes the first octet of the string is the most significant one, as in
    the conventional display order - TODO confirm against the consumer.
    """
    octets = [random.randint(0, 255) for _ in range(6)]
    octets[0] |= 0xC0  # mark as static random address
    return ':'.join(f'{octet:02X}' for octet in octets)
def gen_random_broadcast_id() -> int:
    """Return a random non-zero 24-bit Broadcast ID, i.e. a value in 1..0xFFFFFF."""
    # randrange excludes the upper bound, hence 0xFFFFFF + 1.
    return random.randrange(1, 0xFFFFFF + 1)
# FastAPI application object; the endpoints below register themselves on it.
app = FastAPI()
# Allow CORS for frontend on localhost
# NOTE(review): every origin, method and header is currently allowed together
# with credentials; consider restricting allow_origins as hinted below.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # You can restrict this to ["http://localhost:8501"] if you want
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Initialize global configuration
# (replaced by the config passed to /init; read by /stream_lc3)
global_config_group = auracast_config.AuracastConfigGroup()
# Module-level state replacing StreamerWorker
# multicaster1 = primary radio, multicaster2 = secondary radio; None means
# "not initialized".
multicaster1: multicast_control.Multicaster | None = None
multicaster2: multicast_control.Multicaster | None = None
# NOTE(review): in the visible part of this file the lock is taken by /init,
# /init2 and the autostart path; /stop_audio does not acquire it.
_stream_lock = asyncio.Lock() # serialize initialize/stop_audio on API side
async def _init_i2c_on_startup() -> None:
    """Load the i2c-dev module and program/verify three registers of device 0x4a.

    Steps, each of which only logs on failure (nothing is raised):
      1. ``sudo modprobe i2c-dev``.
      2. For every (register, value) pair: ``i2cset`` on bus 1, then read the
         register back with ``i2cget`` and compare it with the value written.
         A failed write skips the read-back for that register.
    """

    async def _run(argv):
        # Run a command, capture both streams, return (returncode, stdout, stderr).
        child = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        out, err = await child.communicate()
        return child.returncode, out, err

    def _text(raw) -> str:
        return (raw or b"").decode(errors="ignore").strip()

    # Ensure i2c-dev kernel module is loaded (required for /dev/i2c-* access)
    try:
        rc, _out, err = await _run(["sudo", "modprobe", "i2c-dev"])
        if rc == 0:
            log.info("i2c-dev module loaded successfully")
        else:
            log.warning("modprobe i2c-dev failed (rc=%s): %s", rc, _text(err))
    except Exception as e:
        log.warning("Exception running modprobe i2c-dev: %s", e, exc_info=True)

    dev_add = "0x4a"
    # (register, expected_value) pairs
    register_plan = (
        ("0x00", "0x00"),
        ("0x06", "0x10"),
        ("0x07", "0x10"),
    )
    for reg, expected in register_plan:
        write_cmd = ["i2cset", "-f", "-y", "1", dev_add, reg, expected]
        try:
            rc, _out, err = await _run(write_cmd)
            if rc != 0:
                log.warning(
                    "i2cset failed (%s): rc=%s stderr=%s",
                    " ".join(write_cmd),
                    rc,
                    _text(err),
                )
                # Write failed: nothing to verify for this register.
                continue
        except Exception as e:
            log.warning("Exception running i2cset (%s): %s", " ".join(write_cmd), e, exc_info=True)
            continue

        # Read the register back and compare with what was written.
        read_cmd = ["i2cget", "-f", "-y", "1", dev_add, reg]
        try:
            rc, out, err = await _run(read_cmd)
            if rc != 0:
                log.warning(
                    "i2cget failed (%s): rc=%s stderr=%s",
                    " ".join(read_cmd),
                    rc,
                    _text(err),
                )
                continue
            value = _text(out)
            if value == expected:
                log.info(
                    "I2C register verified: addr=0x4a reg=%s value=%s",
                    reg,
                    value,
                )
            else:
                log.error(
                    "I2C register verify failed: addr=0x4a reg=%s expected=%s got=%s",
                    reg,
                    expected,
                    value,
                )
        except Exception as e:
            log.warning("Exception running i2cget (%s): %s", " ".join(read_cmd), e, exc_info=True)
async def _set_adc_level(gain_percent: int = 50) -> None:
    """Set the 'ADC' mixer control on ALSA card 1 via ``amixer``.

    Runs: amixer -c 1 set ADC <gain>%

    Args:
        gain_percent: Requested level; clamped to the range 10..60 before use
            (default 50; above 60% causes clipping/silence).

    Failures (non-zero exit code or an exception) are only logged.
    """
    gain_percent = max(10, min(60, gain_percent))
    argv = ("amixer", "-c", "1", "set", "ADC", f"{gain_percent}%")
    try:
        child = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        out, err = await child.communicate()
        if child.returncode == 0:
            log.info("amixer ADC level set successfully: %s", (out or b"").decode(errors="ignore").strip())
        else:
            log.warning(
                "amixer ADC level command failed (rc=%s): %s",
                child.returncode,
                (err or b"").decode(errors="ignore").strip(),
            )
    except Exception as e:
        log.warning("Exception running amixer ADC level command: %s", e, exc_info=True)
async def _stop_all() -> bool:
    """Stop and shut down both multicasters and switch the LED off.

    Previously an exception while stopping the primary multicaster left the
    function immediately: the secondary kept running and the LED stayed on.
    Now both multicasters are always attempted, the LED is always switched
    off, and the first error (if any) is re-raised afterwards so callers see
    a failure exactly as before.

    Returns:
        True if at least one multicaster was stopped and shut down
        successfully, otherwise False.

    Raises:
        Exception: the first exception raised by ``stop_streaming()`` or
            ``shutdown()`` of either multicaster.
    """
    global multicaster1, multicaster2
    was_running = False
    first_error = None
    try:
        if multicaster1 is not None:
            try:
                await multicaster1.stop_streaming()
                await multicaster1.shutdown()
                was_running = True
            except Exception as exc:
                log.error("Failed to stop primary multicaster", exc_info=True)
                first_error = exc
            finally:
                # The reference is dropped even when stopping failed.
                multicaster1 = None
        if multicaster2 is not None:
            try:
                await multicaster2.stop_streaming()
                await multicaster2.shutdown()
                was_running = True
            except Exception as exc:
                log.error("Failed to stop secondary multicaster", exc_info=True)
                if first_error is None:
                    first_error = exc
            finally:
                multicaster2 = None
    finally:
        # Also runs on cancellation, so the LED never stays lit by accident.
        _led_off()
    if first_error is not None:
        raise first_error
    return was_running
async def _status_primary() -> dict:
    """Return runtime status of the PRIMARY multicaster.

    When no primary multicaster exists, a dict with ``is_initialized`` and
    ``is_streaming`` both False is returned instead.
    """
    mc = multicaster1
    return mc.get_status() if mc is not None else {'is_initialized': False, 'is_streaming': False}
async def _status_secondary() -> dict:
    """Return runtime status of the SECONDARY multicaster.

    Counterpart of ``_status_primary`` for ``multicaster2``, so /status can
    report both radios. When no secondary multicaster exists, a dict with
    ``is_initialized`` and ``is_streaming`` both False is returned.
    """
    mc = multicaster2
    return mc.get_status() if mc is not None else {'is_initialized': False, 'is_streaming': False}
async def _stream_lc3(audio_data: dict[str, str], bigs_template: list) -> None:
    """Attach per-language audio to each BIG and start streaming on the primary radio.

    Args:
        audio_data: Mapping of language code to audio payload string; each
            payload is encoded to bytes with latin-1.
        bigs_template: BIG configs; ``audio_source`` of each entry is
            overwritten in place.

    Raises:
        HTTPException: 500 if the primary multicaster does not exist or a
            BIG's language has no entry in *audio_data*.
    """
    primary = multicaster1
    if primary is None:
        raise HTTPException(status_code=500, detail='Auracast endpoint was never intialized')
    for big_cfg in bigs_template:
        lang = big_cfg.language
        if lang not in audio_data:
            raise HTTPException(status_code=500, detail='language len missmatch')
        big_cfg.audio_source = audio_data[lang].encode('latin-1')
    primary.big_conf = bigs_template
    await primary.start_streaming()
def _resolve_qos_preset_name(qos_config) -> str:
"""Resolve qos_config to preset name based on retransmission count."""
if qos_config is None:
return "Fast"
rtn = getattr(qos_config, 'number_of_retransmissions', 2)
# Fast has 2 retransmissions, Robust has 4
return "Robust" if rtn >= 4 else "Fast"
async def init_radio(transport: str, conf: auracast_config.AuracastConfigGroup, current_mc: multicast_control.Multicaster | None):
    """Shut down an existing multicaster, finalise ``conf`` and bring up a new one.

    NOTE(review): the reviewed copy of this file had lost all indentation.
    The nesting below is reconstructed from statement order; the spots where
    more than one nesting is plausible are marked and should be confirmed
    against the original source.

    Args:
        transport: Transport string; written to ``conf.transport``.
        conf: Configuration group. It is modified in place: transport,
            per-BIG ``audio_source``/``input_format``, sampling rate,
            ``octets_per_frame``, QoS max transport latency, and BIG
            random addresses / broadcast IDs still at their model defaults.
        current_mc: Multicaster to shut down first, or None.

    Returns:
        ``(mc, persisted)``: the new Multicaster and the settings dict that
        callers pass to ``save_settings``.

    Raises:
        HTTPException: 400 if a selected audio device cannot be resolved;
            500 (with ``str(exc)`` as detail) for any other exception.
    """
    try:
        log.info('Initializing multicaster with transport %s and config:\n %s', transport, conf.model_dump_json(indent=2))
        # Release the previous multicaster before creating a new one.
        if current_mc is not None:
            await current_mc.shutdown()
            current_mc = None
        conf.transport = transport
        first_source = conf.bigs[0].audio_source if conf.bigs else ''
        input_device_name = None
        # 'Demo' is persisted unless at least one BIG uses a 'device:' source.
        audio_mode_persist = 'Demo'
        if any(isinstance(b.audio_source, str) and b.audio_source.startswith('device:') for b in conf.bigs):
            # The persisted audio mode is classified from the FIRST BIG's device name only.
            if isinstance(first_source, str) and first_source.startswith('device:'):
                input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None
            # NOTE(review): reconstructed nesting - these lookups may instead belong
            # inside the `if isinstance(first_source, ...)` block above.
            # NOTE(review): alsa_usb_names is not read again in this function.
            alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()}
            net_names = {d.get('name') for _, d in get_network_pw_inputs()}
            dante_channels = {"dante_asrc_ch1", "dante_asrc_ch2", "dante_asrc_ch3", "dante_asrc_ch4", "dante_asrc_ch5", "dante_asrc_ch6"}
            if input_device_name in ('ch1', 'ch2'):
                audio_mode_persist = 'Analog'
                # Set ADC gain level for analog mode
                analog_gain = getattr(conf, 'analog_gain', 50)
                await _set_adc_level(analog_gain)
            elif input_device_name in dante_channels:
                audio_mode_persist = 'Network - Dante'
            else:
                # Anything not listed as a network input is persisted as 'USB'.
                audio_mode_persist = 'Network' if (input_device_name in net_names) else 'USB'
            # Configure each BIG independently so Dante multi-stream can select different channels.
            for big in conf.bigs:
                # Non-device sources are left untouched.
                if not (isinstance(big.audio_source, str) and big.audio_source.startswith('device:')):
                    continue
                sel = big.audio_source.split(':', 1)[1] if ':' in big.audio_source else None
                # IMPORTANT: All hardware capture is at 48kHz; LC3 encoder may downsample.
                hardware_capture_rate = 48000
                if sel in dante_channels:
                    # Use ALSA directly (PortAudio doesn't enumerate route PCMs on some systems).
                    big.audio_source = f'alsa:{sel}'
                    big.input_format = f"int16le,{hardware_capture_rate},1"
                    continue
                # Dante stereo devices: dante_stereo_X_Y (e.g., dante_stereo_1_2)
                if sel and sel.startswith('dante_stereo_'):
                    is_stereo = getattr(big, 'num_bis', 1) == 2
                    if is_stereo:
                        # Stereo mode: use the stereo ALSA device with 2 channels
                        big.audio_source = f'alsa:{sel}'
                        big.input_format = f"int16le,{hardware_capture_rate},2"
                        log.info("Configured Dante stereo input: using ALSA %s with 2 channels", sel)
                    else:
                        # num_bis != 2 (shouldn't happen): the device is still captured
                        # with 2 channels; only a warning is logged.
                        big.audio_source = f'alsa:{sel}'
                        big.input_format = f"int16le,{hardware_capture_rate},2"
                        log.warning("Dante stereo device %s used but num_bis=%d, capturing as stereo anyway", sel, getattr(big, 'num_bis', 1))
                    continue
                if sel in ('ch1', 'ch2'):
                    # Analog channels: check if this should be stereo based on num_bis
                    is_stereo = getattr(big, 'num_bis', 1) == 2
                    if is_stereo and sel == 'ch1':
                        # Stereo mode: use ALSA directly to capture both channels from hardware
                        # ch1=left (channel 0), ch2=right (channel 1)
                        big.audio_source = 'alsa:hw:CARD=i2s,DEV=0'
                        big.input_format = f"int16le,{hardware_capture_rate},2"
                        log.info("Configured analog stereo input: using ALSA hw:CARD=i2s,DEV=0 with ch1=left, ch2=right")
                    elif is_stereo and sel == 'ch2':
                        # Skip ch2 in stereo mode as it's already captured as part of stereo pair
                        # (this BIG keeps its original 'device:ch2' source unchanged).
                        continue
                    else:
                        # Mono mode: individual channel capture
                        device_index = resolve_input_device_index(sel)
                        if device_index is None:
                            raise HTTPException(status_code=400, detail=f"Audio device '{sel}' not found.")
                        big.audio_source = f'device:{device_index}'
                        big.input_format = f"int16le,{hardware_capture_rate},1"
                    continue
                # Generic device: a purely numeric selector is taken as the device
                # index itself, anything else is resolved by name.
                if sel and sel.isdigit():
                    device_index = int(sel)
                else:
                    device_index = resolve_input_device_index(sel or '')
                if device_index is None:
                    raise HTTPException(status_code=400, detail=f"Audio device '{sel}' not found.")
                # Informational logging only; a failing query here is tolerated.
                try:
                    resolved_devinfo = sd.query_devices(device_index)
                    log.info(
                        "Resolved input device '%s' -> idx=%s name='%s' hostapi=%s max_in=%s",
                        sel,
                        device_index,
                        resolved_devinfo.get('name'),
                        resolved_devinfo.get('hostapi'),
                        resolved_devinfo.get('max_input_channels'),
                    )
                except Exception:
                    log.info("Resolved input device '%s' -> idx=%s (devinfo unavailable)", sel, device_index)
                big.audio_source = f'device:{device_index}'
                # Second query is NOT guarded: a failure here reaches the outer
                # handler and becomes a 500.
                devinfo = sd.query_devices(device_index)
                max_in = int(devinfo.get('max_input_channels') or 1)
                # Capture 1 or 2 channels depending on what the device offers.
                channels = max(1, min(2, max_in))
                big.input_format = f"int16le,{hardware_capture_rate},{channels}"
        # NOTE(review): reconstructed nesting - the sampling-rate / octets_per_frame
        # section below may instead belong inside the `if any(...)` block above.
        # The config group keeps the target sampling rate for LC3 encoder
        # The audio input will capture at 48kHz and LC3 encoder will downsample
        target_sampling_rate = getattr(conf, 'auracast_sampling_rate_hz', None)
        if target_sampling_rate is None and conf.bigs:
            target_sampling_rate = getattr(conf.bigs[0], 'sampling_frequency', 48000)
        if target_sampling_rate is None:
            target_sampling_rate = 48000
        # Keep the config group sampling rate as set by frontend
        conf.auracast_sampling_rate_hz = target_sampling_rate
        # Ensure octets_per_frame matches the target sampling rate
        if target_sampling_rate == 48000:
            conf.octets_per_frame = 120
        elif target_sampling_rate == 32000:
            conf.octets_per_frame = 80
        elif target_sampling_rate == 24000:
            conf.octets_per_frame = 60
        elif target_sampling_rate == 16000:
            conf.octets_per_frame = 40
        else:
            conf.octets_per_frame = 120 # default to 48000 setting
        # Max transport latency is derived from the retransmission count: rtn * 10 + 3 (ms).
        conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3
        # Generate fresh random_address and broadcast ID for any BIG still at model defaults.
        for big in conf.bigs:
            if not getattr(big, 'random_address', None) or big.random_address == DEFAULT_RANDOM_ADDRESS:
                big.random_address = gen_random_add()
            if big.id == DEFAULT_BIG_ID:
                big.id = gen_random_broadcast_id()
        # Log the final, fully-updated configuration just before creating the Multicaster
        log.info('Final multicaster config (transport=%s):\n %s', transport, conf.model_dump_json(indent=2))
        mc = multicast_control.Multicaster(conf, conf.bigs)
        await mc.init_broadcast()
        # Streaming starts immediately when any BIG has a device:, alsa: or file: source.
        auto_started = False
        if any(isinstance(big.audio_source, str) and (big.audio_source.startswith("device:") or big.audio_source.startswith("alsa:") or big.audio_source.startswith("file:")) for big in conf.bigs):
            await mc.start_streaming()
            _led_on()
            auto_started = True
        # Summary of demo ('file:') streams, stored with the persisted settings.
        demo_count = sum(1 for big in conf.bigs if isinstance(big.audio_source, str) and big.audio_source.startswith('file:'))
        demo_rate = int(conf.auracast_sampling_rate_hz or 0)
        demo_type = None
        if demo_count > 0 and demo_rate > 0:
            # NOTE(review): 32000 is not in this tuple and is rendered as "32000Hz".
            if demo_rate in (48000, 24000, 16000):
                demo_type = f"{demo_count} × {demo_rate//1000}kHz"
            else:
                demo_type = f"{demo_count} × {demo_rate}Hz"
        # Settings handed back to the caller for persistence; the autostart
        # code reads these keys back.
        persisted = {
            'channel_names': [big.name for big in conf.bigs],
            'languages': [big.language for big in conf.bigs],
            'audio_mode': audio_mode_persist,
            'input_device': input_device_name,
            'program_info': [getattr(big, 'program_info', None) for big in conf.bigs],
            'gain': [getattr(big, 'input_gain', 1.0) for big in conf.bigs],
            'auracast_sampling_rate_hz': conf.auracast_sampling_rate_hz,
            'octets_per_frame': conf.octets_per_frame,
            'presentation_delay_us': getattr(conf, 'presentation_delay_us', None),
            'qos_preset': _resolve_qos_preset_name(conf.qos_config),
            'immediate_rendering': getattr(conf, 'immediate_rendering', False),
            'assisted_listening_stream': getattr(conf, 'assisted_listening_stream', False),
            'analog_stereo_mode': getattr(conf.bigs[0], 'analog_stereo_mode', False) if conf.bigs else False,
            'analog_gain': getattr(conf, 'analog_gain', 50),
            'software_boost_db': getattr(conf.bigs[0], 'input_gain_db', 0.0) if conf.bigs else 0.0,
            'stream_password': (conf.bigs[0].code if conf.bigs and getattr(conf.bigs[0], 'code', None) else None),
            'big_ids': [getattr(big, 'id', DEFAULT_BIG_ID) for big in conf.bigs],
            'big_random_addresses': [getattr(big, 'random_address', DEFAULT_RANDOM_ADDRESS) for big in conf.bigs],
            'demo_total_streams': demo_count,
            'demo_stream_type': demo_type,
            'is_streaming': auto_started,
            'demo_sources': [str(b.audio_source) for b in conf.bigs if isinstance(b.audio_source, str) and b.audio_source.startswith('file:')],
        }
        return mc, persisted
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 400s above) pass through unchanged.
        raise
    except Exception as e:
        log.error("Exception in init_radio: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/init")
async def initialize(conf: auracast_config.AuracastConfigGroup):
    """(Re)initialise the primary broadcaster from *conf*.

    While holding ``_stream_lock``: runs ``init_radio`` on TRANSPORT1, stores
    the new multicaster and *conf* in module state, and persists the
    returned settings as primary. Errors from ``init_radio`` propagate.
    """
    global multicaster1, global_config_group
    async with _stream_lock:
        new_mc, to_persist = await init_radio(TRANSPORT1, conf, multicaster1)
        multicaster1 = new_mc
        global_config_group = conf
        save_settings(to_persist, secondary=False)
@app.post("/init2")
async def initialize2(conf: auracast_config.AuracastConfigGroup):
    """(Re)initialise the secondary broadcaster from *conf*.

    While holding ``_stream_lock``: runs ``init_radio`` on TRANSPORT2, stores
    the new multicaster in module state, and persists the returned settings
    as secondary. Errors from ``init_radio`` propagate.
    """
    global multicaster2
    async with _stream_lock:
        new_mc, to_persist = await init_radio(TRANSPORT2, conf, multicaster2)
        multicaster2 = new_mc
        save_settings(to_persist, secondary=True)
@app.post("/set_led_enabled")
async def set_led_enabled(body: dict):
    """Enable or disable the blue status LED and persist the choice.

    Reads ``led_enabled`` from the request body (treated as True when
    absent). When the LED is being disabled it is switched off right away.
    Returns the resulting flag as ``{"led_enabled": <bool>}``.
    """
    global _LED_ENABLED
    enabled = bool(body.get("led_enabled", True))
    _LED_ENABLED = enabled
    _save_led_settings()
    if not enabled:
        _led_off()
    return {"led_enabled": enabled}
@app.post("/stop_audio")
async def stop_audio():
    """Stop streaming on both multicasters and persist ``is_streaming=False``.

    Returns:
        ``{"status": "stopped", "was_running": <bool>}``.

    Raises:
        HTTPException: 500 if stopping fails. A failure to persist the
            flag is only logged as a warning.
    """
    # Only `datetime` is imported at file level; import `timezone` locally.
    from datetime import timezone

    def _utc_stamp() -> str:
        # datetime.utcnow() is deprecated (Python 3.12+). Dropping tzinfo keeps
        # the stored ISO string in its previous naive-UTC format.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    try:
        was_running = await _stop_all()
        # Persist is_streaming=False for both primary and secondary
        try:
            settings1 = load_stream_settings() or {}
            if settings1.get('is_streaming'):
                settings1['is_streaming'] = False
                settings1['timestamp'] = _utc_stamp()
                save_stream_settings(settings1)
            settings2 = load_stream_settings2() or {}
            if settings2.get('is_streaming'):
                settings2['is_streaming'] = False
                settings2['timestamp'] = _utc_stamp()
                save_stream_settings2(settings2)
        except Exception:
            log.warning("Failed to persist is_streaming=False during stop_audio", exc_info=True)
        # Short pause before reporting success, kept from the original code.
        await asyncio.sleep(0.2)
        return {"status": "stopped", "was_running": was_running}
    except Exception as e:
        log.error("Exception in /stop_audio: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/stream_lc3")
async def send_audio(audio_data: dict[str, str]):
    """Send a block of pre-coded LC3 audio through the primary multicaster.

    Args:
        audio_data: Mapping of language code to audio payload string.

    Returns:
        ``{"status": "audio_sent"}`` on success.

    Raises:
        HTTPException: errors raised deliberately by ``_stream_lc3`` are
            passed through unchanged; any other exception becomes a 500
            whose detail is ``str(exc)``.
    """
    try:
        await _stream_lc3(audio_data, list(global_config_group.bigs))
        return {"status": "audio_sent"}
    except HTTPException:
        # Previously caught by the blanket handler below and re-wrapped, which
        # replaced the original detail text with str(exc). Pass it through, as
        # init_radio does.
        raise
    except Exception as e:
        log.error("Exception in /stream_lc3: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/cert")
async def download_ca_cert():
    """Serve the CA certificate file used for TLS verification.

    Responds with 404 when the certificate file does not exist.
    """
    if os.path.exists(CA_CERT_PATH):
        return FileResponse(CA_CERT_PATH, filename="ca_cert.pem", media_type="application/x-pem-file")
    raise HTTPException(status_code=404, detail="CA certificate not found")
@app.get("/status")
async def get_status():
    """Report runtime status merged with the cached persisted settings.

    Top-level keys describe the primary radio: runtime status first, then
    persisted settings, so a persisted key overrides a runtime key of the
    same name. The secondary radio is merged the same way under
    ``"secondary"``. ``"secondary_is_streaming"`` and ``"led_enabled"`` are
    added at the top level.
    """
    # Primary keeps the existing flat top-level shape for compatibility.
    runtime1 = await _status_primary()
    status: dict = {**runtime1, **(load_stream_settings() or {})}

    # Secondary gets its own nested block (runtime + persisted).
    runtime2 = await _status_secondary()
    secondary: dict = {**runtime2, **(load_stream_settings2() or {})}

    status["secondary"] = secondary
    status["secondary_is_streaming"] = bool(secondary.get("is_streaming", False))
    status["led_enabled"] = _LED_ENABLED
    return status
@app.get("/audio_level")
async def get_audio_level():
    """Return the primary radio's current audio levels (lightweight, for polling).

    The list is empty when the primary multicaster does not exist.
    """
    mc = multicaster1
    levels = [] if mc is None else mc.get_audio_levels()
    return {"levels": levels}
@app.get("/audio_level2")
async def get_audio_level2():
    """Return the secondary radio's current audio levels (lightweight, for polling).

    The list is empty when the secondary multicaster does not exist.
    """
    mc = multicaster2
    levels = [] if mc is None else mc.get_audio_levels()
    return {"levels": levels}
async def _autostart_from_settings():
settings1 = load_stream_settings() or {}
settings2 = load_stream_settings2() or {}
log.info("[AUTOSTART] Starting autostart check: primary_ts=%s secondary_ts=%s", settings1.get('timestamp'), settings2.get('timestamp'))
async def do_primary():
global multicaster1, global_config_group
settings = settings1
audio_mode = settings.get('audio_mode')
input_device_name = settings.get('input_device')
rate = settings.get('auracast_sampling_rate_hz')
octets = settings.get('octets_per_frame')
pres_delay = settings.get('presentation_delay_us')
saved_qos_preset = settings.get('qos_preset', 'Fast')
immediate_rendering = settings.get('immediate_rendering', False)
assisted_listening_stream = settings.get('assisted_listening_stream', False)
channel_names = settings.get('channel_names') or ["Broadcast0"]
program_info = settings.get('program_info') or channel_names
languages = settings.get('languages') or ["deu"]
big_ids = settings.get('big_ids') or []
big_addrs = settings.get('big_random_addresses') or []
stream_password = settings.get('stream_password')
original_ts = settings.get('timestamp')
previously_streaming = bool(settings.get('is_streaming'))
log.info(
"[AUTOSTART][PRIMARY] loaded settings: previously_streaming=%s audio_mode=%s rate=%s octets=%s pres_delay=%s qos_preset=%s immediate_rendering=%s assisted_listening_stream=%s demo_sources=%s",
previously_streaming,
audio_mode,
rate,
octets,
pres_delay,
saved_qos_preset,
immediate_rendering,
assisted_listening_stream,
(settings.get('demo_sources') or []),
)
if not previously_streaming:
log.info("[AUTOSTART][PRIMARY] Skipping autostart: is_streaming flag was False in persisted settings")
return
if audio_mode == 'Demo':
demo_sources = settings.get('demo_sources') or []
if not demo_sources or rate is None or octets is None:
log.warning(
"[AUTOSTART][PRIMARY] Demo autostart aborted: demo_sources_present=%s rate=%s octets=%s",
bool(demo_sources),
rate,
octets,
)
return
bigs = []
for i, src in enumerate(demo_sources):
name = channel_names[i] if i < len(channel_names) else f"Broadcast{i}"
pinfo = program_info[i] if isinstance(program_info, list) and i < len(program_info) else (program_info[0] if isinstance(program_info, list) and program_info else program_info)
lang = languages[i] if i < len(languages) else (languages[0] if languages else "deu")
bigs.append(
auracast_config.AuracastBigConfig(
id=big_ids[i] if i < len(big_ids) else DEFAULT_BIG_ID,
random_address=big_addrs[i] if i < len(big_addrs) else DEFAULT_RANDOM_ADDRESS,
code=stream_password,
name=name,
program_info=pinfo,
language=lang,
audio_source=src,
iso_que_len=1,
sampling_frequency=rate,
octets_per_frame=octets,
)
)
log.info(
"[AUTOSTART][PRIMARY] Building demo config for %d streams, rate=%s, octets=%s",
len(bigs),
rate,
octets,
)
conf = auracast_config.AuracastConfigGroup(
auracast_sampling_rate_hz=rate,
octets_per_frame=octets,
transport=TRANSPORT1,
immediate_rendering=immediate_rendering,
assisted_listening_stream=assisted_listening_stream,
presentation_delay_us=pres_delay if pres_delay is not None else 40000,
bigs=bigs,
)
# Set num_bis for stereo mode if needed
if conf.bigs and settings.get('analog_stereo_mode', False):
conf.bigs[0].num_bis = 2
conf.qos_config = QOS_PRESET_MAP.get(saved_qos_preset, QOS_PRESET_MAP["Fast"])
log.info("[AUTOSTART][PRIMARY] Scheduling demo init_radio in 2s")
await asyncio.sleep(2)
async with _stream_lock:
log.info("[AUTOSTART][PRIMARY] Calling init_radio for demo autostart")
mc, persisted = await init_radio(TRANSPORT1, conf, multicaster1)
multicaster1 = mc
global_config_group = conf
save_settings(persisted, secondary=False)
log.info("[AUTOSTART][PRIMARY] Demo autostart completed; settings persisted with is_streaming=%s", persisted.get('is_streaming'))
return
if not input_device_name or rate is None or octets is None:
log.info(
"[AUTOSTART][PRIMARY] Skipping device-based autostart: input_device=%s rate=%s octets=%s",
input_device_name,
rate,
octets,
)
return
current = await _status_primary()
if current.get('is_streaming'):
log.info("[AUTOSTART][PRIMARY] Skipping device-based autostart: stream already running")
return
while True:
current = await _status_primary()
if current.get('is_streaming'):
log.info("[AUTOSTART][PRIMARY] Aborting wait loop: stream started externally")
return
current_settings = load_stream_settings() or {}
if current_settings.get('timestamp') != original_ts:
if (
current_settings.get('input_device') != input_device_name or
current_settings.get('audio_mode') != audio_mode
):
log.info("[AUTOSTART][PRIMARY] Aborting wait loop: settings changed (audio_mode/input_device)")
return
usb = [d for _, d in get_alsa_usb_inputs()]
net = [d for _, d in get_network_pw_inputs()]
names = {d.get('name') for d in usb} | {d.get('name') for d in net}
if input_device_name in names:
log.info("[AUTOSTART][PRIMARY] Device '%s' detected, starting autostart", input_device_name)
bigs = [
auracast_config.AuracastBigConfig(
id=big_ids[0] if big_ids else DEFAULT_BIG_ID,
random_address=big_addrs[0] if big_addrs else DEFAULT_RANDOM_ADDRESS,
code=stream_password,
name=channel_names[0] if channel_names else "Broadcast0",
program_info=program_info[0] if isinstance(program_info, list) and program_info else program_info,
language=languages[0] if languages else "deu",
audio_source=f"device:{input_device_name}",
iso_que_len=1,
sampling_frequency=rate,
octets_per_frame=octets,
input_gain_db=float(settings.get('software_boost_db', 0)),
)
]
conf = auracast_config.AuracastConfigGroup(
auracast_sampling_rate_hz=rate,
octets_per_frame=octets,
transport=TRANSPORT1,
immediate_rendering=immediate_rendering,
assisted_listening_stream=assisted_listening_stream,
presentation_delay_us=pres_delay if pres_delay is not None else 40000,
analog_gain=settings.get('analog_gain', 50),
bigs=bigs,
)
# Set num_bis for stereo mode if needed
if conf.bigs and settings.get('analog_stereo_mode', False):
conf.bigs[0].num_bis = 2
conf.qos_config = QOS_PRESET_MAP.get(saved_qos_preset, QOS_PRESET_MAP["Fast"])
log.info("[AUTOSTART][PRIMARY] Scheduling device init_radio in 2s")
await asyncio.sleep(2)
async with _stream_lock:
log.info("[AUTOSTART][PRIMARY] Calling init_radio for device autostart")
mc, persisted = await init_radio(TRANSPORT1, conf, multicaster1)
multicaster1 = mc
global_config_group = conf
save_settings(persisted, secondary=False)
log.info("[AUTOSTART][PRIMARY] Device autostart completed; settings persisted with is_streaming=%s", persisted.get('is_streaming'))
return
await asyncio.sleep(2)
async def do_secondary():
global multicaster2
settings = settings2
audio_mode = settings.get('audio_mode')
input_device_name = settings.get('input_device')
rate = settings.get('auracast_sampling_rate_hz')
octets = settings.get('octets_per_frame')
pres_delay = settings.get('presentation_delay_us')
saved_qos_preset = settings.get('qos_preset', 'Fast')
immediate_rendering = settings.get('immediate_rendering', False)
assisted_listening_stream = settings.get('assisted_listening_stream', False)
channel_names = settings.get('channel_names') or ["Broadcast0"]
program_info = settings.get('program_info') or channel_names
languages = settings.get('languages') or ["deu"]
big_ids = settings.get('big_ids') or []
big_addrs = settings.get('big_random_addresses') or []
stream_password = settings.get('stream_password')
original_ts = settings.get('timestamp')
previously_streaming = bool(settings.get('is_streaming'))
log.info(
"[AUTOSTART][SECONDARY] loaded settings: previously_streaming=%s audio_mode=%s rate=%s octets=%s pres_delay=%s qos_preset=%s immediate_rendering=%s assisted_listening_stream=%s demo_sources=%s",
previously_streaming,
audio_mode,
rate,
octets,
pres_delay,
saved_qos_preset,
immediate_rendering,
assisted_listening_stream,
(settings.get('demo_sources') or []),
)
if not previously_streaming:
log.info("[AUTOSTART][SECONDARY] Skipping autostart: is_streaming flag was False in persisted settings")
return
if audio_mode == 'Demo':
demo_sources = settings.get('demo_sources') or []
if not demo_sources or rate is None or octets is None:
log.warning(
"[AUTOSTART][SECONDARY] Demo autostart aborted: demo_sources_present=%s rate=%s octets=%s",
bool(demo_sources),
rate,
octets,
)
return
bigs = []
for i, src in enumerate(demo_sources):
name = channel_names[i] if i < len(channel_names) else f"Broadcast{i}"
pinfo = program_info[i] if isinstance(program_info, list) and i < len(program_info) else (program_info[0] if isinstance(program_info, list) and program_info else program_info)
lang = languages[i] if i < len(languages) else (languages[0] if languages else "deu")
bigs.append(
auracast_config.AuracastBigConfig(
code=stream_password,
name=name,
program_info=pinfo,
language=lang,
audio_source=src,
iso_que_len=1,
sampling_frequency=rate,
octets_per_frame=octets,
)
)
conf = auracast_config.AuracastConfigGroup(
auracast_sampling_rate_hz=rate,
octets_per_frame=octets,
transport=TRANSPORT2,
immediate_rendering=immediate_rendering,
assisted_listening_stream=assisted_listening_stream,
presentation_delay_us=pres_delay if pres_delay is not None else 40000,
bigs=bigs,
)
conf.qos_config = QOS_PRESET_MAP.get(saved_qos_preset, QOS_PRESET_MAP["Fast"])
log.info("[AUTOSTART][SECONDARY] Scheduling demo init_radio in 2s")
await asyncio.sleep(2)
async with _stream_lock:
log.info("[AUTOSTART][SECONDARY] Calling init_radio for demo autostart")
mc, persisted = await init_radio(TRANSPORT2, conf, multicaster2)
multicaster2 = mc
save_settings(persisted, secondary=True)
log.info("[AUTOSTART][SECONDARY] Demo autostart completed; settings persisted with is_streaming=%s", persisted.get('is_streaming'))
return
if not input_device_name or rate is None or octets is None:
log.info(
"[AUTOSTART][SECONDARY] Skipping device-based autostart: input_device=%s rate=%s octets=%s",
input_device_name,
rate,
octets,
)
return
if multicaster2 is not None:
try:
if multicaster2.get_status().get('is_streaming'):
log.info("[AUTOSTART][SECONDARY] Skipping device-based autostart: stream already running")
return
except Exception:
pass
while True:
if multicaster2 is not None:
try:
if multicaster2.get_status().get('is_streaming'):
log.info("[AUTOSTART][SECONDARY] Aborting wait loop: stream started externally")
return
except Exception:
pass
current_settings = load_stream_settings2() or {}
if current_settings.get('timestamp') != original_ts:
if (
current_settings.get('input_device') != input_device_name or
current_settings.get('audio_mode') != audio_mode
):
return
usb = [d for _, d in get_alsa_usb_inputs()]
net = [d for _, d in get_network_pw_inputs()]
names = {d.get('name') for d in usb} | {d.get('name') for d in net}
if input_device_name in names:
bigs = [
auracast_config.AuracastBigConfig(
id=big_ids[0] if big_ids else DEFAULT_BIG_ID,
random_address=big_addrs[0] if big_addrs else DEFAULT_RANDOM_ADDRESS,
code=stream_password,
name=channel_names[0] if channel_names else "Broadcast0",
program_info=program_info[0] if isinstance(program_info, list) and program_info else program_info,
language=languages[0] if languages else "deu",
audio_source=f"device:{input_device_name}",
iso_que_len=1,
sampling_frequency=rate,
octets_per_frame=octets,
input_gain_db=float(settings.get('software_boost_db', 0)),
)
]
conf = auracast_config.AuracastConfigGroup(
auracast_sampling_rate_hz=rate,
octets_per_frame=octets,
transport=TRANSPORT2,
immediate_rendering=immediate_rendering,
assisted_listening_stream=assisted_listening_stream,
presentation_delay_us=pres_delay if pres_delay is not None else 40000,
analog_gain=settings.get('analog_gain', 50),
bigs=bigs,
)
conf.qos_config = QOS_PRESET_MAP.get(saved_qos_preset, QOS_PRESET_MAP["Fast"])
log.info("[AUTOSTART][SECONDARY] Scheduling device init_radio in 2s")
await asyncio.sleep(2)
async with _stream_lock:
log.info("[AUTOSTART][SECONDARY] Calling init_radio for device autostart")
mc, persisted = await init_radio(TRANSPORT2, conf, multicaster2)
multicaster2 = mc
save_settings(persisted, secondary=True)
log.info("[AUTOSTART][SECONDARY] Device autostart completed; settings persisted with is_streaming=%s", persisted.get('is_streaming'))
return
await asyncio.sleep(2)
await do_primary()
await do_secondary()
@app.on_event("startup")
async def _startup_autostart_event():
    """Prepare the server at startup and schedule the stream autostart.

    In order: calls ``_led_off()``, runs ``install_asoundconf.sh`` (failures
    are logged, never fatal), loads the LED settings and the settings cache
    from disk, awaits ``_init_i2c_on_startup()``, sets the ADC mixer level to
    50, refreshes the PipeWire cache and finally spawns
    ``_autostart_from_settings()`` as a background task so that startup itself
    is not blocked by it.
    """
    log.info("[STARTUP] Auracast multicast server startup: initializing settings cache, I2C, and PipeWire cache")
    _led_off()
    # Run install_asoundconf.sh script
    script_path = os.path.join(os.path.dirname(__file__), '..', 'misc', 'install_asoundconf.sh')
    try:
        log.info("[STARTUP] Running install_asoundconf.sh script")
        result = subprocess.run(['bash', script_path], capture_output=True, text=True, check=True)
        log.info(f"[STARTUP] install_asoundconf.sh completed: {result.stdout.strip()}")
    except subprocess.CalledProcessError as e:
        # stderr is captured as text, but guard against None to keep the
        # error path itself from raising.
        log.error(f"[STARTUP] Failed to run install_asoundconf.sh: {(e.stderr or '').strip()}")
    except Exception as e:
        log.error(f"[STARTUP] Error running install_asoundconf.sh: {str(e)}")
    # Hydrate settings cache once to avoid disk I/O during /status
    _load_led_settings()
    _init_settings_cache_from_disk()
    await _init_i2c_on_startup()
    # Ensure ADC mixer level is set at startup (default 50%)
    await _set_adc_level(50)
    refresh_pw_cache()
    log.info("[STARTUP] Scheduling autostart task")
    # The event loop only keeps a weak reference to tasks, so an unreferenced
    # task may be garbage-collected before it finishes. Keep a strong
    # reference on the application state for the lifetime of the app.
    app.state.autostart_task = asyncio.create_task(_autostart_from_settings())
@app.get("/audio_inputs_pw_usb")
async def audio_inputs_pw_usb():
    """Return the USB capture devices reported by ``get_alsa_usb_inputs()``.

    Response shape: ``{"inputs": [{"id", "name", "max_input_channels"}, ...]}``.
    A missing ``max_input_channels`` is reported as 0. Any failure is logged
    with its traceback and surfaced as HTTP 500.
    """
    try:
        inputs = []
        for device_id, info in get_alsa_usb_inputs():
            inputs.append({
                "id": device_id,
                "name": info.get("name"),
                "max_input_channels": info.get("max_input_channels", 0),
            })
    except Exception as exc:
        log.error("Exception in /audio_inputs_pw_usb: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))
    return {"inputs": inputs}
@app.get("/audio_inputs_pw_network")
async def audio_inputs_pw_network():
    """Return the network input nodes reported by ``get_network_pw_inputs()``.

    Response shape: ``{"inputs": [{"id", "name", "max_input_channels"}, ...]}``.
    A missing ``max_input_channels`` is reported as 0. Any failure is logged
    with its traceback and surfaced as HTTP 500.
    """
    try:
        inputs = []
        for node_id, info in get_network_pw_inputs():
            inputs.append({
                "id": node_id,
                "name": info.get("name"),
                "max_input_channels": info.get("max_input_channels", 0),
            })
    except Exception as exc:
        log.error("Exception in /audio_inputs_pw_network: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))
    return {"inputs": inputs}
@app.get("/audio_inputs_dante")
async def audio_inputs_dante():
    """Return the six fixed Dante ASRC inputs, each reported as a mono input.

    The names are hard-coded in this handler; nothing is probed at runtime.
    NOTE(review): presumably these match PCM names defined in asound.conf --
    confirm against the installed ALSA configuration.
    """
    try:
        channel_names = (
            "dante_asrc_ch1",
            "dante_asrc_ch2",
            "dante_asrc_ch3",
            "dante_asrc_ch4",
            "dante_asrc_ch5",
            "dante_asrc_ch6",
        )
        inputs = []
        for channel in channel_names:
            # The ALSA name doubles as the identifier.
            inputs.append({"id": channel, "name": channel, "max_input_channels": 1})
        return {"inputs": inputs}
    except Exception as exc:
        log.error("Exception in /audio_inputs_dante: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/refresh_audio_devices")
async def refresh_audio_devices():
    """Refresh the PipeWire device cache unless the primary stream is active.

    Returns ``{"status": "ok"}`` after the refresh. Responds with HTTP 409 when
    ``_status_primary()`` reports ``is_streaming`` and with HTTP 500 when the
    status query itself fails.
    """
    try:
        primary_status = await _status_primary()
        is_active = bool(primary_status.get('is_streaming'))
    except Exception as exc:
        log.error("Exception in /refresh_audio_devices: %s", traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))

    if not is_active:
        log.info("Refreshing PipeWire device cache.")
        refresh_pw_cache()
        return {"status": "ok"}

    # Raised outside the try block so the 409 is not converted into a 500.
    log.warning("Ignoring refresh request: an audio stream is active.")
    raise HTTPException(status_code=409, detail="An audio stream is active. Stop the stream before refreshing devices.")
@app.post("/shutdown")
async def shutdown():
    """Stop broadcasting by awaiting ``_stop_all()``.

    Returns ``{"status": "stopped"}`` on success. Any failure is logged with
    its traceback (like the other endpoints in this module) and reported as
    HTTP 500.
    """
    try:
        await _stop_all()
    except Exception as e:
        log.error("Exception in /shutdown: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
    return {"status": "stopped"}
@app.post("/system_reboot")
async def system_reboot():
    """Stop audio and request a system reboot via sudo.

    Requires the service user to have passwordless sudo permissions to run 'reboot'.

    Returns ``{"status": "rebooting"}`` once the reboot command has been
    launched (it is not awaited). Responds with HTTP 500 if the command cannot
    be started.
    """
    # Best-effort: a failure to stop streaming must not prevent the reboot,
    # but it is logged so it can be diagnosed afterwards.
    try:
        await _stop_all()
    except Exception as e:
        log.warning("Ignoring error while stopping streams before reboot: %s", e, exc_info=True)
    # Launch reboot without waiting for completion
    try:
        await asyncio.create_subprocess_exec("sudo", "reboot")
    except Exception as e:
        log.error("Failed to invoke reboot: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to invoke reboot: {e}")
    return {"status": "rebooting"}
@app.post("/restart_dep")
async def restart_dep():
    """Restart DEP by running ``dep.sh stop`` then ``dep.sh start``.

    Requires the service user to have passwordless sudo permissions to run dep.sh.

    Returns ``{"status": "success", "message": ...}`` when both steps exit with
    code 0. Responds with HTTP 500 (carrying the step's stderr) as soon as one
    step fails; ``start`` is only attempted after ``stop`` succeeded.
    """
    try:
        # dep.sh lives in the dante_package subdirectory of the dep directory
        dep_dir = os.path.join(os.path.dirname(__file__), '..', '..', 'dep', 'dante_package')
        steps = (
            ("stop", "Stopping DEP..."),
            ("start", "Starting DEP..."),
        )
        for action, progress_message in steps:
            log.info(progress_message)
            process = await asyncio.create_subprocess_exec(
                "sudo", "bash", "dep.sh", action,
                cwd=dep_dir,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            _, stderr = await process.communicate()
            if process.returncode != 0:
                # Decode leniently so odd bytes in stderr cannot mask the real error.
                error_msg = stderr.decode(errors="replace") if stderr else "Unknown error"
                log.error("Failed to %s DEP: %s", action, error_msg)
                raise HTTPException(status_code=500, detail=f"Failed to {action} DEP: {error_msg}")
        log.info("DEP restarted successfully")
        return {"status": "success", "message": "DEP restarted successfully"}
    except HTTPException:
        raise
    except Exception as e:
        log.error("Exception in /restart_dep: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/version")
async def get_version():
    """Report the running software version from git.

    Tries an exact tag on HEAD first and falls back to the short commit hash.
    Returns ``{"version": ..., "type": "tag" | "commit" | "unknown"}``; on an
    unexpected exception the error is logged and
    ``{"version": "unknown", "type": "error"}`` is returned instead of raising.
    """
    try:
        # server -> auracast -> src -> bumble-auracast (3 levels up)
        repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
        # Ordered probes: the first git command that exits with 0 wins.
        probes = (
            ("tag", ("git", "describe", "--tags", "--exact-match")),
            ("commit", ("git", "rev-parse", "--short", "HEAD")),
        )
        for version_type, command in probes:
            git = await asyncio.create_subprocess_exec(
                *command,
                cwd=repo_root,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            output, _ = await git.communicate()
            if git.returncode == 0:
                return {"version": output.decode().strip(), "type": version_type}
        return {"version": "unknown", "type": "unknown"}
    except Exception as exc:
        log.error("Exception in /version: %s", exc, exc_info=True)
        return {"version": "unknown", "type": "error"}
@app.get("/check_update")
async def check_update():
    """Compare the running version with the latest tag reachable from origin/main.

    Returns ``{"current", "available", "update_available"}`` on success, or
    ``{"available": None, "error": ...}`` when no tag can be determined or an
    unexpected exception occurs. A failing ``git fetch`` is only logged; the
    comparison then uses whatever refs are already present locally.
    """
    try:
        # server -> auracast -> src -> bumble-auracast (3 levels up)
        repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
        pipe = asyncio.subprocess.PIPE

        # Fetch tags and main branch from origin
        fetch = await asyncio.create_subprocess_exec(
            "git", "fetch", "--tags", "origin", "main",
            cwd=repo_root, stdout=pipe, stderr=pipe,
        )
        _, fetch_err = await fetch.communicate()
        if fetch.returncode != 0:
            log.warning("git fetch failed: %s", fetch_err.decode())

        # Get the latest tag reachable from main branch
        describe = await asyncio.create_subprocess_exec(
            "git", "describe", "--tags", "--abbrev=0", "origin/main",
            cwd=repo_root, stdout=pipe, stderr=pipe,
        )
        describe_out, describe_err = await describe.communicate()
        if describe.returncode != 0:
            return {"available": None, "error": f"No tags found on main branch: {describe_err.decode()}"}

        newest_tag = describe_out.decode().strip()
        if not newest_tag:
            return {"available": None, "error": "No tags found on main branch"}

        # Reuse the /version handler to learn what is currently checked out.
        version_info = await get_version()
        running_version = version_info.get("version", "unknown")
        return {
            "current": running_version,
            "available": newest_tag,
            "update_available": newest_tag != running_version,
        }
    except Exception as exc:
        log.error("Exception in /check_update: %s", exc, exc_info=True)
        return {"available": None, "error": str(exc)}
@app.post("/system_update")
async def system_update():
    """Update the application to the latest tag on origin/main and restart.

    Steps: stop streaming (best-effort), ``git fetch``, resolve the latest tag
    with ``git describe``, ``git checkout`` that tag, ``poetry install``, then
    launch the service update script without waiting for it.

    Returns:
        ``{"status": "updating", "tag": <tag>}`` once the update script has
        been launched.

    Raises:
        HTTPException: 500 when any git/poetry step exits non-zero, when no
            tag can be resolved, or on any unexpected error.
    """

    async def _run_checked(cmd, cwd, log_label, detail_label):
        """Run *cmd* to completion; return its stdout text or raise HTTP 500."""
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=cwd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            error_text = stderr.decode(errors="replace")
            log.error("%s: %s", log_label, error_text)
            raise HTTPException(status_code=500, detail=f"{detail_label}: {error_text}")
        return stdout.decode(errors="replace")

    try:
        # Best-effort: stop any active streaming cleanly; a failure here must
        # not block the update, but it is logged for diagnosis.
        try:
            await _stop_all()
        except Exception as e:
            log.warning("Ignoring error while stopping streams before update: %s", e, exc_info=True)

        # server -> auracast -> src -> bumble-auracast (3 levels up)
        project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))

        # 1. Fetch and checkout the latest tag from the main branch
        await _run_checked(
            ("git", "fetch", "--tags", "origin", "main"),
            project_root, "git fetch failed", "git fetch failed",
        )
        describe_output = await _run_checked(
            ("git", "describe", "--tags", "--abbrev=0", "origin/main"),
            project_root, "git describe failed", "Failed to get latest tag",
        )
        latest_tag = describe_output.strip()
        if not latest_tag:
            # Mirror /check_update: an empty result means there is nothing to check out.
            log.error("git describe returned no tag")
            raise HTTPException(status_code=500, detail="Failed to get latest tag: No tags found on main branch")
        log.info("Updating to tag: %s", latest_tag)
        await _run_checked(
            ("git", "checkout", latest_tag),
            project_root, "git checkout failed", "git checkout failed",
        )

        # 2. Run poetry install (use full path as poetry is in user's ~/.local/bin)
        poetry_path = os.path.expanduser("~/.local/bin/poetry")
        await _run_checked(
            (poetry_path, "install"),
            project_root, "poetry install failed", "poetry install failed",
        )

        # 3. Restart services via the update script.
        # The script is never awaited and its output is never read, so its
        # streams go to DEVNULL: with PIPE the script would block as soon as
        # it filled the pipe buffer.
        update_script = os.path.join(project_root, 'src', 'service', 'update_and_run_server_and_frontend.sh')
        await asyncio.create_subprocess_exec(
            "bash", update_script,
            cwd=project_root,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        # Don't wait for completion as we'll be restarted
        await asyncio.sleep(0.5)
        return {"status": "updating", "tag": latest_tag}
    except HTTPException:
        raise
    except Exception as e:
        log.error("Exception in /system_update: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
# Recording functionality
# Recordings are stored in a 'recordings' directory next to this module; the
# directory is created at import time if it does not exist yet.
RECORDINGS_DIR = os.path.join(os.path.dirname(__file__), 'recordings')
os.makedirs(RECORDINGS_DIR, exist_ok=True)
def cleanup_old_recordings(keep_latest: str = None):
    """Prune the recordings directory down to a single ``.wav`` file.

    Args:
        keep_latest: File name (relative to ``RECORDINGS_DIR``) to preserve.
            When it is falsy or does not exist, the most recently modified
            ``.wav`` file is preserved instead.

    Only regular files ending in ``.wav`` are considered. Errors never
    propagate: a failed deletion is logged per file, any other failure is
    logged once and ends the cleanup.
    """
    try:
        wav_files = [
            (entry, os.path.getmtime(os.path.join(RECORDINGS_DIR, entry)))
            for entry in os.listdir(RECORDINGS_DIR)
            if entry.endswith('.wav') and os.path.isfile(os.path.join(RECORDINGS_DIR, entry))
        ]
        # Newest first, by modification time.
        newest_first = sorted(wav_files, key=lambda item: item[1], reverse=True)

        if keep_latest and os.path.exists(os.path.join(RECORDINGS_DIR, keep_latest)):
            survivor = keep_latest
        elif newest_first:
            survivor = newest_first[0][0]
        else:
            survivor = None

        for name, _mtime in newest_first:
            if name == survivor:
                continue
            try:
                os.remove(os.path.join(RECORDINGS_DIR, name))
                log.info("Deleted old recording: %s", name)
            except Exception as exc:
                log.warning("Failed to delete recording %s: %s", name, exc)
    except Exception as exc:
        log.warning("Error during recording cleanup: %s", exc)
@app.get("/alsa_devices")
async def get_alsa_devices():
    """List every sounddevice entry that has at least one input channel.

    Returns ``{"devices": [{"id", "name", "max_input_channels"}, ...]}``. When
    a device named ``dante_asrc_shared6`` is among them, six additional virtual
    mono entries ``dante_asrc_ch1`` .. ``dante_asrc_ch6`` are appended, each
    carrying ``parent_device`` / ``parent_id`` of the shared device. Any
    failure is logged and reported as HTTP 500.
    """
    try:
        capture_devices = [
            {
                'id': index,
                'name': info['name'],
                'max_input_channels': info['max_input_channels'],
            }
            for index, info in enumerate(sd.query_devices())
            if info.get('max_input_channels', 0) > 0
        ]
        # First capture device carrying the shared Dante name, if any.
        shared = next(
            (entry for entry in capture_devices if entry['name'] == 'dante_asrc_shared6'),
            None,
        )
        if shared:
            # Expose the individual Dante ASRC channels (ch1 to ch6) as virtual devices.
            capture_devices.extend(
                {
                    'id': f"dante_asrc_ch{channel}",
                    'name': f'dante_asrc_ch{channel}',
                    'max_input_channels': 1,
                    'parent_device': shared['name'],
                    'parent_id': shared['id'],
                }
                for channel in range(1, 7)
            )
        return {"devices": capture_devices}
    except Exception as exc:
        log.error("Exception in /alsa_devices: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/start_recording")
async def start_recording(request: dict):
    """Record 5 seconds from an ALSA device with ``arecord`` and keep only that file.

    Args:
        request: JSON body; ``request["device"]`` is the ALSA device name that
            is passed to ``arecord -D``.

    Returns:
        ``{"success": True, "filename": <name>}``; the file is written to
        ``RECORDINGS_DIR`` and every other recording is removed afterwards.

    Raises:
        HTTPException: 400 when ``device`` is missing or not a string; 500 when
            ``arecord`` fails, the output file is missing or empty, or on any
            unexpected error.
    """
    try:
        device_name = request.get('device')
        if not device_name:
            raise HTTPException(status_code=400, detail="Device name is required")
        if not isinstance(device_name, str):
            # A non-string (e.g. a numeric device id) would otherwise fail
            # later while building the command and surface as a 500.
            raise HTTPException(status_code=400, detail="Device name must be a string")
        # Generate filename with timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"recording_{timestamp}.wav"
        filepath = os.path.join(RECORDINGS_DIR, filename)
        # Use the channel count sounddevice reports for a device with this
        # exact name; fall back to mono when it is unknown or the query fails.
        channels = 1
        try:
            for dev in sd.query_devices():
                if dev['name'] == device_name and dev.get('max_input_channels', 0) > 0:
                    channels = dev.get('max_input_channels', 1)
                    break
        except Exception:
            pass
        # Build arecord command
        cmd = [
            "arecord",
            "-D", device_name,    # Use the device name directly
            "-f", "cd",           # CD quality (16-bit little-endian, 44100 Hz)
            "-c", str(channels),  # Channel count
            "-d", "5",            # Duration in seconds
            "-t", "wav",          # WAV format
            filepath,
        ]
        log.info("Starting recording with command: %s", " ".join(cmd))
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await proc.communicate()
        if proc.returncode != 0:
            error_msg = stderr.decode(errors="ignore").strip() if stderr else "Unknown error"
            log.error("Recording failed: %s", error_msg)
            raise HTTPException(status_code=500, detail=f"Recording failed: {error_msg}")
        # Verify file was created and has content
        if not os.path.exists(filepath) or os.path.getsize(filepath) == 0:
            raise HTTPException(status_code=500, detail="Recording file was not created or is empty")
        # Clean up old recordings, keeping only this new one
        cleanup_old_recordings(keep_latest=filename)
        log.info("Recording completed successfully: %s", filename)
        return {"success": True, "filename": filename}
    except HTTPException:
        raise
    except Exception as e:
        log.error("Exception in /start_recording: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/download_recording/{filename}")
async def download_recording(filename: str):
    """Serve a recorded WAV file from ``RECORDINGS_DIR``.

    Args:
        filename: Bare file name; must end in ``.wav`` and contain no path
            separators.

    Returns:
        A ``FileResponse`` with media type ``audio/wav``.

    Raises:
        HTTPException: 400 for an invalid name, 404 when no regular file with
            that name exists, 500 on any unexpected error.
    """
    try:
        # Validate filename to prevent directory traversal
        if not filename.endswith('.wav') or '/' in filename or '\\' in filename:
            raise HTTPException(status_code=400, detail="Invalid filename")
        filepath = os.path.join(RECORDINGS_DIR, filename)
        # isfile (not exists): a directory that happens to be named '*.wav'
        # must yield a 404 instead of reaching FileResponse.
        if not os.path.isfile(filepath):
            raise HTTPException(status_code=404, detail="Recording file not found")
        return FileResponse(filepath, filename=filename, media_type="audio/wav")
    except HTTPException:
        raise
    except Exception as e:
        log.error("Exception in /download_recording: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.delete("/delete_recordings")
async def delete_recordings():
    """Remove every regular file found in ``RECORDINGS_DIR``.

    Files are removed regardless of extension; subdirectories are left alone.
    A file that cannot be removed is logged and skipped. Returns
    ``{"success": True, "deleted_count": <n>}``; a failure to list the
    directory is logged and reported as HTTP 500.
    """
    try:
        removed = 0
        for entry in os.listdir(RECORDINGS_DIR):
            target = os.path.join(RECORDINGS_DIR, entry)
            if not os.path.isfile(target):
                continue
            try:
                os.remove(target)
            except Exception as exc:
                log.warning("Failed to delete %s: %s", entry, exc)
            else:
                removed += 1
                log.info("Deleted recording: %s", entry)
        log.info("Deleted %d recordings", removed)
        return {"success": True, "deleted_count": removed}
    except Exception as exc:
        log.error("Exception in /delete_recordings: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/network_info")
async def get_network_info():
    """Report IPv4 address and addressing method for ``eth0`` and ``eth1``.

    All data comes from ``nmcli`` in terse mode. For each interface the
    response contains ``ip_address`` (``"N/A"`` when none was found),
    ``method`` (``"auto"`` unless the connection profile says otherwise),
    ``is_dhcp`` (``method == "auto"``) and ``connection_name`` (``None`` when
    no profile is bound to the device). A fixed ``port_mapping`` of
    port1/port2 to eth0/eth1 is returned alongside. Unexpected errors are
    logged and reported as HTTP 500.
    """

    async def nmcli_lines(*args):
        """Run ``nmcli`` with *args*; return stdout lines, or None on a non-zero exit."""
        proc = await asyncio.create_subprocess_exec(
            "nmcli", *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        out, _ = await proc.communicate()
        if proc.returncode != 0:
            return None
        return out.decode().strip().split('\n')

    try:
        ethernet_ports = ["eth0", "eth1"]

        # Map each known device to the connection profile bound to it.
        profile_by_device = {}
        listing = await nmcli_lines("-t", "-f", "NAME,DEVICE", "connection", "show")
        for row in listing or []:
            fields = row.split(':')
            if row and len(fields) >= 2 and fields[1] in ethernet_ports:
                profile_by_device[fields[1]] = fields[0]

        report = {}
        for port in ethernet_ports:
            address = None
            shown = await nmcli_lines("-t", "-f", "IP4.ADDRESS", "device", "show", port)
            for row in shown or []:
                if not row.startswith('IP4.ADDRESS'):
                    continue
                pieces = row.split(':')
                if len(pieces) < 2:
                    continue
                address = pieces[1].split('/')[0]
                # Stop at the first address outside 169.254.0.0/16; a
                # link-local address is kept only if nothing else follows.
                if not address.startswith('169.254.'):
                    break

            ipv4_method = "auto"
            profile = profile_by_device.get(port)
            if profile:
                profile_rows = await nmcli_lines("-t", "-f", "ipv4.method", "connection", "show", profile)
                for row in profile_rows or []:
                    if row.startswith('ipv4.method:'):
                        ipv4_method = row.split(':')[1]
                        break

            report[port] = {
                "ip_address": address or "N/A",
                "is_dhcp": ipv4_method == "auto",
                "method": ipv4_method,
                "connection_name": profile,
            }

        return {
            "interfaces": report,
            "port_mapping": {"port1": "eth0", "port2": "eth1"},
        }
    except HTTPException:
        raise
    except Exception as exc:
        log.error("Exception in /network_info: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/set_network_config")
async def set_network_config(config: dict):
    """Set network configuration (DHCP or Static IP) for a specific interface.

    Expected payload:
    {
        "interface": "eth0",
        "is_dhcp": true/false,
        "ip_address": "192.168.1.100" (required if is_dhcp is false),
        "netmask": "24" (optional, defaults to 24; a dotted netmask such as
                   "255.255.255.0" is accepted and converted to a prefix length)
    }

    All input is validated before any ``nmcli`` command that changes state is
    run. If no connection profile is bound to the interface, one named
    ``Wired connection <interface>`` is created. A failing ``nmcli con up`` is
    only logged, since it may simply mean that no cable is attached.

    Returns:
        ``{"status": "success", "message": ...}``.

    Raises:
        HTTPException: 400 for a missing interface or an invalid/missing static
            address or netmask; 500 when an ``nmcli`` step fails or on any
            unexpected error.
    """
    import ipaddress

    async def run(*cmd):
        """Run *cmd* to completion and return ``(returncode, stdout, stderr)``."""
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        out, err = await proc.communicate()
        return proc.returncode, out, err

    try:
        interface = config.get("interface")
        is_dhcp = config.get("is_dhcp", True)
        ip_address = config.get("ip_address")
        netmask = config.get("netmask", "24")
        if netmask is None:
            netmask = "24"
        if not interface:
            raise HTTPException(status_code=400, detail="Interface name is required")

        # Validate the static configuration up-front so that a bad request
        # cannot leave a freshly created connection profile behind.
        static_address = None
        if not is_dhcp:
            if not ip_address:
                raise HTTPException(status_code=400, detail="IP address is required for static configuration")
            if not isinstance(ip_address, str):
                # ipaddress would also accept plain integers; only dotted strings are valid here.
                raise HTTPException(status_code=400, detail="Invalid IP address format")
            try:
                parsed_ip = ipaddress.IPv4Address(ip_address)
            except ValueError:
                raise HTTPException(status_code=400, detail="Invalid IP address format")
            try:
                parsed_interface = ipaddress.IPv4Interface(f"{parsed_ip}/{netmask}")
            except ValueError:
                raise HTTPException(status_code=400, detail="Invalid netmask")
            static_address = f"{parsed_interface.ip}/{parsed_interface.network.prefixlen}"

        # Find the connection profile currently bound to the interface.
        returncode, stdout, _ = await run("nmcli", "-t", "-f", "NAME,DEVICE", "connection", "show")
        if returncode != 0:
            raise HTTPException(status_code=500, detail="Failed to get network connections")
        connection_name = None
        for line in stdout.decode().strip().split('\n'):
            if not line:
                continue
            parts = line.split(':')
            if len(parts) >= 2 and parts[1] == interface:
                connection_name = parts[0]
                break

        if not connection_name:
            log.info(f"No connection found for {interface}, creating new connection")
            connection_name = f"Wired connection {interface}"
            returncode, _, stderr = await run(
                "sudo", "nmcli", "con", "add", "type", "ethernet",
                "ifname", interface, "con-name", connection_name,
            )
            if returncode != 0:
                raise HTTPException(
                    status_code=500,
                    detail=f"Failed to create connection for {interface}: {stderr.decode(errors='replace')}",
                )

        if is_dhcp:
            returncode, _, _ = await run(
                "sudo", "nmcli", "con", "modify", connection_name, "ipv4.method", "auto",
            )
            if returncode != 0:
                raise HTTPException(status_code=500, detail="Failed to set DHCP mode")
            # Drop any leftover static address; a failure here is not fatal
            # but is logged instead of being ignored silently.
            returncode, _, stderr = await run(
                "sudo", "nmcli", "con", "modify", connection_name, "ipv4.addresses", "",
            )
            if returncode != 0:
                log.warning(
                    "Failed to clear static addresses on %s: %s",
                    connection_name, stderr.decode(errors="replace"),
                )
        else:
            returncode, _, _ = await run(
                "sudo", "nmcli", "con", "modify", connection_name,
                "ipv4.method", "manual",
                "ipv4.addresses", static_address,
            )
            if returncode != 0:
                raise HTTPException(status_code=500, detail="Failed to set static IP")

        returncode, _, stderr = await run("sudo", "nmcli", "con", "up", connection_name)
        if returncode != 0:
            log.info(
                "Connection activation returned non-zero (may be expected if no cable): %s",
                stderr.decode(errors="replace"),
            )
        return {"status": "success", "message": f"Network configuration updated for {interface}"}
    except HTTPException:
        raise
    except Exception as e:
        log.error("Exception in /set_network_config: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
if __name__ == '__main__':
    import uvicorn

    # Resolve to an absolute path first: with a bare relative __file__ the
    # dirname is '' and os.chdir('') raises FileNotFoundError.
    os.chdir(os.path.dirname(os.path.abspath(__file__)))

    # For debug log level: export LOG_LEVEL=DEBUG. Level names are upper-cased
    # because logging rejects e.g. 'debug' with a ValueError.
    env_level = os.environ.get('LOG_LEVEL')
    log.basicConfig(
        level=env_level.strip().upper() if env_level else log.INFO,
        format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
    )
    # Bind to localhost only for security: prevents network access, only frontend on same machine can connect
    uvicorn.run(app, host="127.0.0.1", port=5000, access_log=False)