From d43b91b633ac66991d547f37f3a0eb115a59a290 Mon Sep 17 00:00:00 2001 From: pstruebi Date: Sun, 23 Feb 2025 14:09:21 +0100 Subject: [PATCH] wrap the streamer coroutine in a class to be able to control it better --- auracast/multicast.py | 65 ++++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 23 deletions(-) diff --git a/auracast/multicast.py b/auracast/multicast.py index 2fcb276..88f7e98 100644 --- a/auracast/multicast.py +++ b/auracast/multicast.py @@ -285,29 +285,49 @@ async def setup_audio( big['audio_input'] = audio_input big['encoder'] = encoder -async def streamer(bigs): - # TODO: do some pre buffering so the stream is stable from the beginning. One half iso queue would be appropriate - logging.info("Streaming audio...") - while True: - stream_finished = [False for _ in range(len(bigs))] - for i, big in enumerate(bigs.values()): - pcm_frame = await anext(big['audio_input'].frames(big['lc3_frame_samples']), None) - if pcm_frame is None: # Not all streams may stop at the same time - stream_finished[i] = True - continue +class Streamer(): + def __init__(self, bigs): + self.task = None + self.is_streaming = False + self.bigs = bigs - lc3_frame = big['encoder'].encode( - pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth'] - ) - await big['iso_queue'].write(lc3_frame) + def start_streaming_task(self): - if all(stream_finished): # Take into account that multiple files have different lengths - print('All streams finished, stopping streamer') - break + if not self.is_streaming: + self.task = asyncio.create_task(self.stream()) + else: + logging.warning('Streamer is already running') + def stop_streaming(self): + """Stops the background task if running.""" + if self.is_streaming: + self.is_streaming = False + if self.task: + self.task.cancel() # Cancel the task safely + self.task = None + + async def stream(self): + # TODO: do some pre buffering so the stream is stable from the beginning. One half iso queue would be appropriate + logging.info("Streaming audio...") + bigs = self.bigs + self.is_streaming = True + while self.is_streaming: + stream_finished = [False for _ in range(len(bigs))] + for i, big in enumerate(bigs.values()): + pcm_frame = await anext(big['audio_input'].frames(big['lc3_frame_samples']), None) + if pcm_frame is None: # Not all streams may stop at the same time + stream_finished[i] = True + continue + + lc3_frame = big['encoder'].encode( + pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth'] + ) + await big['iso_queue'].write(lc3_frame) + + if all(stream_finished): # Take into account that multiple files have different lengths + print('All streams finished, stopping streamer') + break - #return streamer(bigs) - #await stream # running until stream ends # ----------------------------------------------------------------------------- # Main @@ -330,11 +350,10 @@ async def broadcast(global_conf: auracast_config.AuracastGlobalConfig, big_conf: global_conf, big_conf ) + streamer = Streamer(bigs) + streamer.start_streaming_task() - await streamer(bigs) - - # make a second coroutine to run the streaming - maybe even use the streamer coroutine - # start it without await and go into a infinite loop were further instrucations via a ui can be given ? + await asyncio.wait([streamer.task]) # -----------------------------------------------------------------------------