basic streaming from webinterface is functioning
This commit is contained in:
@@ -49,9 +49,12 @@ from bumble.audio import io as audio_io
|
|||||||
|
|
||||||
from auracast import auracast_config
|
from auracast import auracast_config
|
||||||
from auracast.utils.read_lc3_file import read_lc3_file
|
from auracast.utils.read_lc3_file import read_lc3_file
|
||||||
from auracast.network_audio_receiver import NetworkAudioReceiverUncoded
|
from auracast.utils.network_audio_receiver import NetworkAudioReceiverUncoded
|
||||||
|
from auracast.utils.webrtc_audio_input import WebRTCAudioInput
|
||||||
|
|
||||||
|
|
||||||
|
# Instantiate WebRTC audio input for streaming (can be used per-BIG or globally)
|
||||||
|
|
||||||
# modified from bumble
|
# modified from bumble
|
||||||
class ModWaveAudioInput(audio_io.ThreadedAudioInput):
|
class ModWaveAudioInput(audio_io.ThreadedAudioInput):
|
||||||
"""Audio input that reads PCM samples from a .wav file."""
|
"""Audio input that reads PCM samples from a .wav file."""
|
||||||
@@ -357,6 +360,20 @@ class Streamer():
|
|||||||
big['encoder'] = encoder
|
big['encoder'] = encoder
|
||||||
big['precoded'] = False
|
big['precoded'] = False
|
||||||
|
|
||||||
|
elif audio_source == 'webrtc':
|
||||||
|
big['audio_input'] = WebRTCAudioInput()
|
||||||
|
encoder = lc3.Encoder(
|
||||||
|
frame_duration_us=global_config.frame_duration_us,
|
||||||
|
sample_rate_hz=global_config.auracast_sampling_rate_hz,
|
||||||
|
num_channels=1,
|
||||||
|
input_sample_rate_hz=48000, # TODO: get samplerate from webrtc
|
||||||
|
)
|
||||||
|
lc3_frame_samples = encoder.get_frame_samples()
|
||||||
|
big['pcm_bit_depth'] = 16
|
||||||
|
big['lc3_frame_samples'] = lc3_frame_samples
|
||||||
|
big['encoder'] = encoder
|
||||||
|
big['precoded'] = False
|
||||||
|
|
||||||
# precoded lc3 from ram
|
# precoded lc3 from ram
|
||||||
elif isinstance(big_config[i].audio_source, bytes):
|
elif isinstance(big_config[i].audio_source, bytes):
|
||||||
big['precoded'] = True
|
big['precoded'] = True
|
||||||
@@ -470,7 +487,7 @@ class Streamer():
|
|||||||
if lc3_frame == b'': # Not all streams may stop at the same time
|
if lc3_frame == b'': # Not all streams may stop at the same time
|
||||||
stream_finished[i] = True
|
stream_finished[i] = True
|
||||||
continue
|
continue
|
||||||
else:
|
else: # code lc3 on the fly
|
||||||
pcm_frame = await anext(big['audio_input'].frames(big['lc3_frame_samples']), None)
|
pcm_frame = await anext(big['audio_input'].frames(big['lc3_frame_samples']), None)
|
||||||
|
|
||||||
if pcm_frame is None: # Not all streams may stop at the same time
|
if pcm_frame is None: # Not all streams may stop at the same time
|
||||||
@@ -483,10 +500,10 @@ class Streamer():
|
|||||||
|
|
||||||
await big['iso_queue'].write(lc3_frame)
|
await big['iso_queue'].write(lc3_frame)
|
||||||
|
|
||||||
if all(stream_finished): # Take into account that multiple files have different lengths
|
# if all(stream_finished): # Take into account that multiple files have different lengths
|
||||||
logging.info('All streams finished, stopping streamer')
|
# logging.info('All streams finished, stopping streamer')
|
||||||
self.is_streaming = False
|
# self.is_streaming = False
|
||||||
break
|
# break
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -40,9 +40,9 @@ if audio_mode in ["Webapp", "Network"]:
|
|||||||
name=stream_name,
|
name=stream_name,
|
||||||
program_info=f"{stream_name} {quality}",
|
program_info=f"{stream_name} {quality}",
|
||||||
language=language,
|
language=language,
|
||||||
audio_source="network" if audio_mode=="Webapp" else "network",
|
audio_source="webrtc" if audio_mode=="Webapp" else "network",
|
||||||
input_format="auto",
|
input_format="auto",
|
||||||
iso_que_len=64, # TODO: this should be way less to decrease delay
|
iso_que_len=1, # TODO: this should be way less to decrease delay
|
||||||
sampling_frequency=q['rate'],
|
sampling_frequency=q['rate'],
|
||||||
octets_per_frame=q['octets'],
|
octets_per_frame=q['octets'],
|
||||||
),
|
),
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import av
|
|||||||
import av.audio.layout
|
import av.audio.layout
|
||||||
from typing import List, Set
|
from typing import List, Set
|
||||||
import traceback
|
import traceback
|
||||||
|
from auracast.utils.webrtc_audio_input import WebRTCAudioInput
|
||||||
|
|
||||||
app = FastAPI()
|
app = FastAPI()
|
||||||
|
|
||||||
@@ -31,7 +32,6 @@ global_config_group = auracast_config.AuracastConfigGroup()
|
|||||||
# Create multicast controller
|
# Create multicast controller
|
||||||
multicaster: multicast_control.Multicaster | None = None
|
multicaster: multicast_control.Multicaster | None = None
|
||||||
|
|
||||||
|
|
||||||
@app.post("/init")
|
@app.post("/init")
|
||||||
async def initialize(conf: auracast_config.AuracastConfigGroup):
|
async def initialize(conf: auracast_config.AuracastConfigGroup):
|
||||||
"""Initializes the broadcasters."""
|
"""Initializes the broadcasters."""
|
||||||
@@ -126,6 +126,7 @@ async def offer(offer: Offer):
|
|||||||
# create directory for records - only for testing
|
# create directory for records - only for testing
|
||||||
os.makedirs("./records", exist_ok=True)
|
os.makedirs("./records", exist_ok=True)
|
||||||
|
|
||||||
|
multicaster.start_streaming()
|
||||||
@pc.on("track")
|
@pc.on("track")
|
||||||
async def on_track(track: MediaStreamTrack):
|
async def on_track(track: MediaStreamTrack):
|
||||||
log.info(f"{id_}: track {track.kind} received")
|
log.info(f"{id_}: track {track.kind} received")
|
||||||
@@ -146,10 +147,6 @@ async def offer(offer: Offer):
|
|||||||
log.info(f"array.dtype{frame_array.dtype}")
|
log.info(f"array.dtype{frame_array.dtype}")
|
||||||
log.info(f"frame.to_ndarray(){frame_array}")
|
log.info(f"frame.to_ndarray(){frame_array}")
|
||||||
|
|
||||||
frame_array = frame.to_ndarray()
|
|
||||||
log.info(f"frame_array.shape: {frame_array.shape}")
|
|
||||||
|
|
||||||
# Flatten in case it's (1, N) or (N,)
|
|
||||||
samples = frame_array.reshape(-1)
|
samples = frame_array.reshape(-1)
|
||||||
log.info(f"samples.shape: {samples.shape}")
|
log.info(f"samples.shape: {samples.shape}")
|
||||||
|
|
||||||
@@ -161,6 +158,23 @@ async def offer(offer: Offer):
|
|||||||
|
|
||||||
log.info(f"mono_array.shape: {mono_array.shape}")
|
log.info(f"mono_array.shape: {mono_array.shape}")
|
||||||
|
|
||||||
|
audio_input = list(multicaster.bigs.values())[0]['audio_input']
|
||||||
|
|
||||||
|
|
||||||
|
frame_array = frame.to_ndarray()
|
||||||
|
|
||||||
|
# Flatten in case it's (1, N) or (N,)
|
||||||
|
samples = frame_array.reshape(-1)
|
||||||
|
|
||||||
|
if frame.layout.name == 'stereo':
|
||||||
|
# Interleaved stereo: [L0, R0, L1, R1, ...]
|
||||||
|
mono_array = samples[::2] # Take left channel
|
||||||
|
else:
|
||||||
|
mono_array = samples
|
||||||
|
|
||||||
|
# Feed mono PCM samples to the global WebRTC audio input
|
||||||
|
await audio_input.put_samples(mono_array.astype(np.int16))
|
||||||
|
|
||||||
# Save to WAV file - only for testing
|
# Save to WAV file - only for testing
|
||||||
# if not hasattr(pc, 'wav_writer'):
|
# if not hasattr(pc, 'wav_writer'):
|
||||||
# import wave
|
# import wave
|
||||||
@@ -174,7 +188,6 @@ async def offer(offer: Offer):
|
|||||||
# pc.wav_writer.writeframes(pcm_data)
|
# pc.wav_writer.writeframes(pcm_data)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.error(f"{id_}: Exception in on_track: {e}")
|
log.error(f"{id_}: Exception in on_track: {e}")
|
||||||
finally:
|
finally:
|
||||||
@@ -218,7 +231,7 @@ async def shutdown():
|
|||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
import uvicorn
|
import uvicorn
|
||||||
log.basicConfig(
|
log.basicConfig(
|
||||||
level=log.INFO,
|
level=log.DEBUG,
|
||||||
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
|
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
|
||||||
)
|
)
|
||||||
uvicorn.run(app, host="0.0.0.0", port=5000)
|
uvicorn.run(app, host="0.0.0.0", port=5000)
|
||||||
41
src/auracast/utils/webrtc_audio_input.py
Normal file
41
src/auracast/utils/webrtc_audio_input.py
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
import asyncio
|
||||||
|
import numpy as np
|
||||||
|
import logging
|
||||||
|
|
||||||
|
class WebRTCAudioInput:
|
||||||
|
"""
|
||||||
|
Buffer PCM samples from WebRTC and provide an async generator interface for chunked frames.
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
self.buffer = np.array([], dtype=np.int16)
|
||||||
|
self.lock = asyncio.Lock()
|
||||||
|
self.data_available = asyncio.Event()
|
||||||
|
self.closed = False
|
||||||
|
|
||||||
|
async def frames(self, frame_size: int):
|
||||||
|
"""
|
||||||
|
Async generator yielding exactly frame_size samples as numpy arrays.
|
||||||
|
"""
|
||||||
|
while not self.closed:
|
||||||
|
async with self.lock:
|
||||||
|
if len(self.buffer) >= frame_size:
|
||||||
|
chunk = self.buffer[:frame_size]
|
||||||
|
self.buffer = self.buffer[frame_size:]
|
||||||
|
logging.debug(f"WebRTCAudioInput: Yielding {frame_size} samples, buffer now has {len(self.buffer)} samples remaining.")
|
||||||
|
yield chunk
|
||||||
|
else:
|
||||||
|
self.data_available.clear()
|
||||||
|
await self.data_available.wait()
|
||||||
|
|
||||||
|
async def put_samples(self, samples: np.ndarray):
|
||||||
|
"""
|
||||||
|
Add new PCM samples (1D np.int16 array, mono) to the buffer.
|
||||||
|
"""
|
||||||
|
async with self.lock:
|
||||||
|
self.buffer = np.concatenate([self.buffer, samples])
|
||||||
|
logging.debug(f"WebRTCAudioInput: Added {len(samples)} samples, buffer now has {len(self.buffer)} samples.")
|
||||||
|
self.data_available.set()
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.closed = True
|
||||||
|
self.data_available.set()
|
||||||
Reference in New Issue
Block a user