diff --git a/src/auracast/multicast.py b/src/auracast/multicast.py index f680a27..60c959f 100644 --- a/src/auracast/multicast.py +++ b/src/auracast/multicast.py @@ -155,11 +155,13 @@ class PyAlsaAudioInput(audio_io.ThreadedAudioInput): self._callback_thread = None self._actual_channels = None self._periodsize = None + self._buffer_log_last_ts = time.monotonic() + self._buffer_log_max_last_sec = 0 def _open(self) -> audio_io.PcmFormat: # ========== LATENCY CONFIGURATION ========== # Adjust these parameters to tune latency vs stability - ALSA_PERIODSIZE = 120 # Samples per ALSA read (240@48kHz = 5ms, 120 = 2.5ms, 96 = 2ms) + ALSA_PERIODSIZE = 96 # Samples per ALSA read (240@48kHz = 5ms, 120 = 2.5ms, 96 = 2ms) ALSA_PERIODS = 2 # Number of periods in ALSA buffer (lower = less latency, more risk of underrun) # Ring buffer: keep only 3 periods max to minimize latency (safety margin only) # =========================================== @@ -168,22 +170,19 @@ class PyAlsaAudioInput(audio_io.ThreadedAudioInput): requested_channels = int(self._pcm_format.channels) self._periodsize = ALSA_PERIODSIZE # Max ring buffer = 3 periods worth of data (tight coupling, minimal latency) - self._max_buffer_bytes = ALSA_PERIODSIZE * 3 * 2 * requested_channels + self._max_buffer_bytes = ALSA_PERIODSIZE * 60 * 2 * requested_channels self._pcm = alsaaudio.PCM( type=alsaaudio.PCM_CAPTURE, mode=alsaaudio.PCM_NORMAL, device=self._device, + periods=ALSA_PERIODS, ) self._pcm.setchannels(requested_channels) self._pcm.setformat(alsaaudio.PCM_FORMAT_S16_LE) actual_rate = self._pcm.setrate(requested_rate) self._pcm.setperiodsize(ALSA_PERIODSIZE) - try: - self._pcm.setperiods(ALSA_PERIODS) - except AttributeError: - pass # Some pyalsaaudio versions don't have setperiods() ring_buf_samples = self._max_buffer_bytes // (2 * requested_channels) ring_buf_ms = (ring_buf_samples / actual_rate) * 1000 @@ -229,39 +228,101 @@ class PyAlsaAudioInput(audio_io.ThreadedAudioInput): with self._ring_lock: self._ring_buffer.append(data) - total_bytes = sum(len(chunk) for chunk in self._ring_buffer) - while total_bytes > self._max_buffer_bytes: - self._ring_buffer.popleft() - total_bytes = sum(len(chunk) for chunk in self._ring_buffer) + # total_bytes = sum(len(chunk) for chunk in self._ring_buffer) + # # logging.info("Ringbuffer: bytes=%d", total_bytes) + # while total_bytes > self._max_buffer_bytes: + # self._ring_buffer.popleft() + # logging.error("Ringbuffer: OVERFLOW") + # total_bytes = sum(len(chunk) for chunk in self._ring_buffer) except: if self._running: break + # def _read(self, frame_size: int) -> bytes: + # bytes_needed = frame_size * 2 + # result = b'' + + # buffer_not_empty = True + # while (len(result) < bytes_needed) and buffer_not_empty: + # with self._ring_lock: + # buffer_size = sum(len(chunk) for chunk in self._ring_buffer) + # self._buffer_log_max_last_sec = max(self._buffer_log_max_last_sec, buffer_size) + # now = time.monotonic() + # if now - self._buffer_log_last_ts >= 1.0: + # logging.info( + # "Buffer size (bytes): current=%d max_last_sec=%d", + # buffer_size, + # self._buffer_log_max_last_sec, + # ) + # self._buffer_log_last_ts = now + # self._buffer_log_max_last_sec = 0 + # if self._ring_buffer and buffer_size > bytes_needed : + # chunk = self._ring_buffer.popleft() + # needed = bytes_needed - len(result) + # if len(chunk) <= needed: + # result += chunk + # else: + # result += chunk[:needed] + # self._ring_buffer.appendleft(chunk[needed:]) + # else: + # # Ring buffer empty - release lock and wait a bit + # pass + + # if len(result) < bytes_needed: + # # Don't busy-wait - sleep briefly to let capture thread fill buffer + # time.sleep(0.001) # 0.1ms + + # return result + def _read(self, frame_size: int) -> bytes: bytes_needed = frame_size * 2 result = b'' - - while len(result) < bytes_needed: - with self._ring_lock: - if self._ring_buffer: - chunk = self._ring_buffer.popleft() - needed = bytes_needed - len(result) - if len(chunk) <= needed: - result += chunk - else: - result += chunk[:needed] - self._ring_buffer.appendleft(chunk[needed:]) + + if self._ring_buffer: + buffer_size = sum(len(chunk) for chunk in self._ring_buffer) + else: + buffer_size = 0 + buffer_not_empty = (buffer_size != 0) + with self._ring_lock: + while (len(result) < bytes_needed) and buffer_not_empty: + chunk = self._ring_buffer.popleft() + needed = bytes_needed - len(result) + if len(chunk) <= needed: + result += chunk else: - # Ring buffer empty - release lock and wait a bit - pass + result += chunk[:needed] + self._ring_buffer.appendleft(chunk[needed:]) + if self._ring_buffer: + buffer_size = sum(len(chunk) for chunk in self._ring_buffer) + self._buffer_log_max_last_sec = max(self._buffer_log_max_last_sec, buffer_size) + now = time.monotonic() + if now - self._buffer_log_last_ts >= 1.0: + logging.info( + "Buffer size (bytes): current=%d max_last_sec=%d", + buffer_size, + self._buffer_log_max_last_sec, + ) + self._buffer_log_last_ts = now + self._buffer_log_max_last_sec = 0 + else: + buffer_size = 0 + buffer_not_empty = (buffer_size != 0) + #append to bytesneeded + if len(result) < bytes_needed: + result += b'\x00' * (bytes_needed - len(result)) - if len(result) < bytes_needed: - # Don't busy-wait - sleep briefly to let capture thread fill buffer - import time - time.sleep(0.0001) # 0.1ms - return result + # def _read(self, frame_size: int) -> bytes: + # bytes_needed = frame_size * 2 + # result = b'' + # # Generate 500Hz sine wave + # samples = [] + # for i in range(frame_size): + # sample = int(np.sin(2 * np.pi * 500 * i / 48000) * 32767) + # samples.append(sample) + # return struct.pack('h' * len(samples), *samples) + def _close(self) -> None: self._running = False if self._callback_thread: @@ -580,7 +641,7 @@ async def init_broadcast( def on_flow(): data_packet_queue = iso_queue.data_packet_queue - print( + logging.info( f'\rPACKETS: pending={data_packet_queue.pending}, ' f'queued={data_packet_queue.queued}, ' f'completed={data_packet_queue.completed}', diff --git a/src/auracast/multicast_control.py b/src/auracast/multicast_control.py index c34f3a5..38a0a2c 100644 --- a/src/auracast/multicast_control.py +++ b/src/auracast/multicast_control.py @@ -143,6 +143,10 @@ async def main(): level=os.environ.get('LOG_LEVEL', logging.DEBUG), format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s' ) + + # Enable debug logging for bumble + # logging.getLogger('bumble').setLevel(logging.DEBUG) + os.chdir(os.path.dirname(__file__)) global_conf = auracast_config.AuracastGlobalConfig( diff --git a/src/auracast/server/multicast_server.py b/src/auracast/server/multicast_server.py index 7a05a1b..4852627 100644 --- a/src/auracast/server/multicast_server.py +++ b/src/auracast/server/multicast_server.py @@ -26,6 +26,10 @@ from auracast.utils.sounddevice_utils import ( ) load_dotenv() + +# Configure bumble debug logging +# log.getLogger('bumble').setLevel(log.DEBUG) + # make sure pipewire sets latency # Primary and secondary persisted settings files STREAM_SETTINGS_FILE1 = os.path.join(os.path.dirname(__file__), 'stream_settings.json')