@@ -10,6 +10,7 @@ from datetime import datetime
import asyncio
import random
import subprocess
import threading
from dotenv import load_dotenv
from fastapi import FastAPI , HTTPException
@@ -208,6 +209,28 @@ multicaster1: multicast_control.Multicaster | None = None
multicaster2 : multicast_control . Multicaster | None = None
_stream_lock = asyncio . Lock ( ) # serialize initialize/stop_audio on API side
# BLE / audio event loop – set in __main__ before uvicorn starts.
# All coroutines that touch Bumble objects or the audio pipeline MUST run
# on this loop. HTTP handlers call _on_ble_loop() to cross into it.
_ble_loop : asyncio . AbstractEventLoop | None = None
async def _on_ble_loop ( coro ) :
""" Submit *coro* to the BLE event loop and await the result.
Called from uvicorn ' s event loop. Bridges HTTP handler coroutines into
the isolated BLE loop so that serial I/O (serial_asyncio / HCI) and the
audio pipeline are never preempted by HTTP accept/read/write callbacks.
asyncio.run_coroutine_threadsafe() schedules the coroutine on _ble_loop
(thread-safe), returning a concurrent.futures.Future.
asyncio.wrap_future() adapts that into an asyncio.Future so the caller
can simply `await` it inside uvicorn ' s loop.
"""
assert _ble_loop is not None , " BLE loop not yet initialised "
future = asyncio . run_coroutine_threadsafe ( coro , _ble_loop )
return await asyncio . wrap_future ( future )
async def _init_i2c_on_startup ( ) - > None :
# Ensure i2c-dev kernel module is loaded (required for /dev/i2c-* access)
@@ -602,7 +625,10 @@ async def init_radio(transport: str, conf: auracast_config.AuracastConfigGroup,
@app.post ( " /init " )
async def initialize ( conf : auracast_config . AuracastConfigGroup ) :
""" Initializes the primary broadcaster on the streamer thread . """
""" Initializes the primary broadcaster on the BLE loop . """
return await _on_ble_loop ( _initialize_impl ( conf ) )
async def _initialize_impl ( conf : auracast_config . AuracastConfigGroup ) :
async with _stream_lock :
global multicaster1 , global_config_group
mc , persisted = await init_radio ( TRANSPORT1 , conf , multicaster1 )
@@ -612,7 +638,10 @@ async def initialize(conf: auracast_config.AuracastConfigGroup):
@app.post ( " /init2 " )
async def initialize2 ( conf : auracast_config . AuracastConfigGroup ) :
""" Initializes the secondary broadcaster on the streamer thread . """
""" Initializes the secondary broadcaster on the BLE loop . """
return await _on_ble_loop ( _initialize2_impl ( conf ) )
async def _initialize2_impl ( conf : auracast_config . AuracastConfigGroup ) :
async with _stream_lock :
global multicaster2
mc , persisted = await init_radio ( TRANSPORT2 , conf , multicaster2 )
@@ -631,7 +660,11 @@ async def set_led_enabled(body: dict):
@app.post ( " /stop_audio " )
async def stop_audio ( ) :
""" Stops streaming on both multicaster1 and multicaster2 (worker thread) . """
""" Stops streaming on both multicasters via the BLE loop . """
return await _on_ble_loop ( _stop_audio_impl ( ) )
async def _stop_audio_impl ( ) :
""" Runs on BLE loop: stops all streamers and persists is_streaming=False. """
try :
was_running = await _stop_all ( )
@@ -681,9 +714,9 @@ async def set_adc_gain(payload: dict):
@app.post ( " /stream_lc3 " )
async def send_audio ( audio_data : dict [ str , str ] ) :
""" Sends a block of pre-coded LC3 audio via the worker . """
""" Sends a block of pre-coded LC3 audio via the BLE loop . """
try :
await _stream_lc3 ( audio_data , list ( global_config_group . bigs ) )
await _on_ble_loop ( _stream_lc3 ( audio_data , list ( global_config_group . bigs ) ) )
return { " status " : " audio_sent " }
except Exception as e :
raise HTTPException ( status_code = 500 , detail = str ( e ) )
@@ -1058,6 +1091,19 @@ async def _autostart_from_settings():
await do_primary ( )
await do_secondary ( )
async def _ble_startup ( ) :
""" I2C init, ADC level reset, and autostart task scheduling on the BLE loop.
Bridged from _startup_autostart_event() so that these async subprocess
calls and the long-lived autostart coroutine all run on _ble_loop, never
on uvicorn ' s HTTP loop.
"""
await _init_i2c_on_startup ( )
await _set_adc_level ( 0.0 , 0.0 )
log . info ( " [STARTUP] Scheduling autostart task on BLE loop " )
asyncio . create_task ( _autostart_from_settings ( ) )
@app.on_event ( " startup " )
async def _startup_autostart_event ( ) :
# Spawn the autostart task without blocking startup
@@ -1078,12 +1124,11 @@ async def _startup_autostart_event():
# Hydrate settings cache once to avoid disk I/O during /status
_load_led_settings ( )
_init_settings_cache_from_disk ( )
await _init_i2c_on_startup ( )
# Ensure ADC mixer level is set at startup (default 0 dB)
await _set_adc_level ( 0.0 , 0.0 )
refresh_pw_cache ( )
log . info ( " [STARTUP] Scheduling autostart task " )
asyncio . create_task ( _autostart_from_settings ( ) )
# I2C init, ADC setup and the autostart task must run on the BLE loop so
# they share the same event loop as the Bumble HCI transport.
log . info ( " [STARTUP] Bridging I2C init and autostart to BLE loop " )
asyncio . run_coroutine_threadsafe ( _ble_startup ( ) , _ble_loop )
@app.get ( " /audio_inputs_pw_usb " )
async def audio_inputs_pw_usb ( ) :
@@ -1154,6 +1199,9 @@ async def refresh_audio_devices():
@app.post ( " /shutdown " )
async def shutdown ( ) :
""" Stops broadcasting and releases all audio/Bluetooth resources. """
return await _on_ble_loop ( _shutdown_impl ( ) )
async def _shutdown_impl ( ) :
try :
await _stop_all ( )
return { " status " : " stopped " }
@@ -1166,6 +1214,9 @@ async def system_reboot():
Requires the service user to have passwordless sudo permissions to run ' reboot ' .
"""
return await _on_ble_loop ( _system_reboot_impl ( ) )
async def _system_reboot_impl ( ) :
try :
# Best-effort: stop any active streaming cleanly WITHOUT persisting state
try :
@@ -1189,47 +1240,27 @@ async def system_reboot():
@app.post ( " /restart_dep " )
async def restart_dep ( ) :
""" Restart DEP by running dep.sh stop then dep.sh start in the dep directory .
""" Restart DEP via systemctl restart dep.service .
Requires the service user to have passwordless sudo permissions to run dep.sh .
Requires the service user to have passwordless sudo permissions for systemctl .
"""
try :
# Get the dep directory path (dep.sh is in dante_package subdirectory)
dep_dir = os . path . join ( os . path . dirname ( __file__ ) , ' .. ' , ' .. ' , ' dep ' , ' dante_package ' )
# Run dep.sh stop first
log . info ( " Stopping DEP... " )
stop_process = await asyncio . create_subprocess_exec (
" sudo " , " bash " , " dep.sh " , " stop " ,
cwd = dep_dir ,
log . info ( " Restarting DEP via systemctl... " )
proc = await asyncio . create_subprocess_exec (
" sudo " , " systemctl " , " restart " , " dep.service " ,
stdout = asyncio . subprocess . PIPE ,
stderr = asyncio . subprocess . PIPE
)
stop_stdout , stop_stderr = await stop_process . communicate ( )
if stop_process . returncode ! = 0 :
error_msg = stop_stderr . decode ( ) if stop_stderr else " Unknown error "
log . error ( f " Failed to stop DEP: { error_msg } " )
raise HTTPException ( status_code = 500 , detail = f " Failed to stop DEP: { error_msg } " )
# Run dep.sh start after stop succeeds
log . info ( " Starting DEP... " )
start_process = await asyncio . create_subprocess_exec (
" sudo " , " bash " , " dep.sh " , " start " ,
cwd = dep_dir ,
stdout = asyncio . subprocess . PIPE ,
stderr = asyncio . subprocess . PIPE
)
start_stdout , start_stderr = await start_process . communicate ( )
if start_process . returncode == 0 :
stdout , stderr = await proc . communicate ( )
if proc . returncode = = 0 :
log . info ( " DEP restarted successfully " )
return { " status " : " success " , " message " : " DEP restarted successfully " }
else :
error_msg = start_stderr . decode ( ) if start_stderr else " Unknown error "
log . error ( f " Failed to start DEP: { error_msg } " )
raise HTTPException ( status_code = 500 , detail = f " Failed to start DEP: { error_msg } " )
error_msg = stderr . decode ( ) if stderr else " Unknown error "
log . error ( f " Failed to re start DEP: { error_msg } " )
raise HTTPException ( status_code = 500 , detail = f " Failed to re start DEP: { error_msg } " )
except HTTPException :
raise
except Exception as e :
@@ -1322,6 +1353,9 @@ async def check_update():
@app.post ( " /system_update " )
async def system_update ( ) :
""" Update application: git pull main branch (latest tag), poetry install, restart services. """
return await _on_ble_loop ( _system_update_impl ( ) )
async def _system_update_impl ( ) :
try :
# Best-effort: stop any active streaming cleanly
try :
@@ -1789,5 +1823,170 @@ if __name__ == '__main__':
level = os . environ . get ( ' LOG_LEVEL ' , log . INFO ) ,
format = ' %(module)s .py: %(lineno)d %(levelname)s : %(message)s '
)
# ── GIL switch interval ─────────────────────────────────────────────────
# CPython releases the GIL every sys.getswitchinterval() seconds (default
# 5 ms). The audio pipeline fires every 10 ms, so a 5 ms granularity
# means up to half a frame period can be wasted waiting for the GIL.
# Reducing to 1 ms gives the BLE thread much tighter access.
import sys
sys . setswitchinterval ( 0.001 )
log . info ( " GIL switch interval set to 1 ms " )
# ── BLE / audio event loop ──────────────────────────────────────────────
# Bumble (serial_asyncio / HCI) and the audio pipeline run exclusively on
# this loop. Uvicorn's HTTP accept/read/write callbacks run on a separate
# asyncio loop in the main thread, so they can never stall BLE advertising
# or audio encoding.
#
# Route handlers that touch Bumble objects call _on_ble_loop(), which uses
# asyncio.run_coroutine_threadsafe() + asyncio.wrap_future() to submit the
# coroutine to _ble_loop and await the result back in uvicorn's loop.
# Hot-path read-only endpoints (/status, /audio_level*) access
# multicaster state directly – Python's GIL makes attribute reads safe.
def _pthread_sched_lib ( ) :
""" Return a ctypes handle with correctly typed pthread scheduling symbols.
Uses RTLD_DEFAULT (ctypes.CDLL(None)) to resolve symbols from all
currently loaded shared libraries. This handles both:
- glibc < 2.34: pthread_self/pthread_setschedparam live in libpthread.so.0
- glibc >= 2.34: pthreads merged into libc.so.6
using find_library( " c " ) would miss libpthread on older glibc and cause
a NULL function pointer → SEGV when called.
Explicit restype/argtypes are mandatory: pthread_t is c_ulong (64-bit
on ARM64/x86-64) but ctypes defaults to c_int (32-bit), truncating
the thread handle and causing a SEGV inside pthread_setschedparam.
"""
import ctypes
SCHED_FIFO = 1
SCHED_OTHER = 0
class SchedParam ( ctypes . Structure ) :
_fields_ = [ ( " sched_priority " , ctypes . c_int ) ]
lib = ctypes . CDLL ( None , use_errno = True ) # RTLD_DEFAULT
lib . pthread_self . restype = ctypes . c_ulong
lib . pthread_self . argtypes = [ ]
lib . pthread_getschedparam . restype = ctypes . c_int
lib . pthread_getschedparam . argtypes = [
ctypes . c_ulong ,
ctypes . POINTER ( ctypes . c_int ) ,
ctypes . POINTER ( SchedParam ) ,
]
lib . pthread_setschedparam . restype = ctypes . c_int
lib . pthread_setschedparam . argtypes = [
ctypes . c_ulong ,
ctypes . c_int ,
ctypes . POINTER ( SchedParam ) ,
]
return lib , SchedParam , SCHED_FIFO , SCHED_OTHER
def _configure_ble_thread_scheduling ( ) :
""" Confirm or establish SCHED_FIFO for the BLE/audio thread.
When launched via the systemd unit (CPUSchedulingPolicy=fifo), new
threads inherit the process RT policy automatically – just log and
return. When run directly (development), attempt to elevate to
SCHED_FIFO/30 (requires CAP_SYS_NICE), falling back gracefully.
"""
import ctypes
try :
lib , SchedParam , SCHED_FIFO , _ = _pthread_sched_lib ( )
tid = lib . pthread_self ( )
policy = ctypes . c_int ( - 1 )
param = SchedParam ( 0 )
lib . pthread_getschedparam ( tid , ctypes . byref ( policy ) , ctypes . byref ( param ) )
if policy . value == SCHED_FIFO :
log . info ( " [BLE-LOOP] Already SCHED_FIFO priority= %d (inherited from systemd) " ,
param . sched_priority )
return
param . sched_priority = 30
ret = lib . pthread_setschedparam ( tid , SCHED_FIFO , ctypes . byref ( param ) )
if ret == 0 :
log . info ( " [BLE-LOOP] SCHED_FIFO priority=30 set " )
else :
err = ctypes . get_errno ( )
log . warning ( " [BLE-LOOP] SCHED_FIFO failed (errno= %d : %s ) – "
" use systemd CPUSchedulingPolicy=fifo or grant CAP_SYS_NICE " ,
err , os . strerror ( err ) )
try :
os . setpriority ( os . PRIO_PROCESS , 0 ,
os . getpriority ( os . PRIO_PROCESS , 0 ) - 5 )
except PermissionError :
pass
except Exception as exc :
log . warning ( " [BLE-LOOP] Scheduling setup error: %s " , exc )
def _configure_http_thread_scheduling ( ) :
""" Demote the HTTP (uvicorn) thread to SCHED_OTHER + nice=+10.
When systemd sets CPUSchedulingPolicy=fifo, every thread in the
process – including uvicorn ' s main loop – inherits SCHED_FIFO.
We demote the HTTP thread back to SCHED_OTHER so the BLE thread
always wins CPU arbitration when both are runnable.
Lowering scheduling policy never requires special privileges.
"""
import ctypes
try :
lib , SchedParam , SCHED_FIFO , SCHED_OTHER = _pthread_sched_lib ( )
tid = lib . pthread_self ( )
policy = ctypes . c_int ( - 1 )
param = SchedParam ( 0 )
lib . pthread_getschedparam ( tid , ctypes . byref ( policy ) , ctypes . byref ( param ) )
if policy . value == SCHED_FIFO :
param . sched_priority = 0
ret = lib . pthread_setschedparam ( tid , SCHED_OTHER , ctypes . byref ( param ) )
if ret == 0 :
log . info ( " [HTTP] Demoted SCHED_FIFO → SCHED_OTHER " )
else :
err = ctypes . get_errno ( )
log . warning ( " [HTTP] Could not demote from SCHED_FIFO (errno= %d ) " , err )
else :
log . info ( " [HTTP] Already SCHED_OTHER, no demotion needed " )
except Exception as exc :
log . warning ( " [HTTP] Scheduling demotion error: %s " , exc )
try :
os . nice ( 10 )
log . info ( " [HTTP] nice=+10 (lower priority) " )
except Exception as exc :
log . debug ( " [HTTP] os.nice: %s " , exc )
_ble_loop_ready = threading . Event ( )
def _run_ble_loop ( ) :
# Confirm or establish RT scheduling before entering the event loop.
_configure_ble_thread_scheduling ( )
async def _ble_runner ( ) :
global _ble_loop
_ble_loop = asyncio . get_running_loop ( )
_ble_loop_ready . set ( )
# Keep the loop alive; it is stopped when the process exits because
# this is a daemon thread.
await asyncio . Event ( ) . wait ( )
asyncio . run ( _ble_runner ( ) )
_ble_thread = threading . Thread ( target = _run_ble_loop , name = " ble-loop " , daemon = True )
_ble_thread . start ( )
if not _ble_loop_ready . wait ( timeout = 5 ) :
log . error ( " BLE event loop failed to start within 5 s – aborting " )
raise RuntimeError ( " BLE event loop startup timeout " )
log . info ( " BLE event loop started on thread ' %s ' " , _ble_thread . name )
# ── HTTP / uvicorn event loop (main thread) ─────────────────────────────
# Demote the HTTP thread from SCHED_FIFO (if set by systemd) to
# SCHED_OTHER + nice=+10 so the BLE thread always preempts it.
_configure_http_thread_scheduling ( )
# Bind to localhost only for security: prevents network access, only frontend on same machine can connect
uvicorn . run ( app , host = " 127.0.0.1 " , port = 5000 , access_log = False )