use async io and monitor arriving of stream
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
import socket
|
import socket
|
||||||
import sounddevice as sd
|
import sounddevice as sd
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
import asyncio
|
||||||
|
|
||||||
PORT = 50007
|
PORT = 50007
|
||||||
SAMPLERATE = 44100
|
SAMPLERATE = 44100
|
||||||
@@ -9,11 +10,54 @@ CHUNK = 1024
|
|||||||
|
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
sock.bind(('0.0.0.0', PORT))
|
sock.bind(('0.0.0.0', PORT))
|
||||||
|
sock.setblocking(False)
|
||||||
|
|
||||||
def audio_callback(outdata, frames, time, status):
|
STREAM_TIMEOUT = 2.0 # seconds without packets = stopped
|
||||||
data, _ = sock.recvfrom(CHUNK * 2) # 2 bytes per sample (int16)
|
AUDIO_QUEUE_MAXSIZE = 20
|
||||||
outdata[:] = np.frombuffer(data, dtype=np.int16).reshape(-1, CHANNELS)
|
|
||||||
|
|
||||||
with sd.OutputStream(samplerate=SAMPLERATE, channels=CHANNELS, dtype='int16', blocksize=CHUNK, callback=audio_callback):
|
async def udp_receiver(audio_queue, stream_state):
|
||||||
print("Receiving audio...")
|
while True:
|
||||||
input() # Press Enter to stop
|
try:
|
||||||
|
data, _ = await asyncio.get_event_loop().sock_recvfrom(sock, CHUNK * 2)
|
||||||
|
await audio_queue.put(data)
|
||||||
|
stream_state['last_packet'] = asyncio.get_event_loop().time()
|
||||||
|
if not stream_state['active']:
|
||||||
|
print("Audio stream arriving at sink.")
|
||||||
|
stream_state['active'] = True
|
||||||
|
except Exception:
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
|
||||||
|
async def stream_monitor(stream_state):
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
if stream_state['active']:
|
||||||
|
now = asyncio.get_event_loop().time()
|
||||||
|
if now - stream_state['last_packet'] > STREAM_TIMEOUT:
|
||||||
|
print("Audio stream stopped at sink.")
|
||||||
|
stream_state['active'] = False
|
||||||
|
|
||||||
|
def make_audio_callback(audio_queue):
|
||||||
|
def audio_callback(outdata, frames, time, status):
|
||||||
|
try:
|
||||||
|
data = audio_queue.get_nowait()
|
||||||
|
outdata[:] = np.frombuffer(data, dtype=np.int16).reshape(-1, CHANNELS)
|
||||||
|
except asyncio.QueueEmpty:
|
||||||
|
outdata[:] = np.zeros((frames, CHANNELS), dtype=np.int16)
|
||||||
|
return audio_callback
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
audio_queue = asyncio.Queue(maxsize=AUDIO_QUEUE_MAXSIZE)
|
||||||
|
stream_state = {'active': False, 'last_packet': asyncio.get_event_loop().time()}
|
||||||
|
|
||||||
|
receiver_task = asyncio.create_task(udp_receiver(audio_queue, stream_state))
|
||||||
|
monitor_task = asyncio.create_task(stream_monitor(stream_state))
|
||||||
|
|
||||||
|
print("Audio sink running. Waiting for audio packets...")
|
||||||
|
with sd.OutputStream(samplerate=SAMPLERATE, channels=CHANNELS, dtype='int16', blocksize=CHUNK, callback=make_audio_callback(audio_queue)):
|
||||||
|
try:
|
||||||
|
await asyncio.Future() # Run forever
|
||||||
|
except (asyncio.CancelledError, KeyboardInterrupt):
|
||||||
|
pass
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(main())
|
||||||
Reference in New Issue
Block a user