""" the main server where our multicaster objects live. TODO: in the future the multicaster objects should run in their own threads or even make a second server since everything thats blocking the main event loop leads to inceased latency. """ import os import re import logging as log import json from datetime import datetime import asyncio import random import subprocess import threading from dotenv import load_dotenv from fastapi import FastAPI, HTTPException from fastapi.responses import FileResponse from fastapi.middleware.cors import CORSMiddleware from auracast import multicast_control, auracast_config import sounddevice as sd # type: ignore import traceback from auracast.utils.sounddevice_utils import ( get_network_pw_inputs, get_alsa_usb_inputs, resolve_input_device_index, refresh_pw_cache, devices_by_backend, ) load_dotenv() # Blue LED on GPIO pin 12 (BCM) – turns on while transmitting LED_PIN = 12 try: import RPi.GPIO as _GPIO _GPIO.setmode(_GPIO.BCM) _GPIO.setup(LED_PIN, _GPIO.OUT) _GPIO_AVAILABLE = True except Exception: _GPIO_AVAILABLE = False _GPIO = None # type: ignore _LED_ENABLED: bool = True # toggled via /set_led_enabled _LED_SETTINGS_FILE = os.path.join(os.path.dirname(__file__), 'led_settings.json') def _load_led_settings() -> None: global _LED_ENABLED try: if os.path.exists(_LED_SETTINGS_FILE): with open(_LED_SETTINGS_FILE, 'r', encoding='utf-8') as f: data = json.load(f) _LED_ENABLED = bool(data.get('led_enabled', True)) except Exception: _LED_ENABLED = True def _save_led_settings() -> None: try: os.makedirs(os.path.dirname(_LED_SETTINGS_FILE), exist_ok=True) with open(_LED_SETTINGS_FILE, 'w', encoding='utf-8') as f: json.dump({'led_enabled': _LED_ENABLED}, f) except Exception: pass def _led_on(): if _GPIO_AVAILABLE and _LED_ENABLED: try: _GPIO.output(LED_PIN, _GPIO.LOW) except Exception: pass def _led_off(): if _GPIO_AVAILABLE: try: _GPIO.output(LED_PIN, _GPIO.HIGH) except Exception: pass # Configure bumble debug logging # log.getLogger('bumble').setLevel(log.DEBUG) # make sure pipewire sets latency # Primary and secondary persisted settings files STREAM_SETTINGS_FILE1 = os.path.join(os.path.dirname(__file__), 'stream_settings.json') STREAM_SETTINGS_FILE2 = os.path.join(os.path.dirname(__file__), 'stream_settings2.json') CA_CERT_PATH = os.path.join(os.path.dirname(__file__), 'certs', 'ca', 'ca_cert.pem') # Raspberry Pi UART transports TRANSPORT1 = os.getenv('TRANSPORT1', 'serial:/dev/ttyAMA3,1000000,rtscts') # transport for raspberry pi gpio header TRANSPORT2 = os.getenv('TRANSPORT2', 'serial:/dev/ttyAMA4,1000000,rtscts') # transport for raspberry pi gpio header os.environ["PULSE_LATENCY_MSEC"] = "3" # Defaults from the AuracastBigConfig model, used to detect whether random_address/id # were explicitly set or are still at their model default values. _DEFAULT_BIG = auracast_config.AuracastBigConfig() DEFAULT_BIG_ID = _DEFAULT_BIG.id DEFAULT_RANDOM_ADDRESS = _DEFAULT_BIG.random_address # QoS presets mapping - must match frontend QOS_PRESET_MAP = { "Fast": auracast_config.AuracastQosFast(), "Robust": auracast_config.AuracastQosRobust(), } # In-memory caches to avoid disk I/O on hot paths like /status SETTINGS_CACHE1: dict = {} SETTINGS_CACHE2: dict = {} def get_device_index_by_name(name: str): """Return the device index for a given device name, or None if not found. Queries the current sounddevice list directly (no cache). """ try: devs = sd.query_devices() for idx, d in enumerate(devs): if d.get("name") == name and d.get("max_input_channels", 0) > 0: return idx except Exception: pass return None def _init_settings_cache_from_disk() -> None: """Populate SETTINGS_CACHE1 and SETTINGS_CACHE2 once from disk at startup. If a file doesn't exist, initialize to an empty dict. Any JSON or I/O errors raise. """ global SETTINGS_CACHE1, SETTINGS_CACHE2 if os.path.exists(STREAM_SETTINGS_FILE1): with open(STREAM_SETTINGS_FILE1, 'r', encoding='utf-8') as f: SETTINGS_CACHE1 = json.load(f) else: SETTINGS_CACHE1 = {} if os.path.exists(STREAM_SETTINGS_FILE2): with open(STREAM_SETTINGS_FILE2, 'r', encoding='utf-8') as f: SETTINGS_CACHE2 = json.load(f) else: SETTINGS_CACHE2 = {} def load_stream_settings() -> dict: """Return PRIMARY stream settings from in-memory cache. Hydrated once at startup and updated by save_stream_settings(). No disk I/O occurs here. """ global SETTINGS_CACHE1 return SETTINGS_CACHE1 def load_stream_settings2() -> dict: """Return SECONDARY stream settings from in-memory cache.""" global SETTINGS_CACHE2 return SETTINGS_CACHE2 def save_stream_settings(settings: dict): """Update PRIMARY in-memory settings cache and persist to disk.""" global SETTINGS_CACHE1 SETTINGS_CACHE1 = dict(settings) os.makedirs(os.path.dirname(STREAM_SETTINGS_FILE1), exist_ok=True) with open(STREAM_SETTINGS_FILE1, 'w', encoding='utf-8') as f: json.dump(SETTINGS_CACHE1, f, indent=2) f.flush() os.fsync(f.fileno()) log.info("Saved primary settings to %s", STREAM_SETTINGS_FILE1) def save_stream_settings2(settings: dict): """Update SECONDARY in-memory settings cache and persist to disk.""" global SETTINGS_CACHE2 SETTINGS_CACHE2 = dict(settings) os.makedirs(os.path.dirname(STREAM_SETTINGS_FILE2), exist_ok=True) with open(STREAM_SETTINGS_FILE2, 'w', encoding='utf-8') as f: json.dump(SETTINGS_CACHE2, f, indent=2) f.flush() os.fsync(f.fileno()) log.info("Saved secondary settings to %s", STREAM_SETTINGS_FILE2) def save_settings(persisted: dict, secondary: bool = False) -> None: """Attach timestamp and persist using the appropriate cache/file.""" persisted = dict(persisted) persisted['timestamp'] = datetime.utcnow().isoformat() if secondary: save_stream_settings2(persisted) else: save_stream_settings(persisted) def gen_random_add() -> str: return ':'.join(['%02X' % random.randint(0, 255) for _ in range(6)]) def gen_random_broadcast_id() -> int: """Generate a random 24-bit Broadcast ID (1..0xFFFFFF).""" return random.randint(1, 0xFFFFFF) app = FastAPI() # Allow CORS for frontend on localhost app.add_middleware( CORSMiddleware, allow_origins=["*"], # You can restrict this to ["http://localhost:8501"] if you want allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Initialize global configuration global_config_group = auracast_config.AuracastConfigGroup() # Module-level state replacing StreamerWorker multicaster1: multicast_control.Multicaster | None = None multicaster2: multicast_control.Multicaster | None = None _stream_lock = asyncio.Lock() # serialize initialize/stop_audio on API side # BLE / audio event loop – set in __main__ before uvicorn starts. # All coroutines that touch Bumble objects or the audio pipeline MUST run # on this loop. HTTP handlers call _on_ble_loop() to cross into it. _ble_loop: asyncio.AbstractEventLoop | None = None async def _on_ble_loop(coro): """Submit *coro* to the BLE event loop and await the result. Called from uvicorn's event loop. Bridges HTTP handler coroutines into the isolated BLE loop so that serial I/O (serial_asyncio / HCI) and the audio pipeline are never preempted by HTTP accept/read/write callbacks. asyncio.run_coroutine_threadsafe() schedules the coroutine on _ble_loop (thread-safe), returning a concurrent.futures.Future. asyncio.wrap_future() adapts that into an asyncio.Future so the caller can simply `await` it inside uvicorn's loop. """ assert _ble_loop is not None, "BLE loop not yet initialised" future = asyncio.run_coroutine_threadsafe(coro, _ble_loop) return await asyncio.wrap_future(future) async def _init_i2c_on_startup() -> None: # Ensure i2c-dev kernel module is loaded (required for /dev/i2c-* access) try: proc = await asyncio.create_subprocess_exec( "sudo", "modprobe", "i2c-dev", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await proc.communicate() if proc.returncode != 0: log.warning( "modprobe i2c-dev failed (rc=%s): %s", proc.returncode, (stderr or b"").decode(errors="ignore").strip(), ) else: log.info("i2c-dev module loaded successfully") except Exception as e: log.warning("Exception running modprobe i2c-dev: %s", e, exc_info=True) # Table of (register, expected_value) dev_add = "0x4a" reg_table = [ ("0x00", "0x00"), ("0x06", "0x10"), ("0x07", "0x10"), ] for reg, expected in reg_table: write_cmd = ["i2cset", "-f", "-y", "1", dev_add, reg, expected] try: proc = await asyncio.create_subprocess_exec( *write_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await proc.communicate() if proc.returncode != 0: log.warning( "i2cset failed (%s): rc=%s stderr=%s", " ".join(write_cmd), proc.returncode, (stderr or b"").decode(errors="ignore").strip(), ) # If the write failed, skip verification for this register continue except Exception as e: log.warning("Exception running i2cset (%s): %s", " ".join(write_cmd), e, exc_info=True) continue # Verify configured register with i2cget read_cmd = ["i2cget", "-f", "-y", "1", dev_add, reg] try: proc = await asyncio.create_subprocess_exec( *read_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await proc.communicate() if proc.returncode != 0: log.warning( "i2cget failed (%s): rc=%s stderr=%s", " ".join(read_cmd), proc.returncode, (stderr or b"").decode(errors="ignore").strip(), ) continue value = (stdout or b"").decode(errors="ignore").strip() if value != expected: log.error( "I2C register verify failed: addr=0x4a reg=%s expected=%s got=%s", reg, expected, value, ) else: log.info( "I2C register verified: addr=0x4a reg=%s value=%s", reg, value, ) except Exception as e: log.warning("Exception running i2cget (%s): %s", " ".join(read_cmd), e, exc_info=True) async def _set_adc_level(gain_db_left: float = 0.0, gain_db_right: float = 0.0) -> None: """Set ADC mixer gain in dB for left and right channels independently. Runs: amixer -c 2 sset ADC {gain_db_left}dB,{gain_db_right}dB Args: gain_db_left: Left channel gain in dB (-12 to 18), default 0 gain_db_right: Right channel gain in dB (-12 to 18), default 0 """ gain_db_left = max(-12.0, min(18.0, gain_db_left)) gain_db_right = max(-12.0, min(18.0, gain_db_right)) cmd = ["amixer", "-c", "2", "sset", "ADC", "--", f"{int(gain_db_left)}dB,{int(gain_db_right)}dB"] try: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await proc.communicate() if proc.returncode != 0: log.error( "amixer ADC level command failed (rc=%s): %s", proc.returncode, (stderr or b"" ).decode(errors="ignore").strip(), ) else: log.info("amixer ADC level set successfully: %s", (stdout or b"" ).decode(errors="ignore").strip()) read_proc = await asyncio.create_subprocess_exec( "amixer", "-c", "2", "sget", "ADC", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) read_stdout, read_stderr = await read_proc.communicate() if read_proc.returncode != 0: log.error( "amixer ADC sget failed (rc=%s): %s", read_proc.returncode, (read_stderr or b"").decode(errors="ignore").strip(), ) else: sget_output = (read_stdout or b"").decode(errors="ignore") actual = {} for line in sget_output.splitlines(): for ch_key, ch_name in (("left", "Front Left"), ("right", "Front Right")): if ch_name in line: m = re.search(r'\[(-?\d+(?:\.\d+)?)dB\]', line) if m: actual[ch_key] = round(float(m.group(1))) expected_left = int(gain_db_left) expected_right = int(gain_db_right) if actual.get("left") != expected_left or actual.get("right") != expected_right: mismatch = ( f"ADC level mismatch after set: expected L={expected_left}dB R={expected_right}dB, " f"got L={actual.get('left')}dB R={actual.get('right')}dB" ) log.error(mismatch) else: log.info("ADC level set successfully: L=%sdB R=%sdB", expected_left, expected_right) except Exception as e: log.error("Exception running amixer ADC level command: %s", e, exc_info=True) async def _stop_all() -> bool: global multicaster1, multicaster2 was_running = False if multicaster1 is not None: try: await multicaster1.stop_streaming() await multicaster1.shutdown() was_running = True finally: multicaster1 = None if multicaster2 is not None: try: await multicaster2.stop_streaming() await multicaster2.shutdown() was_running = True finally: multicaster2 = None _led_off() return was_running async def _status_primary() -> dict: if multicaster1 is None: return {'is_initialized': False, 'is_streaming': False} return multicaster1.get_status() async def _status_secondary() -> dict: """Return runtime status for the SECONDARY multicaster. Mirrors _status_primary but for multicaster2 so that /status can expose both primary and secondary state to the frontend. """ if multicaster2 is None: return {'is_initialized': False, 'is_streaming': False} return multicaster2.get_status() async def _stream_lc3(audio_data: dict[str, str], bigs_template: list) -> None: if multicaster1 is None: raise HTTPException(status_code=500, detail='Auracast endpoint was never intialized') for big in bigs_template: if big.language not in audio_data: raise HTTPException(status_code=500, detail='language len missmatch') big.audio_source = audio_data[big.language].encode('latin-1') multicaster1.big_conf = bigs_template await multicaster1.start_streaming() def _resolve_qos_preset_name(qos_config) -> str: """Resolve qos_config to preset name based on retransmission count.""" if qos_config is None: return "Fast" rtn = getattr(qos_config, 'number_of_retransmissions', 2) # Fast has 2 retransmissions, Robust has 4 return "Robust" if rtn >= 4 else "Fast" async def init_radio(transport: str, conf: auracast_config.AuracastConfigGroup, current_mc: multicast_control.Multicaster | None): try: log.info('Initializing multicaster with transport %s and config:\n %s', transport, conf.model_dump_json(indent=2)) if current_mc is not None: await current_mc.shutdown() current_mc = None conf.transport = transport first_source = conf.bigs[0].audio_source if conf.bigs else '' input_device_name = None audio_mode_persist = 'Demo' # Capture original per-BIG device names before transformation original_input_devices = [ big.audio_source.split(':', 1)[1] if (isinstance(big.audio_source, str) and big.audio_source.startswith('device:')) else None for big in conf.bigs ] if any(isinstance(b.audio_source, str) and b.audio_source.startswith('device:') for b in conf.bigs): if isinstance(first_source, str) and first_source.startswith('device:'): input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()} net_names = {d.get('name') for _, d in get_network_pw_inputs()} dante_channels = {"dante_asrc_ch1", "dante_asrc_ch2", "dante_asrc_ch3", "dante_asrc_ch4", "dante_asrc_ch5", "dante_asrc_ch6"} if input_device_name in ('ch1', 'ch2'): audio_mode_persist = 'Analog' # Set ADC gain level for analog mode analog_gain_db_left = getattr(conf, 'analog_gain_db_left', 0.0) analog_gain_db_right = getattr(conf, 'analog_gain_db_right', 0.0) await _set_adc_level(analog_gain_db_left, analog_gain_db_right) elif input_device_name in dante_channels: audio_mode_persist = 'Network - Dante' else: audio_mode_persist = 'Network' if (input_device_name in net_names) else 'USB' # Configure each BIG independently so Dante multi-stream can select different channels. for big in conf.bigs: if not (isinstance(big.audio_source, str) and big.audio_source.startswith('device:')): continue sel = big.audio_source.split(':', 1)[1] if ':' in big.audio_source else None # IMPORTANT: All hardware capture is at 48kHz; LC3 encoder may downsample. hardware_capture_rate = 48000 if sel in dante_channels: # Use ALSA directly (PortAudio doesn't enumerate route PCMs on some systems). big.audio_source = f'alsa:{sel}' big.input_format = f"int16le,{hardware_capture_rate},1" continue # Dante stereo devices: dante_stereo_X_Y (e.g., dante_stereo_1_2) if sel and sel.startswith('dante_stereo_'): is_stereo = getattr(big, 'num_bis', 1) == 2 if is_stereo: # Stereo mode: use the stereo ALSA device with 2 channels big.audio_source = f'alsa:{sel}' big.input_format = f"int16le,{hardware_capture_rate},2" log.info("Configured Dante stereo input: using ALSA %s with 2 channels", sel) else: # Fallback to mono if num_bis != 2 (shouldn't happen) big.audio_source = f'alsa:{sel}' big.input_format = f"int16le,{hardware_capture_rate},2" log.warning("Dante stereo device %s used but num_bis=%d, capturing as stereo anyway", sel, getattr(big, 'num_bis', 1)) continue if sel in ('ch1', 'ch2'): # Analog channels: check if this should be stereo based on num_bis is_stereo = getattr(big, 'num_bis', 1) == 2 if is_stereo and sel == 'ch1': # Stereo mode: use ALSA directly to capture both channels from hardware # ch1=left (channel 0), ch2=right (channel 1) big.audio_source = 'device:hw:2' big.input_format = f"int16le,{hardware_capture_rate},2" log.info("Configured analog stereo input: using ALSA hw:CARD=i2s,DEV=0 with ch1=left, ch2=right") elif is_stereo and sel == 'ch2': # Skip ch2 in stereo mode as it's already captured as part of stereo pair continue else: # Mono mode: use dsnoop virtual device directly (ch1=left, ch2=right) big.audio_source = f'device:{sel}' big.input_format = f"int16le,{hardware_capture_rate},1" continue if sel and sel.isdigit(): device_index = int(sel) else: device_index = resolve_input_device_index(sel or '') if device_index is None: raise HTTPException(status_code=400, detail=f"Audio device '{sel}' not found.") try: resolved_devinfo = sd.query_devices(device_index) log.info( "Resolved input device '%s' -> idx=%s name='%s' hostapi=%s max_in=%s", sel, device_index, resolved_devinfo.get('name'), resolved_devinfo.get('hostapi'), resolved_devinfo.get('max_input_channels'), ) except Exception: log.info("Resolved input device '%s' -> idx=%s (devinfo unavailable)", sel, device_index) big.audio_source = f'device:{device_index}' devinfo = sd.query_devices(device_index) max_in = int(devinfo.get('max_input_channels') or 1) channels = max(1, min(2, max_in)) big.input_format = f"int16le,{hardware_capture_rate},{channels}" # The config group keeps the target sampling rate for LC3 encoder # The audio input will capture at 48kHz and LC3 encoder will downsample target_sampling_rate = getattr(conf, 'auracast_sampling_rate_hz', None) if target_sampling_rate is None and conf.bigs: target_sampling_rate = getattr(conf.bigs[0], 'sampling_frequency', 48000) if target_sampling_rate is None: target_sampling_rate = 48000 # Keep the config group sampling rate as set by frontend conf.auracast_sampling_rate_hz = target_sampling_rate # Ensure octets_per_frame matches the target sampling rate if target_sampling_rate == 48000: conf.octets_per_frame = 120 elif target_sampling_rate == 32000: conf.octets_per_frame = 80 elif target_sampling_rate == 24000: conf.octets_per_frame = 60 elif target_sampling_rate == 16000: conf.octets_per_frame = 40 else: conf.octets_per_frame = 120 # default to 48000 setting conf.qos_config.max_transport_latency_ms = int(conf.qos_config.number_of_retransmissions) * 10 + 3 # Generate fresh random_address and broadcast ID for any BIG still at model defaults. for big in conf.bigs: if not getattr(big, 'random_address', None) or big.random_address == DEFAULT_RANDOM_ADDRESS: big.random_address = gen_random_add() if big.id == DEFAULT_BIG_ID: big.id = gen_random_broadcast_id() # Log the final, fully-updated configuration just before creating the Multicaster log.info('Final multicaster config (transport=%s):\n %s', transport, conf.model_dump_json(indent=2)) mc = multicast_control.Multicaster(conf, conf.bigs) await mc.init_broadcast() auto_started = False if any(isinstance(big.audio_source, str) and (big.audio_source.startswith("device:") or big.audio_source.startswith("alsa:") or big.audio_source.startswith("file:")) for big in conf.bigs): await mc.start_streaming() _led_on() auto_started = True demo_count = sum(1 for big in conf.bigs if isinstance(big.audio_source, str) and big.audio_source.startswith('file:')) demo_rate = int(conf.auracast_sampling_rate_hz or 0) demo_type = None demo_sources = [ str(b.audio_source) for b in conf.bigs if isinstance(b.audio_source, str) and b.audio_source.startswith('file:') ] is_demo_tone = bool(demo_sources) and all('test_tone_1k_' in src for src in demo_sources) demo_content = '1 kHz test tone' if is_demo_tone else 'Program material' if demo_count > 0 and demo_rate > 0: if demo_rate in (48000, 24000, 16000): demo_type = f"{demo_count} × {demo_rate//1000}kHz" else: demo_type = f"{demo_count} × {demo_rate}Hz" persisted = { 'channel_names': [big.name for big in conf.bigs], 'languages': [big.language for big in conf.bigs], 'audio_mode': audio_mode_persist, 'input_device': input_device_name, 'input_devices': original_input_devices, 'program_info': [getattr(big, 'program_info', None) for big in conf.bigs], 'gain': [getattr(big, 'input_gain', 1.0) for big in conf.bigs], 'auracast_sampling_rate_hz': conf.auracast_sampling_rate_hz, 'octets_per_frame': conf.octets_per_frame, 'presentation_delay_us': getattr(conf, 'presentation_delay_us', None), 'qos_preset': _resolve_qos_preset_name(conf.qos_config), 'immediate_rendering': getattr(conf, 'immediate_rendering', False), 'assisted_listening_stream': getattr(conf, 'assisted_listening_stream', False), 'analog_stereo_mode': getattr(conf.bigs[0], 'analog_stereo_mode', False) if conf.bigs else False, 'analog_gain_db_left': getattr(conf, 'analog_gain_db_left', 0.0), 'analog_gain_db_right': getattr(conf, 'analog_gain_db_right', 0.0), 'advertising_tx_power': getattr(conf, 'advertising_tx_power', 8), 'stream_password': (conf.bigs[0].code if conf.bigs and getattr(conf.bigs[0], 'code', None) else None), 'big_ids': [getattr(big, 'id', DEFAULT_BIG_ID) for big in conf.bigs], 'big_random_addresses': [getattr(big, 'random_address', DEFAULT_RANDOM_ADDRESS) for big in conf.bigs], 'demo_total_streams': demo_count, 'demo_stream_type': demo_type, 'demo_content': demo_content, 'is_streaming': auto_started, 'demo_sources': demo_sources, } return mc, persisted except HTTPException: raise except Exception as e: log.error("Exception in init_radio: %s", traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @app.post("/init") async def initialize(conf: auracast_config.AuracastConfigGroup): """Initializes the primary broadcaster on the BLE loop.""" return await _on_ble_loop(_initialize_impl(conf)) async def _initialize_impl(conf: auracast_config.AuracastConfigGroup): async with _stream_lock: global multicaster1, global_config_group mc, persisted = await init_radio(TRANSPORT1, conf, multicaster1) multicaster1 = mc global_config_group = conf save_settings(persisted, secondary=False) @app.post("/init2") async def initialize2(conf: auracast_config.AuracastConfigGroup): """Initializes the secondary broadcaster on the BLE loop.""" return await _on_ble_loop(_initialize2_impl(conf)) async def _initialize2_impl(conf: auracast_config.AuracastConfigGroup): async with _stream_lock: global multicaster2 mc, persisted = await init_radio(TRANSPORT2, conf, multicaster2) multicaster2 = mc save_settings(persisted, secondary=True) @app.post("/set_led_enabled") async def set_led_enabled(body: dict): """Enable or disable the blue status LED. Persisted across restarts.""" global _LED_ENABLED _LED_ENABLED = bool(body.get("led_enabled", True)) _save_led_settings() if not _LED_ENABLED: _led_off() return {"led_enabled": _LED_ENABLED} @app.post("/stop_audio") async def stop_audio(): """Stops streaming on both multicasters via the BLE loop.""" return await _on_ble_loop(_stop_audio_impl()) async def _stop_audio_impl(): """Runs on BLE loop: stops all streamers and persists is_streaming=False.""" try: was_running = await _stop_all() # Persist is_streaming=False for both primary and secondary try: settings1 = load_stream_settings() or {} if settings1.get('is_streaming'): settings1['is_streaming'] = False settings1['timestamp'] = datetime.utcnow().isoformat() save_stream_settings(settings1) settings2 = load_stream_settings2() or {} if settings2.get('is_streaming'): settings2['is_streaming'] = False settings2['timestamp'] = datetime.utcnow().isoformat() save_stream_settings2(settings2) except Exception: log.warning("Failed to persist is_streaming=False during stop_audio", exc_info=True) await asyncio.sleep(0.2) return {"status": "stopped", "was_running": was_running} except Exception as e: log.error("Exception in /stop_audio: %s", traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @app.post("/adc_gain") async def set_adc_gain(payload: dict): """Set ADC gain in dB for left and right channels without restarting the stream. Body: {"gain_db_left": float, "gain_db_right": float} """ try: gain_db_left = float(payload.get("gain_db_left", 0.0)) gain_db_right = float(payload.get("gain_db_right", 0.0)) await _set_adc_level(gain_db_left, gain_db_right) # Persist the new values so they survive a restart for load_fn, save_fn in [(load_stream_settings, save_stream_settings), (load_stream_settings2, save_stream_settings2)]: s = load_fn() or {} if s: s['analog_gain_db_left'] = gain_db_left s['analog_gain_db_right'] = gain_db_right save_fn(s) return {"status": "ok", "gain_db_left": gain_db_left, "gain_db_right": gain_db_right} except Exception as e: log.error("Exception in /adc_gain: %s", traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @app.post("/stream_lc3") async def send_audio(audio_data: dict[str, str]): """Sends a block of pre-coded LC3 audio via the BLE loop.""" try: await _on_ble_loop(_stream_lc3(audio_data, list(global_config_group.bigs))) return {"status": "audio_sent"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/cert") async def download_ca_cert(): """Download the CA certificate for TLS verification.""" if not os.path.exists(CA_CERT_PATH): raise HTTPException(status_code=404, detail="CA certificate not found") return FileResponse(CA_CERT_PATH, filename="ca_cert.pem", media_type="application/x-pem-file") @app.get("/status") async def get_status(): """Gets current status (worker) merged with persisted settings cache.""" primary_runtime = await _status_primary() primary_persisted = load_stream_settings() or {} # Preserve existing top-level shape for primary for compatibility status: dict = {} status.update(primary_runtime) status.update(primary_persisted) # Attach secondary block with its own runtime + persisted settings secondary_runtime = await _status_secondary() secondary_persisted = load_stream_settings2() or {} secondary: dict = {} secondary.update(secondary_runtime) secondary.update(secondary_persisted) status["secondary"] = secondary status["secondary_is_streaming"] = bool(secondary.get("is_streaming", False)) status["led_enabled"] = _LED_ENABLED return status @app.get("/audio_level") async def get_audio_level(): """Return current RMS audio levels for primary radio (lightweight, for polling).""" if multicaster1 is None: return {"levels": []} return {"levels": multicaster1.get_audio_levels()} @app.get("/audio_level2") async def get_audio_level2(): """Return current RMS audio levels for secondary radio (lightweight, for polling).""" if multicaster2 is None: return {"levels": []} return {"levels": multicaster2.get_audio_levels()} async def _autostart_from_settings(): settings1 = load_stream_settings() or {} settings2 = load_stream_settings2() or {} log.info("[AUTOSTART] Starting autostart check: primary_ts=%s secondary_ts=%s", settings1.get('timestamp'), settings2.get('timestamp')) async def do_primary(): global multicaster1, global_config_group settings = settings1 audio_mode = settings.get('audio_mode') input_device_name = settings.get('input_device') rate = settings.get('auracast_sampling_rate_hz') octets = settings.get('octets_per_frame') pres_delay = settings.get('presentation_delay_us') saved_qos_preset = settings.get('qos_preset', 'Fast') immediate_rendering = settings.get('immediate_rendering', False) assisted_listening_stream = settings.get('assisted_listening_stream', False) channel_names = settings.get('channel_names') or ["Broadcast0"] program_info = settings.get('program_info') or channel_names languages = settings.get('languages') or ["deu"] big_ids = settings.get('big_ids') or [] big_addrs = settings.get('big_random_addresses') or [] stream_password = settings.get('stream_password') tx_power = int(settings.get('advertising_tx_power', 8)) original_ts = settings.get('timestamp') previously_streaming = bool(settings.get('is_streaming')) log.info( "[AUTOSTART][PRIMARY] loaded settings: previously_streaming=%s audio_mode=%s rate=%s octets=%s pres_delay=%s qos_preset=%s immediate_rendering=%s assisted_listening_stream=%s tx_power=%+d dBm demo_sources=%s", previously_streaming, audio_mode, rate, octets, pres_delay, saved_qos_preset, immediate_rendering, assisted_listening_stream, tx_power, (settings.get('demo_sources') or []), ) if not previously_streaming: log.info("[AUTOSTART][PRIMARY] Skipping autostart: is_streaming flag was False in persisted settings") return if audio_mode == 'Demo': demo_sources = settings.get('demo_sources') or [] if not demo_sources or rate is None or octets is None: log.warning( "[AUTOSTART][PRIMARY] Demo autostart aborted: demo_sources_present=%s rate=%s octets=%s", bool(demo_sources), rate, octets, ) return bigs = [] for i, src in enumerate(demo_sources): name = channel_names[i] if i < len(channel_names) else f"Broadcast{i}" pinfo = program_info[i] if isinstance(program_info, list) and i < len(program_info) else (program_info[0] if isinstance(program_info, list) and program_info else program_info) lang = languages[i] if i < len(languages) else (languages[0] if languages else "deu") bigs.append( auracast_config.AuracastBigConfig( id=big_ids[i] if i < len(big_ids) else DEFAULT_BIG_ID, random_address=big_addrs[i] if i < len(big_addrs) else DEFAULT_RANDOM_ADDRESS, code=stream_password, name=name, program_info=pinfo, language=lang, audio_source=src, iso_que_len=1, sampling_frequency=rate, octets_per_frame=octets, ) ) log.info( "[AUTOSTART][PRIMARY] Building demo config for %d streams, rate=%s, octets=%s", len(bigs), rate, octets, ) conf = auracast_config.AuracastConfigGroup( auracast_sampling_rate_hz=rate, octets_per_frame=octets, transport=TRANSPORT1, immediate_rendering=immediate_rendering, assisted_listening_stream=assisted_listening_stream, presentation_delay_us=pres_delay if pres_delay is not None else 40000, advertising_tx_power=tx_power, bigs=bigs, ) # Set num_bis for stereo mode if needed if conf.bigs and settings.get('analog_stereo_mode', False): conf.bigs[0].num_bis = 2 conf.qos_config = QOS_PRESET_MAP.get(saved_qos_preset, QOS_PRESET_MAP["Fast"]) log.info("[AUTOSTART][PRIMARY] Scheduling demo init_radio in 2s") await asyncio.sleep(2) async with _stream_lock: log.info("[AUTOSTART][PRIMARY] Calling init_radio for demo autostart") mc, persisted = await init_radio(TRANSPORT1, conf, multicaster1) multicaster1 = mc global_config_group = conf save_settings(persisted, secondary=False) log.info("[AUTOSTART][PRIMARY] Demo autostart completed; settings persisted with is_streaming=%s", persisted.get('is_streaming')) return if not input_device_name or rate is None or octets is None: log.info( "[AUTOSTART][PRIMARY] Skipping device-based autostart: input_device=%s rate=%s octets=%s", input_device_name, rate, octets, ) return current = await _status_primary() if current.get('is_streaming'): log.info("[AUTOSTART][PRIMARY] Skipping device-based autostart: stream already running") return while True: current = await _status_primary() if current.get('is_streaming'): log.info("[AUTOSTART][PRIMARY] Aborting wait loop: stream started externally") return current_settings = load_stream_settings() or {} if current_settings.get('timestamp') != original_ts: if ( current_settings.get('input_device') != input_device_name or current_settings.get('audio_mode') != audio_mode ): log.info("[AUTOSTART][PRIMARY] Aborting wait loop: settings changed (audio_mode/input_device)") return usb = [d for _, d in get_alsa_usb_inputs()] net = [d for _, d in get_network_pw_inputs()] names = {d.get('name') for d in usb} | {d.get('name') for d in net} if input_device_name in names: log.info("[AUTOSTART][PRIMARY] Device '%s' detected, starting autostart", input_device_name) bigs = [ auracast_config.AuracastBigConfig( id=big_ids[0] if big_ids else DEFAULT_BIG_ID, random_address=big_addrs[0] if big_addrs else DEFAULT_RANDOM_ADDRESS, code=stream_password, name=channel_names[0] if channel_names else "Broadcast0", program_info=program_info[0] if isinstance(program_info, list) and program_info else program_info, language=languages[0] if languages else "deu", audio_source=f"device:{input_device_name}", iso_que_len=1, sampling_frequency=rate, octets_per_frame=octets, ) ] conf = auracast_config.AuracastConfigGroup( auracast_sampling_rate_hz=rate, octets_per_frame=octets, transport=TRANSPORT1, immediate_rendering=immediate_rendering, assisted_listening_stream=assisted_listening_stream, presentation_delay_us=pres_delay if pres_delay is not None else 40000, analog_gain_db_left=settings.get('analog_gain_db_left', 0.0), analog_gain_db_right=settings.get('analog_gain_db_right', 0.0), advertising_tx_power=tx_power, bigs=bigs, ) # Set num_bis for stereo mode if needed if conf.bigs and settings.get('analog_stereo_mode', False): conf.bigs[0].num_bis = 2 conf.qos_config = QOS_PRESET_MAP.get(saved_qos_preset, QOS_PRESET_MAP["Fast"]) log.info("[AUTOSTART][PRIMARY] Scheduling device init_radio in 2s") await asyncio.sleep(2) async with _stream_lock: log.info("[AUTOSTART][PRIMARY] Calling init_radio for device autostart") mc, persisted = await init_radio(TRANSPORT1, conf, multicaster1) multicaster1 = mc global_config_group = conf save_settings(persisted, secondary=False) log.info("[AUTOSTART][PRIMARY] Device autostart completed; settings persisted with is_streaming=%s", persisted.get('is_streaming')) return await asyncio.sleep(2) async def do_secondary(): global multicaster2 settings = settings2 audio_mode = settings.get('audio_mode') input_device_name = settings.get('input_device') rate = settings.get('auracast_sampling_rate_hz') octets = settings.get('octets_per_frame') pres_delay = settings.get('presentation_delay_us') saved_qos_preset = settings.get('qos_preset', 'Fast') immediate_rendering = settings.get('immediate_rendering', False) assisted_listening_stream = settings.get('assisted_listening_stream', False) channel_names = settings.get('channel_names') or ["Broadcast0"] program_info = settings.get('program_info') or channel_names languages = settings.get('languages') or ["deu"] big_ids = settings.get('big_ids') or [] big_addrs = settings.get('big_random_addresses') or [] stream_password = settings.get('stream_password') tx_power = int(settings.get('advertising_tx_power', 8)) original_ts = settings.get('timestamp') previously_streaming = bool(settings.get('is_streaming')) log.info( "[AUTOSTART][SECONDARY] loaded settings: previously_streaming=%s audio_mode=%s rate=%s octets=%s pres_delay=%s qos_preset=%s immediate_rendering=%s assisted_listening_stream=%s tx_power=%+d dBm demo_sources=%s", previously_streaming, audio_mode, rate, octets, pres_delay, saved_qos_preset, immediate_rendering, assisted_listening_stream, tx_power, (settings.get('demo_sources') or []), ) if not previously_streaming: log.info("[AUTOSTART][SECONDARY] Skipping autostart: is_streaming flag was False in persisted settings") return if audio_mode == 'Demo': demo_sources = settings.get('demo_sources') or [] if not demo_sources or rate is None or octets is None: log.warning( "[AUTOSTART][SECONDARY] Demo autostart aborted: demo_sources_present=%s rate=%s octets=%s", bool(demo_sources), rate, octets, ) return bigs = [] for i, src in enumerate(demo_sources): name = channel_names[i] if i < len(channel_names) else f"Broadcast{i}" pinfo = program_info[i] if isinstance(program_info, list) and i < len(program_info) else (program_info[0] if isinstance(program_info, list) and program_info else program_info) lang = languages[i] if i < len(languages) else (languages[0] if languages else "deu") bigs.append( auracast_config.AuracastBigConfig( code=stream_password, name=name, program_info=pinfo, language=lang, audio_source=src, iso_que_len=1, sampling_frequency=rate, octets_per_frame=octets, ) ) conf = auracast_config.AuracastConfigGroup( auracast_sampling_rate_hz=rate, octets_per_frame=octets, transport=TRANSPORT2, immediate_rendering=immediate_rendering, assisted_listening_stream=assisted_listening_stream, presentation_delay_us=pres_delay if pres_delay is not None else 40000, advertising_tx_power=tx_power, bigs=bigs, ) conf.qos_config = QOS_PRESET_MAP.get(saved_qos_preset, QOS_PRESET_MAP["Fast"]) log.info("[AUTOSTART][SECONDARY] Scheduling demo init_radio in 2s") await asyncio.sleep(2) async with _stream_lock: log.info("[AUTOSTART][SECONDARY] Calling init_radio for demo autostart") mc, persisted = await init_radio(TRANSPORT2, conf, multicaster2) multicaster2 = mc save_settings(persisted, secondary=True) log.info("[AUTOSTART][SECONDARY] Demo autostart completed; settings persisted with is_streaming=%s", persisted.get('is_streaming')) return if not input_device_name or rate is None or octets is None: log.info( "[AUTOSTART][SECONDARY] Skipping device-based autostart: input_device=%s rate=%s octets=%s", input_device_name, rate, octets, ) return if multicaster2 is not None: try: if multicaster2.get_status().get('is_streaming'): log.info("[AUTOSTART][SECONDARY] Skipping device-based autostart: stream already running") return except Exception: pass while True: if multicaster2 is not None: try: if multicaster2.get_status().get('is_streaming'): log.info("[AUTOSTART][SECONDARY] Aborting wait loop: stream started externally") return except Exception: pass current_settings = load_stream_settings2() or {} if current_settings.get('timestamp') != original_ts: if ( current_settings.get('input_device') != input_device_name or current_settings.get('audio_mode') != audio_mode ): return usb = [d for _, d in get_alsa_usb_inputs()] net = [d for _, d in get_network_pw_inputs()] names = {d.get('name') for d in usb} | {d.get('name') for d in net} if input_device_name in names: bigs = [ auracast_config.AuracastBigConfig( id=big_ids[0] if big_ids else DEFAULT_BIG_ID, random_address=big_addrs[0] if big_addrs else DEFAULT_RANDOM_ADDRESS, code=stream_password, name=channel_names[0] if channel_names else "Broadcast0", program_info=program_info[0] if isinstance(program_info, list) and program_info else program_info, language=languages[0] if languages else "deu", audio_source=f"device:{input_device_name}", iso_que_len=1, sampling_frequency=rate, octets_per_frame=octets, ) ] conf = auracast_config.AuracastConfigGroup( auracast_sampling_rate_hz=rate, octets_per_frame=octets, transport=TRANSPORT2, immediate_rendering=immediate_rendering, assisted_listening_stream=assisted_listening_stream, presentation_delay_us=pres_delay if pres_delay is not None else 40000, analog_gain_db_left=settings.get('analog_gain_db_left', 0.0), analog_gain_db_right=settings.get('analog_gain_db_right', 0.0), advertising_tx_power=tx_power, bigs=bigs, ) conf.qos_config = QOS_PRESET_MAP.get(saved_qos_preset, QOS_PRESET_MAP["Fast"]) log.info("[AUTOSTART][SECONDARY] Scheduling device init_radio in 2s") await asyncio.sleep(2) async with _stream_lock: log.info("[AUTOSTART][SECONDARY] Calling init_radio for device autostart") mc, persisted = await init_radio(TRANSPORT2, conf, multicaster2) multicaster2 = mc save_settings(persisted, secondary=True) log.info("[AUTOSTART][SECONDARY] Device autostart completed; settings persisted with is_streaming=%s", persisted.get('is_streaming')) return await asyncio.sleep(2) await do_primary() await do_secondary() async def _ble_startup(): """I2C init, ADC level reset, and autostart task scheduling on the BLE loop. Bridged from _startup_autostart_event() so that these async subprocess calls and the long-lived autostart coroutine all run on _ble_loop, never on uvicorn's HTTP loop. """ await _init_i2c_on_startup() await _set_adc_level(0.0, 0.0) log.info("[STARTUP] Scheduling autostart task on BLE loop") asyncio.create_task(_autostart_from_settings()) @app.on_event("startup") async def _startup_autostart_event(): # Spawn the autostart task without blocking startup log.info("[STARTUP] Auracast multicast server startup: initializing settings cache, I2C, and PipeWire cache") _led_off() # Run install_asoundconf.sh script script_path = os.path.join(os.path.dirname(__file__), '..', '..', 'misc', 'install_asoundconf.sh') try: log.info("[STARTUP] Running install_asoundconf.sh script") result = subprocess.run(['bash', script_path], capture_output=True, text=True, check=True) log.info(f"[STARTUP] install_asoundconf.sh completed: {result.stdout.strip()}") except subprocess.CalledProcessError as e: log.error(f"[STARTUP] Failed to run install_asoundconf.sh: {e.stderr.strip()}") except Exception as e: log.error(f"[STARTUP] Error running install_asoundconf.sh: {str(e)}") # Hydrate settings cache once to avoid disk I/O during /status _load_led_settings() _init_settings_cache_from_disk() refresh_pw_cache() # I2C init, ADC setup and the autostart task must run on the BLE loop so # they share the same event loop as the Bumble HCI transport. log.info("[STARTUP] Bridging I2C init and autostart to BLE loop") asyncio.run_coroutine_threadsafe(_ble_startup(), _ble_loop) @app.get("/audio_inputs_pw_usb") async def audio_inputs_pw_usb(): """List USB input devices using ALSA backend (USB is ALSA in our scheme).""" try: devices = [ {"id": idx, "name": dev.get("name"), "max_input_channels": dev.get("max_input_channels", 0)} for idx, dev in get_alsa_usb_inputs() ] return {"inputs": devices} except Exception as e: log.error("Exception in /audio_inputs_pw_usb: %s", traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @app.get("/audio_inputs_pw_network") async def audio_inputs_pw_network(): """List PipeWire Network/AES67 input nodes from cache.""" try: devices = [ {"id": idx, "name": dev.get("name"), "max_input_channels": dev.get("max_input_channels", 0)} for idx, dev in get_network_pw_inputs() ] return {"inputs": devices} except Exception as e: log.error("Exception in /audio_inputs_pw_network: %s", traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @app.get("/audio_inputs_dante") async def audio_inputs_dante(): """List Dante ALSA input devices from asound.conf.""" try: dante_channels = [ "dante_asrc_ch1", "dante_asrc_ch2", "dante_asrc_ch3", "dante_asrc_ch4", "dante_asrc_ch5", "dante_asrc_ch6", ] return { "inputs": [ {"id": name, "name": name, "max_input_channels": 1} for name in dante_channels ] } except Exception as e: log.error("Exception in /audio_inputs_dante: %s", traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @app.post("/refresh_audio_devices") async def refresh_audio_devices(): """Triggers a re-scan of audio devices, but only if no stream is active.""" try: status = await _status_primary() streaming = bool(status.get('is_streaming')) except Exception as e: log.error("Exception in /refresh_audio_devices: %s", traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) if streaming: log.warning("Ignoring refresh request: an audio stream is active.") raise HTTPException(status_code=409, detail="An audio stream is active. Stop the stream before refreshing devices.") log.info("Refreshing PipeWire device cache.") refresh_pw_cache() return {"status": "ok"} @app.post("/shutdown") async def shutdown(): """Stops broadcasting and releases all audio/Bluetooth resources.""" return await _on_ble_loop(_shutdown_impl()) async def _shutdown_impl(): try: await _stop_all() return {"status": "stopped"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/system_reboot") async def system_reboot(): """Stop audio and request a system reboot via sudo. Requires the service user to have passwordless sudo permissions to run 'reboot'. """ return await _on_ble_loop(_system_reboot_impl()) async def _system_reboot_impl(): try: # Best-effort: stop any active streaming cleanly WITHOUT persisting state try: await _stop_all() except Exception: pass # Launch reboot without waiting for completion try: await asyncio.create_subprocess_exec("sudo", "reboot") except Exception as e: log.error("Failed to invoke reboot: %s", e, exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to invoke reboot: {e}") return {"status": "rebooting"} except HTTPException: raise except Exception as e: log.error("Exception in /system_reboot: %s", e, exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @app.post("/restart_dep") async def restart_dep(): """Restart DEP via systemctl restart dep.service. Requires the service user to have passwordless sudo permissions for systemctl. """ try: log.info("Restarting DEP via systemctl...") proc = await asyncio.create_subprocess_exec( "sudo", "systemctl", "restart", "dep.service", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode == 0: log.info("DEP restarted successfully") return {"status": "success", "message": "DEP restarted successfully"} else: error_msg = stderr.decode() if stderr else "Unknown error" log.error(f"Failed to restart DEP: {error_msg}") raise HTTPException(status_code=500, detail=f"Failed to restart DEP: {error_msg}") except HTTPException: raise except Exception as e: log.error("Exception in /restart_dep: %s", e, exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @app.get("/version") async def get_version(): """Get the current software version (git tag or commit).""" try: # server -> auracast -> src -> bumble-auracast (3 levels up) project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')) # Try to get the current tag proc = await asyncio.create_subprocess_exec( "git", "describe", "--tags", "--exact-match", cwd=project_root, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode == 0: return {"version": stdout.decode().strip(), "type": "tag"} # Fallback: get the current commit short hash proc = await asyncio.create_subprocess_exec( "git", "rev-parse", "--short", "HEAD", cwd=project_root, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode == 0: return {"version": stdout.decode().strip(), "type": "commit"} return {"version": "unknown", "type": "unknown"} except Exception as e: log.error("Exception in /version: %s", e, exc_info=True) return {"version": "unknown", "type": "error"} @app.get("/check_update") async def check_update(): """Check for available updates by comparing current version with latest tag on main branch.""" try: # server -> auracast -> src -> bumble-auracast (3 levels up) project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')) # Fetch tags and main branch from origin proc = await asyncio.create_subprocess_exec( "git", "fetch", "--tags", "origin", "main", cwd=project_root, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode != 0: log.warning("git fetch failed: %s", stderr.decode()) # Get the latest tag reachable from main branch proc = await asyncio.create_subprocess_exec( "git", "describe", "--tags", "--abbrev=0", "origin/main", cwd=project_root, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode != 0: return {"available": None, "error": f"No tags found on main branch: {stderr.decode()}"} latest_tag = stdout.decode().strip() if not latest_tag: return {"available": None, "error": "No tags found on main branch"} # Get current version current_version = (await get_version()).get("version", "unknown") update_available = latest_tag != current_version return { "current": current_version, "available": latest_tag, "update_available": update_available } except Exception as e: log.error("Exception in /check_update: %s", e, exc_info=True) return {"available": None, "error": str(e)} @app.post("/system_update") async def system_update(): """Update application: git pull main branch (latest tag), poetry install, restart services.""" return await _on_ble_loop(_system_update_impl()) async def _system_update_impl(): try: # Best-effort: stop any active streaming cleanly try: await _stop_all() except Exception: pass # server -> auracast -> src -> bumble-auracast (3 levels up) project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')) # 1. Fetch and checkout the latest tag from the main branch # First fetch all tags and the main branch proc = await asyncio.create_subprocess_exec( "git", "fetch", "--tags", "origin", "main", cwd=project_root, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode != 0: log.error("git fetch failed: %s", stderr.decode()) raise HTTPException(status_code=500, detail=f"git fetch failed: {stderr.decode()}") # Get the latest tag on the main branch proc = await asyncio.create_subprocess_exec( "git", "describe", "--tags", "--abbrev=0", "origin/main", cwd=project_root, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode != 0: log.error("git describe failed: %s", stderr.decode()) raise HTTPException(status_code=500, detail=f"Failed to get latest tag: {stderr.decode()}") latest_tag = stdout.decode().strip() log.info("Updating to tag: %s", latest_tag) # Checkout the latest tag proc = await asyncio.create_subprocess_exec( "git", "checkout", latest_tag, cwd=project_root, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode != 0: log.error("git checkout failed: %s", stderr.decode()) raise HTTPException(status_code=500, detail=f"git checkout failed: {stderr.decode()}") # 2. Hand off remaining work to the (now-updated) system_update.sh script update_script = os.path.join(os.path.dirname(__file__), 'system_update.sh') log.info("Handing off to system_update.sh...") await asyncio.create_subprocess_exec( "bash", update_script, cwd=project_root, ) # Don't wait for completion as we'll be restarted await asyncio.sleep(0.5) return {"status": "updating", "tag": latest_tag} except HTTPException: raise except Exception as e: log.error("Exception in /system_update: %s", e, exc_info=True) raise HTTPException(status_code=500, detail=str(e)) # Recording functionality RECORDINGS_DIR = os.path.join(os.path.dirname(__file__), 'recordings') os.makedirs(RECORDINGS_DIR, exist_ok=True) def cleanup_old_recordings(keep_latest: str = None): """Delete all recordings except the latest one (or specified file).""" try: recordings = [] for filename in os.listdir(RECORDINGS_DIR): if filename.endswith('.wav'): filepath = os.path.join(RECORDINGS_DIR, filename) if os.path.isfile(filepath): recordings.append((filename, os.path.getmtime(filepath))) # Sort by modification time (newest first) recordings.sort(key=lambda x: x[1], reverse=True) # Keep only the latest recording (or the specified one) if keep_latest and os.path.exists(os.path.join(RECORDINGS_DIR, keep_latest)): files_to_keep = {keep_latest} else: files_to_keep = {recordings[0][0]} if recordings else set() # Delete old recordings for filename, _ in recordings: if filename not in files_to_keep: filepath = os.path.join(RECORDINGS_DIR, filename) try: os.remove(filepath) log.info("Deleted old recording: %s", filename) except Exception as e: log.warning("Failed to delete recording %s: %s", filename, e) except Exception as e: log.warning("Error during recording cleanup: %s", e) @app.get("/alsa_devices") async def get_alsa_devices(): """Get list of available ALSA input devices.""" try: devices = [] dev_list = sd.query_devices() for idx, dev in enumerate(dev_list): if dev.get('max_input_channels', 0) > 0: devices.append({ 'id': idx, 'name': dev['name'], 'max_input_channels': dev['max_input_channels'] }) # Add individual Dante ASRC channels if shared device is found dante_shared_device = None for device in devices: if device['name'] == 'dante_asrc_shared6': dante_shared_device = device break if dante_shared_device: # Add individual Dante ASRC channels as virtual devices for i in range(1, 7): # ch1 to ch6 devices.append({ 'id': f"dante_asrc_ch{i}", 'name': f'dante_asrc_ch{i}', 'max_input_channels': 1, 'parent_device': dante_shared_device['name'], 'parent_id': dante_shared_device['id'] }) return {"devices": devices} except Exception as e: log.error("Exception in /alsa_devices: %s", e, exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @app.post("/start_recording") async def start_recording(request: dict): """Start a 5-second recording from the specified ALSA device.""" try: device_name = request.get('device') if not device_name: raise HTTPException(status_code=400, detail="Device name is required") # Generate filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"recording_{timestamp}.wav" filepath = os.path.join(RECORDINGS_DIR, filename) # Determine channel count based on device type # For other devices, try to find actual channel count channels = 1 # Default to mono try: devices = sd.query_devices() for dev in devices: if dev['name'] == device_name and dev.get('max_input_channels', 0) > 0: channels = dev.get('max_input_channels', 1) break except Exception: pass # Build arecord command cmd = [ "arecord", "-D", device_name, # Use the device name directly "-f", "cd", # CD quality (16-bit little-endian, 44100 Hz) "-c", str(channels), # Channel count "-d", "5", # Duration in seconds "-t", "wav", # WAV format filepath ] log.info("Starting recording with command: %s", " ".join(cmd)) proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode != 0: error_msg = stderr.decode(errors="ignore").strip() if stderr else "Unknown error" log.error("Recording failed: %s", error_msg) raise HTTPException(status_code=500, detail=f"Recording failed: {error_msg}") # Verify file was created and has content if not os.path.exists(filepath) or os.path.getsize(filepath) == 0: raise HTTPException(status_code=500, detail="Recording file was not created or is empty") # Clean up old recordings, keeping only this new one cleanup_old_recordings(keep_latest=filename) log.info("Recording completed successfully: %s", filename) return {"success": True, "filename": filename} except HTTPException: raise except Exception as e: log.error("Exception in /start_recording: %s", e, exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @app.get("/download_recording/{filename}") async def download_recording(filename: str): """Download a recorded WAV file.""" try: # Validate filename to prevent directory traversal if not filename.endswith('.wav') or '/' in filename or '\\' in filename: raise HTTPException(status_code=400, detail="Invalid filename") filepath = os.path.join(RECORDINGS_DIR, filename) if not os.path.exists(filepath): raise HTTPException(status_code=404, detail="Recording file not found") return FileResponse(filepath, filename=filename, media_type="audio/wav") except HTTPException: raise except Exception as e: log.error("Exception in /download_recording: %s", e, exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @app.delete("/delete_recordings") async def delete_recordings(): """Delete all recordings in the recordings folder.""" try: deleted_count = 0 for filename in os.listdir(RECORDINGS_DIR): filepath = os.path.join(RECORDINGS_DIR, filename) if os.path.isfile(filepath): try: os.remove(filepath) deleted_count += 1 log.info("Deleted recording: %s", filename) except Exception as e: log.warning("Failed to delete %s: %s", filename, e) log.info("Deleted %d recordings", deleted_count) return {"success": True, "deleted_count": deleted_count} except Exception as e: log.error("Exception in /delete_recordings: %s", e, exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @app.get("/network_info") async def get_network_info(): """Get network information for all ethernet interfaces.""" try: interfaces = {} hardcoded_devices = ["eth0", "eth1"] proc = await asyncio.create_subprocess_exec( "nmcli", "-t", "-f", "NAME,DEVICE", "connection", "show", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() device_to_connection = {} if proc.returncode == 0: for line in stdout.decode().strip().split('\n'): if not line: continue parts = line.split(':') if len(parts) >= 2: connection_name = parts[0] device_name = parts[1] if device_name in hardcoded_devices: device_to_connection[device_name] = connection_name for device in hardcoded_devices: ip_address = None proc = await asyncio.create_subprocess_exec( "nmcli", "-t", "-f", "IP4.ADDRESS", "device", "show", device, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode == 0: for line in stdout.decode().strip().split('\n'): if line.startswith('IP4.ADDRESS'): ip_parts = line.split(':') if len(ip_parts) >= 2: full_ip = ip_parts[1] ip_address = full_ip.split('/')[0] if not ip_address.startswith('169.254.'): break method = "auto" connection_name = device_to_connection.get(device) if connection_name: proc = await asyncio.create_subprocess_exec( "nmcli", "-t", "-f", "ipv4.method", "connection", "show", connection_name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode == 0: for line in stdout.decode().strip().split('\n'): if line.startswith('ipv4.method:'): method = line.split(':')[1] break is_dhcp = method == "auto" interfaces[device] = { "ip_address": ip_address or "N/A", "is_dhcp": is_dhcp, "method": method, "connection_name": connection_name } port_mapping = { "port1": "eth0", "port2": "eth1" } return { "interfaces": interfaces, "port_mapping": port_mapping } except HTTPException: raise except Exception as e: log.error("Exception in /network_info: %s", e, exc_info=True) raise HTTPException(status_code=500, detail=str(e)) @app.post("/set_network_config") async def set_network_config(config: dict): """Set network configuration (DHCP or Static IP) for a specific interface. Expected payload: { "interface": "eth0", "is_dhcp": true/false, "ip_address": "192.168.1.100" (required if is_dhcp is false), "netmask": "24" (optional, defaults to 24) } """ try: interface = config.get("interface") is_dhcp = config.get("is_dhcp", True) ip_address = config.get("ip_address") netmask = config.get("netmask", "24") if not interface: raise HTTPException(status_code=400, detail="Interface name is required") proc = await asyncio.create_subprocess_exec( "nmcli", "-t", "-f", "NAME,DEVICE", "connection", "show", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode != 0: raise HTTPException(status_code=500, detail="Failed to get network connections") connection_name = None for line in stdout.decode().strip().split('\n'): if not line: continue parts = line.split(':') if len(parts) >= 2 and parts[1] == interface: connection_name = parts[0] break if not connection_name: log.info(f"No connection found for {interface}, creating new connection") proc = await asyncio.create_subprocess_exec( "sudo", "nmcli", "con", "add", "type", "ethernet", "ifname", interface, "con-name", f"Wired connection {interface}", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode != 0: raise HTTPException(status_code=500, detail=f"Failed to create connection for {interface}: {stderr.decode()}") connection_name = f"Wired connection {interface}" if is_dhcp: proc = await asyncio.create_subprocess_exec( "sudo", "nmcli", "con", "modify", connection_name, "ipv4.method", "auto", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) await proc.communicate() if proc.returncode != 0: raise HTTPException(status_code=500, detail="Failed to set DHCP mode") proc = await asyncio.create_subprocess_exec( "sudo", "nmcli", "con", "modify", connection_name, "ipv4.addresses", "", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) await proc.communicate() else: if not ip_address: raise HTTPException(status_code=400, detail="IP address is required for static configuration") import re ip_pattern = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$') if not ip_pattern.match(ip_address): raise HTTPException(status_code=400, detail="Invalid IP address format") octets = ip_address.split('.') if not all(0 <= int(octet) <= 255 for octet in octets): raise HTTPException(status_code=400, detail="IP address octets must be between 0 and 255") proc = await asyncio.create_subprocess_exec( "sudo", "nmcli", "con", "modify", connection_name, "ipv4.method", "manual", "ipv4.addresses", f"{ip_address}/{netmask}", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) await proc.communicate() if proc.returncode != 0: raise HTTPException(status_code=500, detail="Failed to set static IP") proc = await asyncio.create_subprocess_exec( "sudo", "nmcli", "con", "up", connection_name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode != 0: log.info("Connection activation returned non-zero (may be expected if no cable): %s", stderr.decode()) return {"status": "success", "message": f"Network configuration updated for {interface}"} except HTTPException: raise except Exception as e: log.error("Exception in /set_network_config: %s", e, exc_info=True) raise HTTPException(status_code=500, detail=str(e)) if __name__ == '__main__': import os os.chdir(os.path.dirname(__file__)) import uvicorn log.basicConfig( # for debug log level export LOG_LEVEL=DEBUG level=os.environ.get('LOG_LEVEL', log.INFO), format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s' ) # ── GIL switch interval ───────────────────────────────────────────────── # CPython releases the GIL every sys.getswitchinterval() seconds (default # 5 ms). The audio pipeline fires every 10 ms, so a 5 ms granularity # means up to half a frame period can be wasted waiting for the GIL. # Reducing to 1 ms gives the BLE thread much tighter access. import sys sys.setswitchinterval(0.001) log.info("GIL switch interval set to 1 ms") # ── BLE / audio event loop ────────────────────────────────────────────── # Bumble (serial_asyncio / HCI) and the audio pipeline run exclusively on # this loop. Uvicorn's HTTP accept/read/write callbacks run on a separate # asyncio loop in the main thread, so they can never stall BLE advertising # or audio encoding. # # Route handlers that touch Bumble objects call _on_ble_loop(), which uses # asyncio.run_coroutine_threadsafe() + asyncio.wrap_future() to submit the # coroutine to _ble_loop and await the result back in uvicorn's loop. # Hot-path read-only endpoints (/status, /audio_level*) access # multicaster state directly – Python's GIL makes attribute reads safe. def _pthread_sched_lib(): """Return a ctypes handle with correctly typed pthread scheduling symbols. Uses RTLD_DEFAULT (ctypes.CDLL(None)) to resolve symbols from all currently loaded shared libraries. This handles both: - glibc < 2.34: pthread_self/pthread_setschedparam live in libpthread.so.0 - glibc >= 2.34: pthreads merged into libc.so.6 using find_library("c") would miss libpthread on older glibc and cause a NULL function pointer → SEGV when called. Explicit restype/argtypes are mandatory: pthread_t is c_ulong (64-bit on ARM64/x86-64) but ctypes defaults to c_int (32-bit), truncating the thread handle and causing a SEGV inside pthread_setschedparam. """ import ctypes SCHED_FIFO = 1 SCHED_OTHER = 0 class SchedParam(ctypes.Structure): _fields_ = [("sched_priority", ctypes.c_int)] lib = ctypes.CDLL(None, use_errno=True) # RTLD_DEFAULT lib.pthread_self.restype = ctypes.c_ulong lib.pthread_self.argtypes = [] lib.pthread_getschedparam.restype = ctypes.c_int lib.pthread_getschedparam.argtypes = [ ctypes.c_ulong, ctypes.POINTER(ctypes.c_int), ctypes.POINTER(SchedParam), ] lib.pthread_setschedparam.restype = ctypes.c_int lib.pthread_setschedparam.argtypes = [ ctypes.c_ulong, ctypes.c_int, ctypes.POINTER(SchedParam), ] return lib, SchedParam, SCHED_FIFO, SCHED_OTHER def _configure_ble_thread_scheduling(): """Confirm or establish SCHED_FIFO for the BLE/audio thread. When launched via the systemd unit (CPUSchedulingPolicy=fifo), new threads inherit the process RT policy automatically – just log and return. When run directly (development), attempt to elevate to SCHED_FIFO/30 (requires CAP_SYS_NICE), falling back gracefully. """ import ctypes try: lib, SchedParam, SCHED_FIFO, _ = _pthread_sched_lib() tid = lib.pthread_self() policy = ctypes.c_int(-1) param = SchedParam(0) lib.pthread_getschedparam(tid, ctypes.byref(policy), ctypes.byref(param)) if policy.value == SCHED_FIFO: log.info("[BLE-LOOP] Already SCHED_FIFO priority=%d (inherited from systemd)", param.sched_priority) return param.sched_priority = 30 ret = lib.pthread_setschedparam(tid, SCHED_FIFO, ctypes.byref(param)) if ret == 0: log.info("[BLE-LOOP] SCHED_FIFO priority=30 set") else: err = ctypes.get_errno() log.warning("[BLE-LOOP] SCHED_FIFO failed (errno=%d: %s) – " "use systemd CPUSchedulingPolicy=fifo or grant CAP_SYS_NICE", err, os.strerror(err)) try: os.setpriority(os.PRIO_PROCESS, 0, os.getpriority(os.PRIO_PROCESS, 0) - 5) except PermissionError: pass except Exception as exc: log.warning("[BLE-LOOP] Scheduling setup error: %s", exc) def _configure_http_thread_scheduling(): """Demote the HTTP (uvicorn) thread to SCHED_OTHER + nice=+10. When systemd sets CPUSchedulingPolicy=fifo, every thread in the process – including uvicorn's main loop – inherits SCHED_FIFO. We demote the HTTP thread back to SCHED_OTHER so the BLE thread always wins CPU arbitration when both are runnable. Lowering scheduling policy never requires special privileges. """ import ctypes try: lib, SchedParam, SCHED_FIFO, SCHED_OTHER = _pthread_sched_lib() tid = lib.pthread_self() policy = ctypes.c_int(-1) param = SchedParam(0) lib.pthread_getschedparam(tid, ctypes.byref(policy), ctypes.byref(param)) if policy.value == SCHED_FIFO: param.sched_priority = 0 ret = lib.pthread_setschedparam(tid, SCHED_OTHER, ctypes.byref(param)) if ret == 0: log.info("[HTTP] Demoted SCHED_FIFO → SCHED_OTHER") else: err = ctypes.get_errno() log.warning("[HTTP] Could not demote from SCHED_FIFO (errno=%d)", err) else: log.info("[HTTP] Already SCHED_OTHER, no demotion needed") except Exception as exc: log.warning("[HTTP] Scheduling demotion error: %s", exc) try: os.nice(10) log.info("[HTTP] nice=+10 (lower priority)") except Exception as exc: log.debug("[HTTP] os.nice: %s", exc) _ble_loop_ready = threading.Event() def _run_ble_loop(): # Confirm or establish RT scheduling before entering the event loop. _configure_ble_thread_scheduling() async def _ble_runner(): global _ble_loop _ble_loop = asyncio.get_running_loop() _ble_loop_ready.set() # Keep the loop alive; it is stopped when the process exits because # this is a daemon thread. await asyncio.Event().wait() asyncio.run(_ble_runner()) _ble_thread = threading.Thread(target=_run_ble_loop, name="ble-loop", daemon=True) _ble_thread.start() if not _ble_loop_ready.wait(timeout=5): log.error("BLE event loop failed to start within 5 s – aborting") raise RuntimeError("BLE event loop startup timeout") log.info("BLE event loop started on thread '%s'", _ble_thread.name) # ── HTTP / uvicorn event loop (main thread) ───────────────────────────── # Demote the HTTP thread from SCHED_FIFO (if set by systemd) to # SCHED_OTHER + nice=+10 so the BLE thread always preempts it. _configure_http_thread_scheduling() # Bind to localhost only for security: prevents network access, only frontend on same machine can connect uvicorn.run(app, host="127.0.0.1", port=5000, access_log=False)