delay method wip save to test no thread method.
This commit is contained in:
@@ -155,11 +155,13 @@ class PyAlsaAudioInput(audio_io.ThreadedAudioInput):
|
|||||||
self._callback_thread = None
|
self._callback_thread = None
|
||||||
self._actual_channels = None
|
self._actual_channels = None
|
||||||
self._periodsize = None
|
self._periodsize = None
|
||||||
|
self._buffer_log_last_ts = time.monotonic()
|
||||||
|
self._buffer_log_max_last_sec = 0
|
||||||
|
|
||||||
def _open(self) -> audio_io.PcmFormat:
|
def _open(self) -> audio_io.PcmFormat:
|
||||||
# ========== LATENCY CONFIGURATION ==========
|
# ========== LATENCY CONFIGURATION ==========
|
||||||
# Adjust these parameters to tune latency vs stability
|
# Adjust these parameters to tune latency vs stability
|
||||||
ALSA_PERIODSIZE = 120 # Samples per ALSA read (240@48kHz = 5ms, 120 = 2.5ms, 96 = 2ms)
|
ALSA_PERIODSIZE = 96 # Samples per ALSA read (240@48kHz = 5ms, 120 = 2.5ms, 96 = 2ms)
|
||||||
ALSA_PERIODS = 2 # Number of periods in ALSA buffer (lower = less latency, more risk of underrun)
|
ALSA_PERIODS = 2 # Number of periods in ALSA buffer (lower = less latency, more risk of underrun)
|
||||||
# Ring buffer: keep only 3 periods max to minimize latency (safety margin only)
|
# Ring buffer: keep only 3 periods max to minimize latency (safety margin only)
|
||||||
# ===========================================
|
# ===========================================
|
||||||
@@ -168,22 +170,19 @@ class PyAlsaAudioInput(audio_io.ThreadedAudioInput):
|
|||||||
requested_channels = int(self._pcm_format.channels)
|
requested_channels = int(self._pcm_format.channels)
|
||||||
self._periodsize = ALSA_PERIODSIZE
|
self._periodsize = ALSA_PERIODSIZE
|
||||||
# Max ring buffer = 3 periods worth of data (tight coupling, minimal latency)
|
# Max ring buffer = 3 periods worth of data (tight coupling, minimal latency)
|
||||||
self._max_buffer_bytes = ALSA_PERIODSIZE * 3 * 2 * requested_channels
|
self._max_buffer_bytes = ALSA_PERIODSIZE * 60 * 2 * requested_channels
|
||||||
|
|
||||||
self._pcm = alsaaudio.PCM(
|
self._pcm = alsaaudio.PCM(
|
||||||
type=alsaaudio.PCM_CAPTURE,
|
type=alsaaudio.PCM_CAPTURE,
|
||||||
mode=alsaaudio.PCM_NORMAL,
|
mode=alsaaudio.PCM_NORMAL,
|
||||||
device=self._device,
|
device=self._device,
|
||||||
|
periods=ALSA_PERIODS,
|
||||||
)
|
)
|
||||||
|
|
||||||
self._pcm.setchannels(requested_channels)
|
self._pcm.setchannels(requested_channels)
|
||||||
self._pcm.setformat(alsaaudio.PCM_FORMAT_S16_LE)
|
self._pcm.setformat(alsaaudio.PCM_FORMAT_S16_LE)
|
||||||
actual_rate = self._pcm.setrate(requested_rate)
|
actual_rate = self._pcm.setrate(requested_rate)
|
||||||
self._pcm.setperiodsize(ALSA_PERIODSIZE)
|
self._pcm.setperiodsize(ALSA_PERIODSIZE)
|
||||||
try:
|
|
||||||
self._pcm.setperiods(ALSA_PERIODS)
|
|
||||||
except AttributeError:
|
|
||||||
pass # Some pyalsaaudio versions don't have setperiods()
|
|
||||||
|
|
||||||
ring_buf_samples = self._max_buffer_bytes // (2 * requested_channels)
|
ring_buf_samples = self._max_buffer_bytes // (2 * requested_channels)
|
||||||
ring_buf_ms = (ring_buf_samples / actual_rate) * 1000
|
ring_buf_ms = (ring_buf_samples / actual_rate) * 1000
|
||||||
@@ -229,39 +228,101 @@ class PyAlsaAudioInput(audio_io.ThreadedAudioInput):
|
|||||||
|
|
||||||
with self._ring_lock:
|
with self._ring_lock:
|
||||||
self._ring_buffer.append(data)
|
self._ring_buffer.append(data)
|
||||||
total_bytes = sum(len(chunk) for chunk in self._ring_buffer)
|
# total_bytes = sum(len(chunk) for chunk in self._ring_buffer)
|
||||||
while total_bytes > self._max_buffer_bytes:
|
# # logging.info("Ringbuffer: bytes=%d", total_bytes)
|
||||||
self._ring_buffer.popleft()
|
# while total_bytes > self._max_buffer_bytes:
|
||||||
total_bytes = sum(len(chunk) for chunk in self._ring_buffer)
|
# self._ring_buffer.popleft()
|
||||||
|
# logging.error("Ringbuffer: OVERFLOW")
|
||||||
|
# total_bytes = sum(len(chunk) for chunk in self._ring_buffer)
|
||||||
except:
|
except:
|
||||||
if self._running:
|
if self._running:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# def _read(self, frame_size: int) -> bytes:
|
||||||
|
# bytes_needed = frame_size * 2
|
||||||
|
# result = b''
|
||||||
|
|
||||||
|
# buffer_not_empty = True
|
||||||
|
# while (len(result) < bytes_needed) and buffer_not_empty:
|
||||||
|
# with self._ring_lock:
|
||||||
|
# buffer_size = sum(len(chunk) for chunk in self._ring_buffer)
|
||||||
|
# self._buffer_log_max_last_sec = max(self._buffer_log_max_last_sec, buffer_size)
|
||||||
|
# now = time.monotonic()
|
||||||
|
# if now - self._buffer_log_last_ts >= 1.0:
|
||||||
|
# logging.info(
|
||||||
|
# "Buffer size (bytes): current=%d max_last_sec=%d",
|
||||||
|
# buffer_size,
|
||||||
|
# self._buffer_log_max_last_sec,
|
||||||
|
# )
|
||||||
|
# self._buffer_log_last_ts = now
|
||||||
|
# self._buffer_log_max_last_sec = 0
|
||||||
|
# if self._ring_buffer and buffer_size > bytes_needed :
|
||||||
|
# chunk = self._ring_buffer.popleft()
|
||||||
|
# needed = bytes_needed - len(result)
|
||||||
|
# if len(chunk) <= needed:
|
||||||
|
# result += chunk
|
||||||
|
# else:
|
||||||
|
# result += chunk[:needed]
|
||||||
|
# self._ring_buffer.appendleft(chunk[needed:])
|
||||||
|
# else:
|
||||||
|
# # Ring buffer empty - release lock and wait a bit
|
||||||
|
# pass
|
||||||
|
|
||||||
|
# if len(result) < bytes_needed:
|
||||||
|
# # Don't busy-wait - sleep briefly to let capture thread fill buffer
|
||||||
|
# time.sleep(0.001) # 0.1ms
|
||||||
|
|
||||||
|
# return result
|
||||||
|
|
||||||
def _read(self, frame_size: int) -> bytes:
|
def _read(self, frame_size: int) -> bytes:
|
||||||
bytes_needed = frame_size * 2
|
bytes_needed = frame_size * 2
|
||||||
result = b''
|
result = b''
|
||||||
|
|
||||||
while len(result) < bytes_needed:
|
if self._ring_buffer:
|
||||||
with self._ring_lock:
|
buffer_size = sum(len(chunk) for chunk in self._ring_buffer)
|
||||||
if self._ring_buffer:
|
else:
|
||||||
chunk = self._ring_buffer.popleft()
|
buffer_size = 0
|
||||||
needed = bytes_needed - len(result)
|
buffer_not_empty = (buffer_size != 0)
|
||||||
if len(chunk) <= needed:
|
with self._ring_lock:
|
||||||
result += chunk
|
while (len(result) < bytes_needed) and buffer_not_empty:
|
||||||
else:
|
chunk = self._ring_buffer.popleft()
|
||||||
result += chunk[:needed]
|
needed = bytes_needed - len(result)
|
||||||
self._ring_buffer.appendleft(chunk[needed:])
|
if len(chunk) <= needed:
|
||||||
|
result += chunk
|
||||||
else:
|
else:
|
||||||
# Ring buffer empty - release lock and wait a bit
|
result += chunk[:needed]
|
||||||
pass
|
self._ring_buffer.appendleft(chunk[needed:])
|
||||||
|
if self._ring_buffer:
|
||||||
if len(result) < bytes_needed:
|
buffer_size = sum(len(chunk) for chunk in self._ring_buffer)
|
||||||
# Don't busy-wait - sleep briefly to let capture thread fill buffer
|
self._buffer_log_max_last_sec = max(self._buffer_log_max_last_sec, buffer_size)
|
||||||
import time
|
now = time.monotonic()
|
||||||
time.sleep(0.0001) # 0.1ms
|
if now - self._buffer_log_last_ts >= 1.0:
|
||||||
|
logging.info(
|
||||||
|
"Buffer size (bytes): current=%d max_last_sec=%d",
|
||||||
|
buffer_size,
|
||||||
|
self._buffer_log_max_last_sec,
|
||||||
|
)
|
||||||
|
self._buffer_log_last_ts = now
|
||||||
|
self._buffer_log_max_last_sec = 0
|
||||||
|
else:
|
||||||
|
buffer_size = 0
|
||||||
|
buffer_not_empty = (buffer_size != 0)
|
||||||
|
#append to bytesneeded
|
||||||
|
if len(result) < bytes_needed:
|
||||||
|
result += b'\x00' * (bytes_needed - len(result))
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
# def _read(self, frame_size: int) -> bytes:
|
||||||
|
# bytes_needed = frame_size * 2
|
||||||
|
# result = b''
|
||||||
|
# # Generate 500Hz sine wave
|
||||||
|
# samples = []
|
||||||
|
# for i in range(frame_size):
|
||||||
|
# sample = int(np.sin(2 * np.pi * 500 * i / 48000) * 32767)
|
||||||
|
# samples.append(sample)
|
||||||
|
# return struct.pack('h' * len(samples), *samples)
|
||||||
|
|
||||||
def _close(self) -> None:
|
def _close(self) -> None:
|
||||||
self._running = False
|
self._running = False
|
||||||
if self._callback_thread:
|
if self._callback_thread:
|
||||||
@@ -580,7 +641,7 @@ async def init_broadcast(
|
|||||||
|
|
||||||
def on_flow():
|
def on_flow():
|
||||||
data_packet_queue = iso_queue.data_packet_queue
|
data_packet_queue = iso_queue.data_packet_queue
|
||||||
print(
|
logging.info(
|
||||||
f'\rPACKETS: pending={data_packet_queue.pending}, '
|
f'\rPACKETS: pending={data_packet_queue.pending}, '
|
||||||
f'queued={data_packet_queue.queued}, '
|
f'queued={data_packet_queue.queued}, '
|
||||||
f'completed={data_packet_queue.completed}',
|
f'completed={data_packet_queue.completed}',
|
||||||
|
|||||||
@@ -143,6 +143,10 @@ async def main():
|
|||||||
level=os.environ.get('LOG_LEVEL', logging.DEBUG),
|
level=os.environ.get('LOG_LEVEL', logging.DEBUG),
|
||||||
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
|
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Enable debug logging for bumble
|
||||||
|
# logging.getLogger('bumble').setLevel(logging.DEBUG)
|
||||||
|
|
||||||
os.chdir(os.path.dirname(__file__))
|
os.chdir(os.path.dirname(__file__))
|
||||||
|
|
||||||
global_conf = auracast_config.AuracastGlobalConfig(
|
global_conf = auracast_config.AuracastGlobalConfig(
|
||||||
|
|||||||
@@ -26,6 +26,10 @@ from auracast.utils.sounddevice_utils import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
|
# Configure bumble debug logging
|
||||||
|
# log.getLogger('bumble').setLevel(log.DEBUG)
|
||||||
|
|
||||||
# make sure pipewire sets latency
|
# make sure pipewire sets latency
|
||||||
# Primary and secondary persisted settings files
|
# Primary and secondary persisted settings files
|
||||||
STREAM_SETTINGS_FILE1 = os.path.join(os.path.dirname(__file__), 'stream_settings.json')
|
STREAM_SETTINGS_FILE1 = os.path.join(os.path.dirname(__file__), 'stream_settings.json')
|
||||||
|
|||||||
Reference in New Issue
Block a user