wrap the streamer coroutine in a class to be able to control it better
This commit is contained in:
+42
-23
@@ -285,29 +285,49 @@ async def setup_audio(
|
||||
big['audio_input'] = audio_input
|
||||
big['encoder'] = encoder
|
||||
|
||||
async def streamer(bigs):
|
||||
# TODO: do some pre buffering so the stream is stable from the beginning. One half iso queue would be appropriate
|
||||
logging.info("Streaming audio...")
|
||||
while True:
|
||||
stream_finished = [False for _ in range(len(bigs))]
|
||||
for i, big in enumerate(bigs.values()):
|
||||
pcm_frame = await anext(big['audio_input'].frames(big['lc3_frame_samples']), None)
|
||||
if pcm_frame is None: # Not all streams may stop at the same time
|
||||
stream_finished[i] = True
|
||||
continue
|
||||
class Streamer():
|
||||
def __init__(self, bigs):
|
||||
self.task = None
|
||||
self.is_streaming = False
|
||||
self.bigs = bigs
|
||||
|
||||
lc3_frame = big['encoder'].encode(
|
||||
pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth']
|
||||
)
|
||||
await big['iso_queue'].write(lc3_frame)
|
||||
def start_streaming_task(self):
|
||||
|
||||
if all(stream_finished): # Take into account that multiple files have different lengths
|
||||
print('All streams finished, stopping streamer')
|
||||
break
|
||||
if not self.is_streaming:
|
||||
self.task = asyncio.create_task(self.stream())
|
||||
else:
|
||||
logging.warning('Streamer is already running')
|
||||
|
||||
def stop_streaming(self):
|
||||
"""Stops the background task if running."""
|
||||
if self.is_streaming:
|
||||
self.is_streaming = False
|
||||
if self.task:
|
||||
self.task.cancel() # Cancel the task safely
|
||||
self.task = None
|
||||
|
||||
async def stream(self):
|
||||
# TODO: do some pre buffering so the stream is stable from the beginning. One half iso queue would be appropriate
|
||||
logging.info("Streaming audio...")
|
||||
bigs = self.bigs
|
||||
self.is_streaming = True
|
||||
while self.is_streaming:
|
||||
stream_finished = [False for _ in range(len(bigs))]
|
||||
for i, big in enumerate(bigs.values()):
|
||||
pcm_frame = await anext(big['audio_input'].frames(big['lc3_frame_samples']), None)
|
||||
if pcm_frame is None: # Not all streams may stop at the same time
|
||||
stream_finished[i] = True
|
||||
continue
|
||||
|
||||
lc3_frame = big['encoder'].encode(
|
||||
pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth']
|
||||
)
|
||||
await big['iso_queue'].write(lc3_frame)
|
||||
|
||||
if all(stream_finished): # Take into account that multiple files have different lengths
|
||||
print('All streams finished, stopping streamer')
|
||||
break
|
||||
|
||||
#return streamer(bigs)
|
||||
#await stream # running until stream ends
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Main
|
||||
@@ -330,11 +350,10 @@ async def broadcast(global_conf: auracast_config.AuracastGlobalConfig, big_conf:
|
||||
global_conf,
|
||||
big_conf
|
||||
)
|
||||
streamer = Streamer(bigs)
|
||||
streamer.start_streaming_task()
|
||||
|
||||
await streamer(bigs)
|
||||
|
||||
# make a second coroutine to run the streaming - maybe even use the streamer coroutine
|
||||
# start it without await and go into a infinite loop were further instrucations via a ui can be given ?
|
||||
await asyncio.wait([streamer.task])
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user