refactor: move multicast streaming to dedicated worker thread with async loop

This commit is contained in:
pstruebi
2025-10-01 16:09:40 +02:00
parent 653de2da6a
commit 2dc1cf0751
2 changed files with 325 additions and 251 deletions

View File

@@ -106,7 +106,12 @@ st.title("Auracast Audio Mode Control")
# Audio mode selection with persisted default # Audio mode selection with persisted default
# Note: backend persists 'USB' for any device:<name> source (including AES67). We default to 'USB' in that case. # Note: backend persists 'USB' for any device:<name> source (including AES67). We default to 'USB' in that case.
options = ["Demo", "USB", "AES67", "Webapp"] options = [
"Demo",
"USB",
"AES67",
# "Webapp"
]
saved_audio_mode = saved_settings.get("audio_mode", "Demo") saved_audio_mode = saved_settings.get("audio_mode", "Demo")
if saved_audio_mode not in options: if saved_audio_mode not in options:
# Map legacy/unknown modes to closest # Map legacy/unknown modes to closest

View File

@@ -7,6 +7,8 @@ import logging as log
import uuid import uuid
import json import json
import sys import sys
import threading
from concurrent.futures import Future
from datetime import datetime from datetime import datetime
import time import time
import asyncio import asyncio
@@ -113,112 +115,203 @@ app.add_middleware(
# Initialize global configuration # Initialize global configuration
global_config_group = auracast_config.AuracastConfigGroup() global_config_group = auracast_config.AuracastConfigGroup()
# Create multicast controller class StreamerWorker:
multicaster1: multicast_control.Multicaster | None = None """Owns multicaster(s) on a dedicated asyncio loop in a background thread."""
multicaster2: multicast_control.Multicaster | None = None
_stream_lock = asyncio.Lock() # serialize initialize/stop_audio def __init__(self) -> None:
self._thread: threading.Thread | None = None
self._loop: asyncio.AbstractEventLoop | None = None
# These live only on the worker loop
self._multicaster1: multicast_control.Multicaster | None = None
self._multicaster2: multicast_control.Multicaster | None = None
self._started = threading.Event()
# ---------- Thread/loop management ----------
def start(self) -> None:
    """Spawn the background worker thread; no-op if it is already alive."""
    worker = self._thread
    if worker is not None and worker.is_alive():
        return  # idempotent: an already-running worker needs nothing
    worker = threading.Thread(target=self._run, name="StreamerWorker", daemon=True)
    self._thread = worker
    worker.start()
    # Give the worker a moment to publish its event loop before returning.
    self._started.wait(timeout=5)
def _run(self) -> None:
loop = asyncio.new_event_loop()
self._loop = loop
asyncio.set_event_loop(loop)
self._started.set()
try:
loop.run_forever()
finally:
try:
pending = asyncio.all_tasks(loop)
for t in pending:
t.cancel()
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
except Exception:
pass
loop.close()
def _ensure_loop(self) -> asyncio.AbstractEventLoop:
if not self._loop:
raise RuntimeError("StreamerWorker loop not started")
return self._loop
async def call(self, coro_func, *args, **kwargs):
"""Schedule a coroutine on the worker loop and await its result from the API loop."""
loop = self._ensure_loop()
fut: Future = asyncio.run_coroutine_threadsafe(coro_func(*args, **kwargs), loop)
return await asyncio.wrap_future(fut)
# ---------- Worker-loop coroutines ----------
async def _w_init_primary(self, conf: auracast_config.AuracastConfigGroup) -> dict:
    """(Re)create multicaster1 from *conf*; return settings for the caller to persist.

    Runs on the worker loop. Any previous instance is shut down first so no
    stale PipeWire capture nodes linger. Device resolution happens only for
    'device:<name>' sources, so 'file:' (Demo) configs pass through untouched.
    Raises HTTPException(400) when a requested capture device is not found.
    """
    # Best-effort teardown of any previous instance before re-initializing.
    if self._multicaster1 is not None:
        try:
            await self._multicaster1.shutdown()
        except Exception:
            pass
        self._multicaster1 = None

    conf.transport = TRANSPORT1

    # Derive the device name and the audio mode to persist from the first BIG.
    first_source = conf.bigs[0].audio_source if conf.bigs else ''
    input_device_name = None
    audio_mode_persist = 'Demo'
    if first_source.startswith('device:'):
        input_device_name = first_source.split(':', 1)[1]
        # Classify the device as a network (AES67) or USB PipeWire input.
        try:
            usb_names = {d.get('name') for _, d in get_usb_pw_inputs()}
            net_names = {d.get('name') for _, d in get_network_pw_inputs()}
        except Exception:
            usb_names, net_names = set(), set()
        audio_mode_persist = 'AES67' if input_device_name in net_names else 'USB'

        # Resolve the device name (or numeric string) to a sounddevice index.
        if input_device_name.isdigit():
            device_index = int(input_device_name)
        else:
            device_index = get_device_index_by_name(input_device_name)
        if device_index is None:
            raise HTTPException(status_code=400, detail=f"Audio device '{input_device_name}' not found.")
        for big in conf.bigs:
            if big.audio_source.startswith('device:'):
                big.audio_source = f'device:{device_index}'

        # Capture at the device's native rate/channel count to avoid
        # PipeWire resampling; clamp channels to mono/stereo.
        devinfo = sd.query_devices(device_index)
        capture_rate = int(devinfo.get('default_samplerate') or 48000)
        max_in = int(devinfo.get('max_input_channels') or 1)
        channels = max(1, min(2, max_in))
        for big in conf.bigs:
            big.input_format = f"int16le,{capture_rate},{channels}"

    # Create and initialize multicaster1 on this (worker) loop.
    self._multicaster1 = multicast_control.Multicaster(conf, conf.bigs)
    await reset_nrf54l(1)
    await self._multicaster1.init_broadcast()
    auto_started = False
    if any(big.audio_source.startswith(("device:", "file:")) for big in conf.bigs):
        await self._multicaster1.start_streaming()
        auto_started = True

    # Settings are persisted by the caller on the API loop, never from here.
    return {
        'channel_names': [big.name for big in conf.bigs],
        'languages': [big.language for big in conf.bigs],
        'audio_mode': audio_mode_persist,
        'input_device': input_device_name,
        'program_info': [getattr(big, 'program_info', None) for big in conf.bigs],
        'gain': [getattr(big, 'input_gain', 1.0) for big in conf.bigs],
        'auracast_sampling_rate_hz': conf.auracast_sampling_rate_hz,
        'octets_per_frame': conf.octets_per_frame,
        'immediate_rendering': getattr(conf, 'immediate_rendering', False),
        'assisted_listening_stream': getattr(conf, 'assisted_listening_stream', False),
        'stream_password': (conf.bigs[0].code if conf.bigs and getattr(conf.bigs[0], 'code', None) else None),
        'is_streaming': auto_started,
    }
async def _w_init_secondary(self, conf: auracast_config.AuracastConfigGroup) -> None:
    """(Re)create multicaster2 from *conf* on the worker loop.

    Unlike the primary path, nothing is returned for persistence.
    Raises HTTPException(400) when a named capture device cannot be resolved.
    """
    # Best-effort teardown of any previous secondary instance.
    previous = self._multicaster2
    self._multicaster2 = None
    if previous is not None:
        try:
            await previous.shutdown()
        except Exception:
            pass

    conf.transport = TRANSPORT2
    # Patch 'device:<name>' sources to 'device:<index>' for sounddevice.
    for big in conf.bigs:
        if not big.audio_source.startswith('device:'):
            continue
        device_name = big.audio_source.split(':', 1)[1]
        device_index = get_device_index_by_name(device_name)
        if device_index is None:
            raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.")
        big.audio_source = f'device:{device_index}'

    self._multicaster2 = multicast_control.Multicaster(conf, conf.bigs)
    await reset_nrf54l(0)
    await self._multicaster2.init_broadcast()
    if any(big.audio_source.startswith(("device:", "file:")) for big in conf.bigs):
        await self._multicaster2.start_streaming()
async def _w_stop_all(self) -> bool:
was_running = False
if self._multicaster1 is not None:
try:
await self._multicaster1.stop_streaming()
await self._multicaster1.shutdown()
was_running = True
finally:
self._multicaster1 = None
if self._multicaster2 is not None:
try:
await self._multicaster2.stop_streaming()
await self._multicaster2.shutdown()
was_running = True
finally:
self._multicaster2 = None
return was_running
async def _w_status_primary(self) -> dict:
if self._multicaster1 is None:
return {'is_initialized': False, 'is_streaming': False}
try:
return self._multicaster1.get_status()
except Exception:
return {'is_initialized': True, 'is_streaming': False}
async def _w_stream_lc3(self, audio_data: dict[str, str], bigs_template: list) -> None:
    """Point multicaster1's BIGs at pre-coded LC3 bytes and start streaming.

    *audio_data* maps language code -> latin-1-encoded LC3 payload; every BIG
    in *bigs_template* must have a matching entry.
    Raises HTTPException(500) when not initialized or a language is missing.
    """
    if self._multicaster1 is None:
        # Fixed typo in original detail text ('intialized').
        raise HTTPException(status_code=500, detail='Auracast endpoint was never initialized')
    for big in bigs_template:
        if big.language not in audio_data:
            # Clearer than the original 'language len missmatch' message.
            raise HTTPException(status_code=500, detail=f"No audio supplied for language '{big.language}'")
        # TODO: use base64 instead of latin-1 round-tripping for binary payloads.
        big.audio_source = audio_data[big.language].encode('latin-1')
    self._multicaster1.big_conf = bigs_template
    await self._multicaster1.start_streaming()
# Singleton worker that owns all Multicaster state on its own thread + loop.
streamer = StreamerWorker()
# Legacy module-level handles, intentionally retired: multicaster state now
# lives only inside StreamerWorker and must not be touched from the API loop.
# multicaster1: multicast_control.Multicaster | None = None # kept for legacy references, do not use on API loop
# multicaster2: multicast_control.Multicaster | None = None
_stream_lock = asyncio.Lock() # serialize initialize/stop_audio on API side
@app.post("/init") @app.post("/init")
async def initialize(conf: auracast_config.AuracastConfigGroup): async def initialize(conf: auracast_config.AuracastConfigGroup):
"""Initializes the primary broadcaster (multicaster1).""" """Initializes the primary broadcaster on the streamer thread."""
global global_config_group global global_config_group
global multicaster1
async with _stream_lock: async with _stream_lock:
try: try:
# Cleanly stop any existing instance to avoid lingering PipeWire streams global_config_group = conf
if multicaster1 is not None: log.info('Initializing multicaster1 with config:\n %s', conf.model_dump_json(indent=2))
log.info("Shutting down existing multicaster instance before re-initializing.") persisted = await streamer.call(streamer._w_init_primary, conf)
await multicaster1.shutdown() # Persist returned settings (avoid touching from worker thread)
multicaster1 = None persisted['timestamp'] = datetime.utcnow().isoformat()
save_stream_settings(persisted)
conf.transport = TRANSPORT1
# Derive audio_mode and input_device from first BIG audio_source
first_source = conf.bigs[0].audio_source if conf.bigs else ''
if first_source.startswith('device:'):
input_device_name = first_source.split(':', 1)[1] if ':' in first_source else None
# Determine if the device is a USB or Network(AES67) PipeWire input
try:
usb_names = {d.get('name') for _, d in get_usb_pw_inputs()}
net_names = {d.get('name') for _, d in get_network_pw_inputs()}
except Exception:
usb_names, net_names = set(), set()
audio_mode_persist = 'AES67' if input_device_name in net_names else 'USB'
# Resolve to device index and set input_format to avoid PipeWire resampling
device_index = None
if input_device_name:
device_index = int(input_device_name) if input_device_name.isdigit() else get_device_index_by_name(input_device_name)
if device_index is None:
log.error(f"Device name '{input_device_name}' not found in current device list.")
raise HTTPException(status_code=400, detail=f"Audio device '{input_device_name}' not found.")
for big in conf.bigs:
if big.audio_source.startswith('device:'):
big.audio_source = f'device:{device_index}'
devinfo = sd.query_devices(device_index)
capture_rate = int(devinfo.get('default_samplerate') or 48000)
max_in = int(devinfo.get('max_input_channels') or 1)
channels = max(1, min(2, max_in))
for big in conf.bigs:
big.input_format = f"int16le,{capture_rate},{channels}"
save_stream_settings({
'channel_names': [big.name for big in conf.bigs],
'languages': [big.language for big in conf.bigs],
'audio_mode': audio_mode_persist,
'input_device': input_device_name,
'program_info': [getattr(big, 'program_info', None) for big in conf.bigs],
'gain': [getattr(big, 'input_gain', 1.0) for big in conf.bigs],
'auracast_sampling_rate_hz': conf.auracast_sampling_rate_hz,
'octets_per_frame': conf.octets_per_frame,
'immediate_rendering': getattr(conf, 'immediate_rendering', False),
'assisted_listening_stream': getattr(conf, 'assisted_listening_stream', False),
'stream_password': (conf.bigs[0].code if conf.bigs and getattr(conf.bigs[0], 'code', None) else None),
'is_streaming': False, # will be set to True below if we actually start
'timestamp': datetime.utcnow().isoformat()
})
# Proceed with initialization and optional auto-start
global_config_group = conf
log.info('Initializing multicaster1 with config:\n %s', conf.model_dump_json(indent=2))
multicaster1 = multicast_control.Multicaster(conf, conf.bigs)
# Ensure target is reset before initializing broadcast
await reset_nrf54l(1)
await multicaster1.init_broadcast()
if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs):
log.info("Auto-starting streaming on multicaster1")
await multicaster1.start_streaming()
# Mark persisted state as streaming
settings = load_stream_settings() or {}
settings['is_streaming'] = True
settings['timestamp'] = datetime.utcnow().isoformat()
save_stream_settings(settings)
except Exception as e: except Exception as e:
log.error("Exception in /init: %s", traceback.format_exc()) log.error("Exception in /init: %s", traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.post("/init2") @app.post("/init2")
async def initialize2(conf: auracast_config.AuracastConfigGroup): async def initialize2(conf: auracast_config.AuracastConfigGroup):
"""Initializes the secondary broadcaster (multicaster2). Does NOT persist stream settings.""" """Initializes the secondary broadcaster on the streamer thread."""
global multicaster2
try: try:
conf.transport = TRANSPORT2
# Patch device name to index for sounddevice
for big in conf.bigs:
if big.audio_source.startswith('device:'):
device_name = big.audio_source.split(':', 1)[1]
device_index = get_device_index_by_name(device_name)
if device_index is not None:
big.audio_source = f'device:{device_index}'
else:
log.error(f"Device name '{device_name}' not found in current device list.")
raise HTTPException(status_code=400, detail=f"Audio device '{device_name}' not found.")
log.info('Initializing multicaster2 with config:\n %s', conf.model_dump_json(indent=2)) log.info('Initializing multicaster2 with config:\n %s', conf.model_dump_json(indent=2))
multicaster2 = multicast_control.Multicaster(conf, conf.bigs) await streamer.call(streamer._w_init_secondary, conf)
# Ensure target is reset before initializing broadcast
await reset_nrf54l(0)
await multicaster2.init_broadcast()
if any(big.audio_source.startswith("device:") or big.audio_source.startswith("file:") for big in conf.bigs):
log.info("Auto-starting streaming on multicaster2")
await multicaster2.start_streaming()
except Exception as e: except Exception as e:
log.error("Exception in /init2: %s", traceback.format_exc()) log.error("Exception in /init2: %s", traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@@ -228,27 +321,15 @@ async def initialize2(conf: auracast_config.AuracastConfigGroup):
@app.post("/stop_audio") @app.post("/stop_audio")
async def stop_audio(): async def stop_audio():
"""Stops streaming on both multicaster1 and multicaster2.""" """Stops streaming on both multicaster1 and multicaster2 (worker thread)."""
try: try:
# First close any active WebRTC peer connections so their track loops finish cleanly # Close any active PeerConnections
close_tasks = [pc.close() for pc in list(pcs)] close_tasks = [pc.close() for pc in list(pcs)]
pcs.clear() pcs.clear()
if close_tasks: if close_tasks:
await asyncio.gather(*close_tasks, return_exceptions=True) await asyncio.gather(*close_tasks, return_exceptions=True)
# Now shut down both multicasters and release audio devices was_running = await streamer.call(streamer._w_stop_all)
global multicaster1, multicaster2
was_running = False
if multicaster1 is not None:
await multicaster1.stop_streaming()
await multicaster1.shutdown()
multicaster1 = None
was_running = True
if multicaster2 is not None:
await multicaster2.stop_streaming()
await multicaster2.shutdown()
multicaster2 = None
was_running = True
# Persist is_streaming=False # Persist is_streaming=False
try: try:
@@ -260,9 +341,7 @@ async def stop_audio():
except Exception: except Exception:
log.warning("Failed to persist is_streaming=False during stop_audio", exc_info=True) log.warning("Failed to persist is_streaming=False during stop_audio", exc_info=True)
# Grace period: allow PipeWire/PortAudio to fully drop capture nodes
await asyncio.sleep(0.2) await asyncio.sleep(0.2)
return {"status": "stopped", "was_running": was_running} return {"status": "stopped", "was_running": was_running}
except Exception as e: except Exception as e:
log.error("Exception in /stop_audio: %s", traceback.format_exc()) log.error("Exception in /stop_audio: %s", traceback.format_exc())
@@ -271,17 +350,9 @@ async def stop_audio():
@app.post("/stream_lc3") @app.post("/stream_lc3")
async def send_audio(audio_data: dict[str, str]): async def send_audio(audio_data: dict[str, str]):
"""Sends a block of pre-coded LC3 audio.""" """Sends a block of pre-coded LC3 audio via the worker."""
if multicaster1 is None:
raise HTTPException(status_code=500, detail='Auracast endpoint was never intialized')
try: try:
for big in global_config_group.bigs: await streamer.call(streamer._w_stream_lc3, audio_data, list(global_config_group.bigs))
assert big.language in audio_data, HTTPException(status_code=500, detail='language len missmatch')
log.info('Received a send audio request for %s', big.language)
big.audio_source = audio_data[big.language].encode('latin-1') # TODO: use base64 encoding
multicaster1.big_conf = global_config_group.bigs
await multicaster1.start_streaming()
return {"status": "audio_sent"} return {"status": "audio_sent"}
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@@ -289,11 +360,8 @@ async def send_audio(audio_data: dict[str, str]):
@app.get("/status") @app.get("/status")
async def get_status(): async def get_status():
"""Gets the current status of the multicaster together with persisted stream info.""" """Gets current status (worker) merged with persisted settings cache."""
status = multicaster1.get_status() if multicaster1 else { status = await streamer.call(streamer._w_status_primary)
'is_initialized': False,
'is_streaming': False,
}
status.update(load_stream_settings()) status.update(load_stream_settings())
return status return status
@@ -330,12 +398,14 @@ async def _autostart_from_settings():
return return
# Avoid duplicate start if already streaming # Avoid duplicate start if already streaming
if multicaster1 and multicaster1.get_status().get('is_streaming'): current = await streamer.call(streamer._w_status_primary)
if current.get('is_streaming'):
return return
while True: while True:
# Do not interfere if user started a stream manually in the meantime # Do not interfere if user started a stream manually in the meantime
if multicaster1 and multicaster1.get_status().get('is_streaming'): current = await streamer.call(streamer._w_status_primary)
if current.get('is_streaming'):
return return
# Abort if saved settings changed to a different target while we were polling # Abort if saved settings changed to a different target while we were polling
current_settings = load_stream_settings() or {} current_settings = load_stream_settings() or {}
@@ -389,6 +459,8 @@ async def _startup_autostart_event():
# Hydrate settings cache once to avoid disk I/O during /status # Hydrate settings cache once to avoid disk I/O during /status
_hydrate_settings_cache_from_disk() _hydrate_settings_cache_from_disk()
refresh_pw_cache() refresh_pw_cache()
# Start the streamer worker thread
streamer.start()
asyncio.create_task(_autostart_from_settings()) asyncio.create_task(_autostart_from_settings())
@@ -425,9 +497,8 @@ async def refresh_audio_devices():
"""Triggers a re-scan of audio devices, but only if no stream is active.""" """Triggers a re-scan of audio devices, but only if no stream is active."""
streaming = False streaming = False
try: try:
if multicaster1 is not None: status = await streamer.call(streamer._w_status_primary)
status = multicaster1.get_status() streaming = bool(status.get('is_streaming'))
streaming = bool(status.get('is_streaming'))
except Exception: except Exception:
pass # Ignore errors, default to not refreshing pass # Ignore errors, default to not refreshing
@@ -444,143 +515,143 @@ async def refresh_audio_devices():
raise HTTPException(status_code=500, detail=f"Failed to refresh devices: {e}") raise HTTPException(status_code=500, detail=f"Failed to refresh devices: {e}")
@app.post("/offer") # async def offer(offer: Offer):
async def offer(offer: Offer): # @app.post("/offer") #webrtc endpoint
log.info("/offer endpoint called") # log.info("/offer endpoint called")
# If a previous PeerConnection is still alive, close it so we only ever keep one active. # # If a previous PeerConnection is still alive, close it so we only ever keep one active.
if pcs: # if pcs:
log.info("Closing %d existing PeerConnection(s) before creating a new one", len(pcs)) # log.info("Closing %d existing PeerConnection(s) before creating a new one", len(pcs))
close_tasks = [p.close() for p in list(pcs)] # close_tasks = [p.close() for p in list(pcs)]
await asyncio.gather(*close_tasks, return_exceptions=True) # await asyncio.gather(*close_tasks, return_exceptions=True)
pcs.clear() # pcs.clear()
pc = RTCPeerConnection() # No STUN needed for localhost # pc = RTCPeerConnection() # No STUN needed for localhost
pcs.add(pc) # pcs.add(pc)
id_ = uuid.uuid4().hex[:8] # id_ = uuid.uuid4().hex[:8]
log.info(f"{id_}: new PeerConnection") # log.info(f"{id_}: new PeerConnection")
# create directory for records - only for testing # # create directory for records - only for testing
os.makedirs("./records", exist_ok=True) # os.makedirs("./records", exist_ok=True)
# Do NOT start the streamer yet we'll start it lazily once we actually # # Do NOT start the streamer yet we'll start it lazily once we actually
# receive the first audio frame, ensuring WebRTCAudioInput is ready and # # receive the first audio frame, ensuring WebRTCAudioInput is ready and
# avoiding race-conditions on restarts. # # avoiding race-conditions on restarts.
@pc.on("track") # @pc.on("track")
async def on_track(track: MediaStreamTrack): # async def on_track(track: MediaStreamTrack):
log.info(f"{id_}: track {track.kind} received") # log.info(f"{id_}: track {track.kind} received")
try: # try:
first = True # first = True
while True: # while True:
frame: av.audio.frame.AudioFrame = await track.recv() # RTP audio frame (already decrypted) # frame: av.audio.frame.AudioFrame = await track.recv() # RTP audio frame (already decrypted)
if first: # if first:
log.info(f"{id_}: frame layout={frame.layout}") # log.info(f"{id_}: frame layout={frame.layout}")
log.info(f"{id_}: frame format={frame.format}") # log.info(f"{id_}: frame format={frame.format}")
log.info( # log.info(
f"{id_}: frame sample_rate={frame.sample_rate}, samples_per_channel={frame.samples}, planes={frame.planes}" # f"{id_}: frame sample_rate={frame.sample_rate}, samples_per_channel={frame.samples}, planes={frame.planes}"
) # )
# Lazily start the streamer now that we know a track exists. # # Lazily start the streamer now that we know a track exists.
if multicaster1.streamer is None: # if multicaster1.streamer is None:
await multicaster1.start_streaming() # await multicaster1.start_streaming()
# Yield control so the Streamer coroutine has a chance to # # Yield control so the Streamer coroutine has a chance to
# create the WebRTCAudioInput before we push samples. # # create the WebRTCAudioInput before we push samples.
await asyncio.sleep(0) # await asyncio.sleep(0)
# Persist is_streaming=True for Webapp mode # # Persist is_streaming=True for Webapp mode
try: # try:
settings = load_stream_settings() or {} # settings = load_stream_settings() or {}
settings['is_streaming'] = True # settings['is_streaming'] = True
settings['timestamp'] = datetime.utcnow().isoformat() # settings['timestamp'] = datetime.utcnow().isoformat()
save_stream_settings(settings) # save_stream_settings(settings)
except Exception: # except Exception:
log.warning("Failed to persist is_streaming=True on WebRTC start", exc_info=True) # log.warning("Failed to persist is_streaming=True on WebRTC start", exc_info=True)
first = False # first = False
# in stereo case this is interleaved data format # # in stereo case this is interleaved data format
frame_array = frame.to_ndarray() # frame_array = frame.to_ndarray()
log.info(f"array.shape{frame_array.shape}") # log.info(f"array.shape{frame_array.shape}")
log.info(f"array.dtype{frame_array.dtype}") # log.info(f"array.dtype{frame_array.dtype}")
log.info(f"frame.to_ndarray(){frame_array}") # log.info(f"frame.to_ndarray(){frame_array}")
samples = frame_array.reshape(-1) # samples = frame_array.reshape(-1)
log.info(f"samples.shape: {samples.shape}") # log.info(f"samples.shape: {samples.shape}")
if frame.layout.name == 'stereo': # if frame.layout.name == 'stereo':
# Interleaved stereo: [L0, R0, L1, R1, ...] # # Interleaved stereo: [L0, R0, L1, R1, ...]
mono_array = samples[::2] # Take left channel # mono_array = samples[::2] # Take left channel
else: # else:
mono_array = samples # mono_array = samples
log.info(f"mono_array.shape: {mono_array.shape}") # log.info(f"mono_array.shape: {mono_array.shape}")
frame_array = frame.to_ndarray() # frame_array = frame.to_ndarray()
# Flatten in case it's (1, N) or (N,) # # Flatten in case it's (1, N) or (N,)
samples = frame_array.reshape(-1) # samples = frame_array.reshape(-1)
if frame.layout.name == 'stereo': # if frame.layout.name == 'stereo':
# Interleaved stereo: [L0, R0, L1, R1, ...] # # Interleaved stereo: [L0, R0, L1, R1, ...]
mono_array = samples[::2] # Take left channel # mono_array = samples[::2] # Take left channel
else: # else:
mono_array = samples # mono_array = samples
# Get current WebRTC audio input (streamer may have been restarted) # # Get current WebRTC audio input (streamer may have been restarted)
big0 = list(multicaster1.bigs.values())[0] # big0 = list(multicaster1.bigs.values())[0]
audio_input = big0.get('audio_input') # audio_input = big0.get('audio_input')
# Wait until the streamer has instantiated the WebRTCAudioInput # # Wait until the streamer has instantiated the WebRTCAudioInput
if audio_input is None or getattr(audio_input, 'closed', False): # if audio_input is None or getattr(audio_input, 'closed', False):
continue # continue
# Feed mono PCM samples to the global WebRTC audio input # # Feed mono PCM samples to the global WebRTC audio input
await audio_input.put_samples(mono_array.astype(np.int16)) # await audio_input.put_samples(mono_array.astype(np.int16))
# Save to WAV file - only for testing # # Save to WAV file - only for testing
# if not hasattr(pc, 'wav_writer'): # # if not hasattr(pc, 'wav_writer'):
# import wave # # import wave
# wav_path = f"./records/auracast_{id_}.wav" # # wav_path = f"./records/auracast_{id_}.wav"
# pc.wav_writer = wave.open(wav_path, "wb") # # pc.wav_writer = wave.open(wav_path, "wb")
# pc.wav_writer.setnchannels(1) # mono # # pc.wav_writer.setnchannels(1) # mono
# pc.wav_writer.setsampwidth(2) # 16-bit PCM # # pc.wav_writer.setsampwidth(2) # 16-bit PCM
# pc.wav_writer.setframerate(frame.sample_rate) # # pc.wav_writer.setframerate(frame.sample_rate)
# pcm_data = mono_array.astype(np.int16).tobytes() # # pcm_data = mono_array.astype(np.int16).tobytes()
# pc.wav_writer.writeframes(pcm_data) # # pc.wav_writer.writeframes(pcm_data)
except Exception as e: # except Exception as e:
log.error(f"{id_}: Exception in on_track: {e}") # log.error(f"{id_}: Exception in on_track: {e}")
finally: # finally:
# Always close the wav file when the track ends or on error # # Always close the wav file when the track ends or on error
if hasattr(pc, 'wav_writer'): # if hasattr(pc, 'wav_writer'):
try: # try:
pc.wav_writer.close() # pc.wav_writer.close()
except Exception: # except Exception:
pass # pass
del pc.wav_writer # del pc.wav_writer
# --- SDP negotiation --- # # --- SDP negotiation ---
log.info(f"{id_}: setting remote description") # log.info(f"{id_}: setting remote description")
await pc.setRemoteDescription(RTCSessionDescription(**offer.model_dump())) # await pc.setRemoteDescription(RTCSessionDescription(**offer.model_dump()))
log.info(f"{id_}: creating answer") # log.info(f"{id_}: creating answer")
answer = await pc.createAnswer() # answer = await pc.createAnswer()
sdp = answer.sdp # sdp = answer.sdp
# Insert a=ptime using the global PTIME variable # # Insert a=ptime using the global PTIME variable
ptime_line = f"a=ptime:{PTIME}" # ptime_line = f"a=ptime:{PTIME}"
if "a=sendrecv" in sdp: # if "a=sendrecv" in sdp:
sdp = sdp.replace("a=sendrecv", f"a=sendrecv\n{ptime_line}") # sdp = sdp.replace("a=sendrecv", f"a=sendrecv\n{ptime_line}")
else: # else:
sdp += f"\n{ptime_line}" # sdp += f"\n{ptime_line}"
new_answer = RTCSessionDescription(sdp=sdp, type=answer.type) # new_answer = RTCSessionDescription(sdp=sdp, type=answer.type)
await pc.setLocalDescription(new_answer) # await pc.setLocalDescription(new_answer)
log.info(f"{id_}: sending answer with {ptime_line}") # log.info(f"{id_}: sending answer with {ptime_line}")
return {"sdp": pc.localDescription.sdp, # return {"sdp": pc.localDescription.sdp,
"type": pc.localDescription.type} # "type": pc.localDescription.type}
@app.post("/shutdown") @app.post("/shutdown")
async def shutdown(): async def shutdown():
"""Stops broadcasting and releases all audio/Bluetooth resources.""" """Stops broadcasting and releases all audio/Bluetooth resources."""
try: try:
await multicaster1.shutdown() await streamer.call(streamer._w_stop_all)
return {"status": "stopped"} return {"status": "stopped"}
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@@ -601,13 +672,11 @@ async def system_reboot():
if close_tasks: if close_tasks:
await asyncio.gather(*close_tasks, return_exceptions=True) await asyncio.gather(*close_tasks, return_exceptions=True)
# Stop streaming on multicasters but DO NOT touch stream_settings.json # Stop streaming on worker but DO NOT touch stream_settings.json
if multicaster1 is not None: try:
await multicaster1.stop_streaming() await streamer.call(streamer._w_stop_all)
await multicaster1.shutdown() except Exception:
if multicaster2 is not None: pass
await multicaster2.stop_streaming()
await multicaster2.shutdown()
except Exception: except Exception:
log.warning("Non-fatal: failed to stop streams before reboot", exc_info=True) log.warning("Non-fatal: failed to stop streams before reboot", exc_info=True)