import asyncio
import logging
from typing import AsyncIterator

import numpy as np


class WebRTCAudioInput:
    """
    Buffer PCM samples from WebRTC and provide an async generator interface
    for chunked frames.

    Producers call ``put_samples()`` to append samples; consumers iterate
    ``frames(frame_size)`` to receive fixed-size chunks. ``close()`` ends
    the iteration.
    """

    def __init__(self) -> None:
        # Pending samples not yet handed out by frames().
        self.buffer = np.array([], dtype=np.int16)
        # Guards every read-modify-write of ``self.buffer``.
        self.lock = asyncio.Lock()
        # Set when new samples arrive or the input is closed; cleared by
        # frames() when it finds too few samples buffered.
        self.data_available = asyncio.Event()
        # Once True, frames() stops yielding.
        self.closed = False

    async def frames(self, frame_size: int) -> AsyncIterator[np.ndarray]:
        """
        Async generator yielding exactly frame_size samples as numpy arrays.

        Each yielded chunk is a NumPy slice of the internal buffer (a view,
        not a copy). Iteration ends once ``close()`` has been called; samples
        still buffered at that point are not yielded.

        Args:
            frame_size: Number of samples per yielded chunk; must be > 0.

        Raises:
            ValueError: If ``frame_size`` is not positive (raised on the
                first iteration step).
        """
        if frame_size <= 0:
            # A non-positive size would satisfy the length check forever and
            # spin without ever waiting for data.
            raise ValueError("frame_size must be a positive integer")

        while not self.closed:
            chunk = None
            async with self.lock:
                if len(self.buffer) >= frame_size:
                    chunk = self.buffer[:frame_size]
                    self.buffer = self.buffer[frame_size:]
                    logging.debug(
                        "WebRTCAudioInput: Yielding %s samples, buffer now "
                        "has %s samples remaining.",
                        frame_size,
                        len(self.buffer),
                    )
                elif not self.closed:
                    # Only reset the event while still open: close() may have
                    # run while we were queued on the lock, and clearing here
                    # would erase its wake-up and leave us waiting forever.
                    self.data_available.clear()

            # Yield and wait *outside* the lock so put_samples() is never
            # blocked by a suspended consumer or by a consumer waiting for
            # the very data put_samples() is trying to deliver.
            if chunk is not None:
                yield chunk
                continue

            await self.data_available.wait()

    async def put_samples(self, samples: np.ndarray) -> None:
        """
        Add new PCM samples (1D np.int16 array, mono) to the buffer.

        The samples are appended with ``np.concatenate`` and any waiting
        ``frames()`` consumer is woken. The array's shape and dtype are not
        validated here.

        Args:
            samples: Samples to append to the end of the buffer.
        """
        async with self.lock:
            self.buffer = np.concatenate([self.buffer, samples])
            logging.debug(
                "WebRTCAudioInput: Added %s samples, buffer now has %s "
                "samples.",
                len(samples),
                len(self.buffer),
            )
            self.data_available.set()

    async def close(self) -> None:
        """Mark the input closed so frames() stops yielding."""
        self.closed = True
        # Wake any consumer blocked in frames() so it can observe ``closed``.
        self.data_available.set()