diff --git a/network_audio_streaming/audio_sink.py b/network_audio_streaming/audio_sink.py index b407d4b..418b883 100644 --- a/network_audio_streaming/audio_sink.py +++ b/network_audio_streaming/audio_sink.py @@ -1,6 +1,7 @@ import socket import sounddevice as sd import numpy as np +import asyncio PORT = 50007 SAMPLERATE = 44100 @@ -9,11 +10,54 @@ CHUNK = 1024 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(('0.0.0.0', PORT)) +sock.setblocking(False) -def audio_callback(outdata, frames, time, status): - data, _ = sock.recvfrom(CHUNK * 2) # 2 bytes per sample (int16) - outdata[:] = np.frombuffer(data, dtype=np.int16).reshape(-1, CHANNELS) +STREAM_TIMEOUT = 2.0 # seconds without packets = stopped +AUDIO_QUEUE_MAXSIZE = 20 -with sd.OutputStream(samplerate=SAMPLERATE, channels=CHANNELS, dtype='int16', blocksize=CHUNK, callback=audio_callback): - print("Receiving audio...") - input() # Press Enter to stop \ No newline at end of file +async def udp_receiver(audio_queue, stream_state): + while True: + try: + data, _ = await asyncio.get_event_loop().sock_recvfrom(sock, CHUNK * 2) + await audio_queue.put(data) + stream_state['last_packet'] = asyncio.get_event_loop().time() + if not stream_state['active']: + print("Audio stream arriving at sink.") + stream_state['active'] = True + except Exception: + await asyncio.sleep(0.01) + +async def stream_monitor(stream_state): + while True: + await asyncio.sleep(0.5) + if stream_state['active']: + now = asyncio.get_event_loop().time() + if now - stream_state['last_packet'] > STREAM_TIMEOUT: + print("Audio stream stopped at sink.") + stream_state['active'] = False + +def make_audio_callback(audio_queue): + def audio_callback(outdata, frames, time, status): + try: + data = audio_queue.get_nowait() + outdata[:] = np.frombuffer(data, dtype=np.int16).reshape(-1, CHANNELS) + except asyncio.QueueEmpty: + outdata[:] = np.zeros((frames, CHANNELS), dtype=np.int16) + return audio_callback + +async def main(): + audio_queue = asyncio.Queue(maxsize=AUDIO_QUEUE_MAXSIZE) + stream_state = {'active': False, 'last_packet': asyncio.get_event_loop().time()} + + receiver_task = asyncio.create_task(udp_receiver(audio_queue, stream_state)) + monitor_task = asyncio.create_task(stream_monitor(stream_state)) + + print("Audio sink running. Waiting for audio packets...") + with sd.OutputStream(samplerate=SAMPLERATE, channels=CHANNELS, dtype='int16', blocksize=CHUNK, callback=make_audio_callback(audio_queue)): + try: + await asyncio.Future() # Run forever + except (asyncio.CancelledError, KeyboardInterrupt): + pass + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file