diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index ef0303e..29f0eea 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -49,9 +49,12 @@ from bumble.audio import io as audio_io from auracast import auracast_config from auracast.utils.read_lc3_file import read_lc3_file -from auracast.network_audio_receiver import NetworkAudioReceiverUncoded +from auracast.utils.network_audio_receiver import NetworkAudioReceiverUncoded +from auracast.utils.webrtc_audio_input import WebRTCAudioInput +# Instantiate WebRTC audio input for streaming (can be used per-BIG or globally) + # modified from bumble class ModWaveAudioInput(audio_io.ThreadedAudioInput): """Audio input that reads PCM samples from a .wav file.""" @@ -357,6 +360,20 @@ class Streamer(): big['encoder'] = encoder big['precoded'] = False + elif audio_source == 'webrtc': + big['audio_input'] = WebRTCAudioInput() + encoder = lc3.Encoder( + frame_duration_us=global_config.frame_duration_us, + sample_rate_hz=global_config.auracast_sampling_rate_hz, + num_channels=1, + input_sample_rate_hz=48000, # TODO: get samplerate from webrtc + ) + lc3_frame_samples = encoder.get_frame_samples() + big['pcm_bit_depth'] = 16 + big['lc3_frame_samples'] = lc3_frame_samples + big['encoder'] = encoder + big['precoded'] = False + # precoded lc3 from ram elif isinstance(big_config[i].audio_source, bytes): big['precoded'] = True @@ -470,7 +487,7 @@ class Streamer(): if lc3_frame == b'': # Not all streams may stop at the same time stream_finished[i] = True continue - else: + else: # code lc3 on the fly pcm_frame = await anext(big['audio_input'].frames(big['lc3_frame_samples']), None) if pcm_frame is None: # Not all streams may stop at the same time @@ -483,10 +500,10 @@ class Streamer(): await big['iso_queue'].write(lc3_frame) - if all(stream_finished): # Take into account that multiple files have different lengths - logging.info('All streams finished, stopping streamer') - self.is_streaming = False - 
break + # if all(stream_finished): # Take into account that multiple files have different lengths + # logging.info('All streams finished, stopping streamer') + # self.is_streaming = False + # break # ----------------------------------------------------------------------------- diff --git a/src/auracast/server/multicast_frontend.py b/src/auracast/server/multicast_frontend.py index ed3af61..17ce1cd 100644 --- a/src/auracast/server/multicast_frontend.py +++ b/src/auracast/server/multicast_frontend.py @@ -40,9 +40,9 @@ if audio_mode in ["Webapp", "Network"]: name=stream_name, program_info=f"{stream_name} {quality}", language=language, - audio_source="network" if audio_mode=="Webapp" else "network", + audio_source="webrtc" if audio_mode=="Webapp" else "network", input_format="auto", - iso_que_len=64, # TODO: this should be way less to decrease delay + iso_que_len=1, # TODO: this should be way less to decrease delay sampling_frequency=q['rate'], octets_per_frame=q['octets'], ), diff --git a/src/auracast/server/multicast_server.py b/src/auracast/server/multicast_server.py index 346a169..36e5a7f 100644 --- a/src/auracast/server/multicast_server.py +++ b/src/auracast/server/multicast_server.py @@ -13,6 +13,7 @@ import av import av.audio.layout from typing import List, Set import traceback +from auracast.utils.webrtc_audio_input import WebRTCAudioInput app = FastAPI() @@ -31,7 +32,6 @@ global_config_group = auracast_config.AuracastConfigGroup() # Create multicast controller multicaster: multicast_control.Multicaster | None = None - @app.post("/init") async def initialize(conf: auracast_config.AuracastConfigGroup): """Initializes the broadcasters.""" @@ -126,6 +126,7 @@ async def offer(offer: Offer): # create directory for records - only for testing os.makedirs("./records", exist_ok=True) + multicaster.start_streaming() @pc.on("track") async def on_track(track: MediaStreamTrack): log.info(f"{id_}: track {track.kind} received") @@ -145,13 +146,25 @@ async def offer(offer: 
Offer): log.info(f"array.shape{frame_array.shape}") log.info(f"array.dtype{frame_array.dtype}") log.info(f"frame.to_ndarray(){frame_array}") + + samples = frame_array.reshape(-1) + log.info(f"samples.shape: {samples.shape}") + + if frame.layout.name == 'stereo': + # Interleaved stereo: [L0, R0, L1, R1, ...] + mono_array = samples[::2] # Take left channel + else: + mono_array = samples + + log.info(f"mono_array.shape: {mono_array.shape}") + + audio_input = list(multicaster.bigs.values())[0]['audio_input'] + frame_array = frame.to_ndarray() - log.info(f"frame_array.shape: {frame_array.shape}") # Flatten in case it's (1, N) or (N,) samples = frame_array.reshape(-1) - log.info(f"samples.shape: {samples.shape}") if frame.layout.name == 'stereo': # Interleaved stereo: [L0, R0, L1, R1, ...] @@ -159,7 +172,8 @@ async def offer(offer: Offer): else: mono_array = samples - log.info(f"mono_array.shape: {mono_array.shape}") + # Feed mono PCM samples to the global WebRTC audio input + await audio_input.put_samples(mono_array.astype(np.int16)) # Save to WAV file - only for testing # if not hasattr(pc, 'wav_writer'): @@ -172,9 +186,8 @@ async def offer(offer: Offer): # pcm_data = mono_array.astype(np.int16).tobytes() # pc.wav_writer.writeframes(pcm_data) - - + except Exception as e: log.error(f"{id_}: Exception in on_track: {e}") finally: @@ -218,7 +231,7 @@ async def shutdown(): if __name__ == '__main__': import uvicorn log.basicConfig( - level=log.INFO, + level=log.DEBUG, format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s' ) uvicorn.run(app, host="0.0.0.0", port=5000) \ No newline at end of file diff --git a/src/auracast/network_audio_receiver.py b/src/auracast/utils/network_audio_receiver.py similarity index 100% rename from src/auracast/network_audio_receiver.py rename to src/auracast/utils/network_audio_receiver.py diff --git a/src/auracast/utils/webrtc_audio_input.py b/src/auracast/utils/webrtc_audio_input.py new file mode 100644 index 0000000..0e6b1a4 --- /dev/null 
# src/auracast/utils/webrtc_audio_input.py
import asyncio
import logging
from collections.abc import AsyncIterator

import numpy as np


class WebRTCAudioInput:
    """Buffer PCM samples from WebRTC and hand them out as fixed-size frames.

    A producer pushes samples with :meth:`put_samples`; a consumer pulls
    chunks of exactly ``frame_size`` samples from the async generator returned
    by :meth:`frames`.  ``lock`` guards every read/modify/write of ``buffer``;
    ``data_available`` wakes a consumer that is waiting for more samples (or
    for :meth:`close`).
    """

    def __init__(self) -> None:
        # Samples received but not yet handed out, oldest first.
        self.buffer = np.array([], dtype=np.int16)
        # Serialises access to ``buffer`` between producer and consumer.
        self.lock = asyncio.Lock()
        # Set whenever samples were appended or the input was closed.
        self.data_available = asyncio.Event()
        # Once True, ``frames()`` stops producing.
        self.closed = False

    async def frames(self, frame_size: int) -> AsyncIterator[np.ndarray]:
        """Yield consecutive chunks of exactly ``frame_size`` samples.

        Waits until enough samples are buffered.  The generator finishes once
        :meth:`close` has been called; samples still buffered at that point
        are not delivered.

        Args:
            frame_size: Number of samples per yielded chunk; must be positive.

        Yields:
            Slices of the internal buffer, ``frame_size`` samples long.

        Raises:
            ValueError: If ``frame_size`` is not positive (a zero-sized frame
                would be "available" forever and never let the loop wait).
        """
        if frame_size <= 0:
            raise ValueError(f"frame_size must be positive, got {frame_size}")

        while not self.closed:
            chunk = None
            remaining = 0
            async with self.lock:
                if len(self.buffer) >= frame_size:
                    chunk = self.buffer[:frame_size]
                    self.buffer = self.buffer[frame_size:]
                    remaining = len(self.buffer)
                elif not self.closed:
                    # Clear under the lock so a concurrent put_samples() cannot
                    # set the event between this check and the wait below.
                    # Skipped after close() so its wake-up is never lost.
                    self.data_available.clear()

            if chunk is None:
                # Wait *outside* the lock: put_samples() needs the lock to
                # append the very samples this wait is for.
                await self.data_available.wait()
                continue

            logging.debug(
                "WebRTCAudioInput: Yielding %s samples, buffer now has %s samples remaining.",
                frame_size,
                remaining,
            )
            # Yield *outside* the lock: a consumer that stops iterating here
            # (e.g. takes one frame and drops the generator) must not leave
            # the lock held and block the producer.
            yield chunk

    async def put_samples(self, samples: np.ndarray) -> None:
        """Append new PCM samples to the buffer and wake a waiting consumer.

        Args:
            samples: 1-D mono ``np.int16`` array.  The array is concatenated
                as-is; no dtype or shape conversion is performed here.
        """
        async with self.lock:
            self.buffer = np.concatenate([self.buffer, samples])
            logging.debug(
                "WebRTCAudioInput: Added %s samples, buffer now has %s samples.",
                len(samples),
                len(self.buffer),
            )
            self.data_available.set()

    def close(self) -> None:
        """Mark the input as closed and wake any consumer waiting in ``frames()``."""
        self.closed = True
        self.data_available.set()