Compare commits

...

7 Commits

Author SHA1 Message Date
Lars Immisch
a34bdd7a02 Debugging underruns, needs cleanup 2025-12-01 22:33:30 +00:00
Lars Immisch
5a4111dd8c Small cleanup 2025-12-01 22:33:30 +00:00
Lars Immisch
2a8f2a6699 Cleanup 2025-12-01 22:33:30 +00:00
Lars Immisch
a4168d7525 The capture device does not need to be reopened; better logging/comments 2025-12-01 22:33:30 +00:00
Lars Immisch
ac97e641bc Restarting the capture device looks ok 2025-12-01 22:33:30 +00:00
Lars Immisch
caafa8ae21 Make commands optional 2025-12-01 22:33:30 +00:00
Lars Immisch
2fae2c0ed7 Experment with state machine 2025-12-01 22:33:29 +00:00
2 changed files with 201 additions and 98 deletions

View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- mode: python; indent-tabs-mode: t; c-basic-offset: 4; tab-width: 4 -*- # -*- mode: python; indent-tabs-mode: t; c-basic-offset: 4; tab-width: 4; python-indent: 4 -*-
import sys import sys
import select import select
@@ -7,12 +7,21 @@ import logging
import re import re
import struct import struct
import subprocess import subprocess
import errno
from enum import Enum, auto
from datetime import datetime, timedelta from datetime import datetime, timedelta
from alsaaudio import (PCM, pcms, PCM_PLAYBACK, PCM_CAPTURE, PCM_NONBLOCK, Mixer, from alsaaudio import (PCM, pcms, PCM_PLAYBACK, PCM_CAPTURE, PCM_NONBLOCK, Mixer,
PCM_STATE_OPEN, PCM_STATE_SETUP, PCM_STATE_PREPARED, PCM_STATE_RUNNING, PCM_STATE_XRUN, PCM_STATE_DRAINING, PCM_STATE_OPEN, PCM_STATE_SETUP, PCM_STATE_PREPARED, PCM_STATE_RUNNING, PCM_STATE_XRUN, PCM_STATE_DRAINING,
PCM_STATE_PAUSED, PCM_STATE_SUSPENDED, ALSAAudioError) PCM_STATE_PAUSED, PCM_STATE_SUSPENDED, ALSAAudioError)
from argparse import ArgumentParser from argparse import ArgumentParser
# wake up at least after *idle_timer_period* seconds
idle_timer_period = 0.25
# grace period for opening the device in seconds
open_grace_period = 0.5
# close the device after *idle_close_timeout* seconds silence
idle_close_timeout = 2.0
poll_names = { poll_names = {
select.POLLIN: 'POLLIN', select.POLLIN: 'POLLIN',
select.POLLPRI: 'POLLPRI', select.POLLPRI: 'POLLPRI',
@@ -34,6 +43,10 @@ state_names = {
PCM_STATE_SUSPENDED: 'PCM_STATE_SUSPENDED' PCM_STATE_SUSPENDED: 'PCM_STATE_SUSPENDED'
} }
type NamedProcess = tuple[str, subprocess.Popen]
cmd_process : NamedProcess = None
def poll_desc(mask): def poll_desc(mask):
return '|'.join([poll_names[bit] for bit, name in poll_names.items() if mask & bit]) return '|'.join([poll_names[bit] for bit, name in poll_names.items() if mask & bit])
@@ -57,14 +70,19 @@ class PollDescriptor(object):
return cls(name, fd, mask) return cls(name, fd, mask)
class LoopbackState(Enum):
LISTENING = auto()
PLAYING = auto()
DEVICE_BUSY = auto()
class Loopback(object): class Loopback(object):
'''Loopback state and event handling''' '''Loopback state and event handling'''
def __init__(self, capture, playback_args, volume_handler, run_after_stop=None, run_before_start=None): def __init__(self, capture, playback_args, volume_handler, run_after_stop=None, run_before_start=None):
self.playback_args = playback_args self.playback_args = playback_args
self.playback = None self.playback = None
self.volume_handler = volume_handler self.volume_handler = volume_handler
self.capture_started = None
self.last_capture_event = None self.last_capture_event = None
self.capture = capture self.capture = capture
@@ -77,13 +95,10 @@ class Loopback(object):
self.run_before_start = None self.run_before_start = None
if run_before_start: if run_before_start:
self.run_before_start = run_before_start.split(' ') self.run_before_start = run_before_start.split(' ')
self.run_after_stop_did_run = False
self.waitBeforeOpen = False self.state = None
self.queue = [] self.last_state_change = None
self.silence_start = None
self.period_size = 0
self.silent_periods = 0
@staticmethod @staticmethod
def compute_energy(data): def compute_energy(data):
@@ -97,43 +112,84 @@ class Loopback(object):
@staticmethod @staticmethod
def run_command(cmd): def run_command(cmd):
if cmd: if cmd:
rc = subprocess.run(cmd) global cmd_process
cmd_process = (cmd, subprocess.Popen(cmd))
@staticmethod
def check_command_idle_handler():
# an idle handler to watch the process created above
if cmd_process:
rc = cmd_process[1].poll()
if rc:
if rc.returncode: if rc.returncode:
logging.warning(f'run {cmd}, return code {rc.returncode}') logging.warning(f'run {cmd_process[0]}, return code {rc.returncode}')
else: else:
logging.info(f'run {cmd}, return code {rc.returncode}') logging.info(f'run {cmd_process[0]}, return code {rc.returncode}')
cmd_process = None
def register(self, reactor): def register(self, reactor):
reactor.register_timeout_handler(self.timeout_handler) reactor.register_idle_handler(self.check_command_idle_handler)
reactor.register_idle_handler(self.idle_handler)
reactor.register(self.capture_pd, self) reactor.register(self.capture_pd, self)
def start(self): def start(self):
assert self.state == None, "start must only be called once"
# start reading data # start reading data
size, data = self.capture.read() size, _ = self.capture.read()
if size: if size:
self.queue.append(data) logging.warning(f'initial data discarded ({size} bytes)')
def timeout_handler(self): self.state = LoopbackState.LISTENING
if self.playback and self.capture_started:
if self.last_capture_event: def set_state(self, new_state: LoopbackState) -> LoopbackState:
if datetime.now() - self.last_capture_event > timedelta(seconds=2): '''Implement the Loopback state as a state machine'''
logging.info('timeout - closing playback device')
if self.state == new_state:
return self.state
logging.info(f'{self.state} -> {new_state}')
self.last_state_change = datetime.now()
if new_state == LoopbackState.LISTENING:
if self.state == LoopbackState.PLAYING:
self.playback.close() self.playback.close()
self.playback = None self.playback = None
self.capture_started = None self.last_capture_event = None
if self.volume_handler: if self.volume_handler:
self.volume_handler.stop() self.volume_handler.stop()
self.run_command(self.run_after_stop) self.run_command(self.run_after_stop)
elif self.state == LoopbackState.DEVICE_BUSY:
pass
elif new_state == LoopbackState.PLAYING:
if self.state == LoopbackState.LISTENING:
try:
self.run_command(self.run_before_start)
self.playback = PCM(**self.playback_args)
period_size = self.playback.info()['period_size']
logging.info(f'opened playback device with period_size {period_size}')
if self.volume_handler:
self.volume_handler.start()
except ALSAAudioError as e:
logging.warning('opening PCM playback device failed: %s', e)
self.state = LoopbackState.DEVICE_BUSY
return self.state
elif self.state == LoopbackState.DEVICE_BUSY:
# Only try to reopen the device after the grace period
if datetime.now() - self.last_state_change > timedelta(seconds=open_grace_period):
return self.set_state(LoopbackState.PLAYING)
elif new_state == LoopbackState.DEVICE_BUSY:
logging.error(f'{new_state} is internal and cannot be set directly')
self.state = new_state
return self.state
def idle_handler(self):
if self.state == LoopbackState.PLAYING:
if datetime.now() - self.last_capture_event > timedelta(seconds=idle_close_timeout):
logging.info('timeout - closing playback device')
self.set_state(LoopbackState.LISTENING)
return return
self.waitBeforeOpen = False
if not self.run_after_stop_did_run and not self.playback:
if self.volume_handler:
self.volume_handler.stop()
self.run_command(self.run_after_stop)
self.run_after_stop_did_run = True
def pop(self): def pop(self):
if len(self.queue): if len(self.queue):
return self.queue.pop() return self.queue.pop()
@@ -141,6 +197,15 @@ class Loopback(object):
return None return None
def handle_capture_event(self, eventmask, name): def handle_capture_event(self, eventmask, name):
if eventmask & select.POLLERR == select.POLLERR:
# This is typically an underrun caused by the external command being run synchronously
# (on the same thread)
state = self.capture.state()
if state == PCM_STATE_XRUN:
self.capture.drop()
logging.warning(f'POLLERR for capture device: {state_names[state]}')
'''called when data is available for reading''' '''called when data is available for reading'''
self.last_capture_event = datetime.now() self.last_capture_event = datetime.now()
size, data = self.capture.read() size, data = self.capture.read()
@@ -148,65 +213,37 @@ class Loopback(object):
logging.warning(f'capture event but no data') logging.warning(f'capture event but no data')
return False return False
energy = self.compute_energy(data)
logging.debug(f'energy: {energy}')
# the usecase is a USB capture device where we get perfect silence when it's idle # the usecase is a USB capture device where we get perfect silence when it's idle
# compute the energy and go back to LISTENING if nothing is captured
energy = self.compute_energy(data)
if energy == 0: if energy == 0:
self.silent_periods = self.silent_periods + 1 if self.silence_start is None:
self.silence_start = datetime.now()
# turn off playback after two seconds of silence # turn off playback after idle_close_timeout when there was only silence
# 2 channels * 2 seconds * 2 bytes per sample if datetime.now() - self.silence_start > timedelta(seconds=idle_close_timeout):
fps = self.playback_args['rate'] * 8 // (self.playback_args['periodsize'] * self.playback_args['periods']) logging.debug('silence')
self.set_state(LoopbackState.LISTENING)
logging.debug(f'{self.silent_periods} of {fps} silent periods: {self.playback}') return False
if self.silent_periods > fps and self.playback:
logging.info(f'closing playback due to silence')
self.playback.close()
self.playback = None
if self.volume_handler:
self.volume_handler.stop()
self.run_command(self.run_after_stop)
self.run_after_stop_did_run = True
if not self.playback:
return
else: else:
self.silent_periods = 0 self.silence_start = None
if not self.playback: loop_state = self.set_state(LoopbackState.PLAYING)
if self.waitBeforeOpen: if loop_state != LoopbackState.PLAYING:
return False logging.warning(f'setting state PLAYING failed: {str(loop_state)}')
try:
if self.volume_handler:
self.volume_handler.start()
self.run_command(self.run_before_start)
self.playback = PCM(**self.playback_args)
self.period_size = self.playback.info()['period_size']
logging.info(f'opened playback device with period_size {self.period_size}')
except ALSAAudioError as e:
logging.info('opening PCM playback device failed: %s', e)
self.waitBeforeOpen = True
return False return False
self.capture_started = datetime.now()
logging.info(f'{self.playback} capture started: {self.capture_started}')
self.queue.append(data)
if len(self.queue) <= 2:
logging.info(f'buffering: {len(self.queue)}')
return False
try:
data = self.pop()
if data: if data:
space = self.playback.avail() space = self.playback.avail()
if space > 0:
written = self.playback.write(data) written = self.playback.write(data)
logging.debug(f'wrote {written} bytes while space was {space}') if written == -errno.EPIPE:
except ALSAAudioError: logging.warning('playback underrun')
logging.error('underrun', exc_info=1) self.playback.write(data)
silence = ''
if energy == 0:
silence = '(silence)'
logging.debug(f'wrote {written} bytes while space was {space} {silence}')
return True return True
@@ -239,11 +276,13 @@ class VolumeForwarder(object):
def start(self): def start(self):
self.active = True self.active = True
if self.volume: if self.volume:
logging.info(f'start volume is {self.volume}')
self.volume = playback_control.setvolume(self.volume) self.volume = playback_control.setvolume(self.volume)
def stop(self): def stop(self):
self.active = False self.active = False
self.volume = self.playback_control.getvolume(pcmtype=PCM_CAPTURE)[0] self.volume = self.playback_control.getvolume(pcmtype=PCM_CAPTURE)[0]
logging.info(f'stop volume is {self.volume}')
def __call__(self, fd, eventmask, name): def __call__(self, fd, eventmask, name):
if not self.active: if not self.active:
@@ -263,7 +302,7 @@ class Reactor(object):
def __init__(self): def __init__(self):
self.poll = select.poll() self.poll = select.poll()
self.descriptors = {} self.descriptors = {}
self.timeout_handlers = set() self.idle_handlers = set()
def register(self, polldescriptor, callable): def register(self, polldescriptor, callable):
logging.debug(f'registered {polldescriptor.name}: {poll_desc(polldescriptor.mask)}') logging.debug(f'registered {polldescriptor.name}: {poll_desc(polldescriptor.mask)}')
@@ -274,17 +313,17 @@ class Reactor(object):
self.poll.unregister(polldescriptor.fd) self.poll.unregister(polldescriptor.fd)
del self.descriptors[polldescriptor.fd] del self.descriptors[polldescriptor.fd]
def register_timeout_handler(self, callable): def register_idle_handler(self, callable):
self.timeout_handlers.add(callable) self.idle_handlers.add(callable)
def unregister_timeout_handler(self, callable): def unregister_idle_handler(self, callable):
self.timeout_handlers.remove(callable) self.idle_handlers.remove(callable)
def run(self): def run(self):
last_timeout_ev = datetime.now() last_timeout_ev = datetime.now()
while True: while True:
# poll for a bit, then send a timeout to registered handlers # poll for a bit, then send a timeout to registered handlers
events = self.poll.poll(0.25) events = self.poll.poll(idle_timer_period)
for fd, ev in events: for fd, ev in events:
polldescriptor, handler = self.descriptors[fd] polldescriptor, handler = self.descriptors[fd]
@@ -293,8 +332,8 @@ class Reactor(object):
handler(fd, ev, polldescriptor.name) handler(fd, ev, polldescriptor.name)
if datetime.now() - last_timeout_ev > timedelta(seconds=0.25): if datetime.now() - last_timeout_ev > timedelta(seconds=idle_timer_period):
for t in self.timeout_handlers: for t in self.idle_handlers:
t() t()
last_timeout_ev = datetime.now() last_timeout_ev = datetime.now()
@@ -344,6 +383,16 @@ if __name__ == '__main__':
'periods': args.periods 'periods': args.periods
} }
capture_args = {
'type': PCM_CAPTURE,
'mode': PCM_NONBLOCK,
'device': args.input,
'rate': args.rate,
'channels': args.channels,
'periodsize': args.periodsize,
'periods': args.periods
}
reactor = Reactor() reactor = Reactor()
# If args.input_mixer and args.output_mixer are set, forward the capture volume to the playback volume. # If args.input_mixer and args.output_mixer are set, forward the capture volume to the playback volume.
@@ -378,8 +427,7 @@ if __name__ == '__main__':
sys.exit(1) sys.exit(1)
if input_mixer_card is None: if input_mixer_card is None:
capture = PCM(type=PCM_CAPTURE, mode=PCM_NONBLOCK, device=args.input, rate=args.rate, capture = PCM(**capture_args)
channels=args.channels, periodsize=args.periodsize, periods=args.periods)
input_mixer_card = capture.info()['card_no'] input_mixer_card = capture.info()['card_no']
if output_mixer_card is None: if output_mixer_card is None:

View File

@@ -106,6 +106,9 @@ typedef struct {
snd_pcm_t *handle; snd_pcm_t *handle;
// Debug logging
int state;
// Configurable parameters // Configurable parameters
unsigned int channels; unsigned int channels;
unsigned int rate; unsigned int rate;
@@ -360,6 +363,40 @@ alsapcm_list(PyObject *self, PyObject *args, PyObject *kwds)
return result; return result;
} }
const char* state_name(int state) {
switch (state) {
case SND_PCM_STATE_OPEN:
return "SND_PCM_STATE_OPEN";
case SND_PCM_STATE_SETUP:
return "SND_PCM_STATE_SETUP";
case SND_PCM_STATE_PREPARED:
return "SND_PCM_STATE_PREPARED";
case SND_PCM_STATE_RUNNING:
return "SND_PCM_STATE_RUNNING";
case SND_PCM_STATE_XRUN:
return "SND_PCM_STATE_XRUN";
case SND_PCM_STATE_DISCONNECTED:
return "SND_PCM_STATE_DISCONNECTED";
case SND_PCM_STATE_DRAINING:
return "SND_PCM_STATE_DRAINING";
case SND_PCM_STATE_PAUSED:
return "SND_PCM_STATE_PAUSED";
case SND_PCM_STATE_SUSPENDED:
return "SND_PCM_STATE_SUSPENDED";
default:
return "invalid PCM state";
}
}
void print_state(alsapcm_t *self)
{
int state = snd_pcm_state(self->handle);
if (state != self->state) {
printf("[%s %s] %s->%s\n", self->cardname, self->pcmtype == SND_PCM_STREAM_CAPTURE ? "capture" : "playback", state_name(self->state), state_name(state));
self->state = state;
}
}
static int alsapcm_setup(alsapcm_t *self) static int alsapcm_setup(alsapcm_t *self)
{ {
int res,dir; int res,dir;
@@ -405,6 +442,8 @@ static int alsapcm_setup(alsapcm_t *self)
self->framesize = self->channels * snd_pcm_format_physical_width(self->format)/8; self->framesize = self->channels * snd_pcm_format_physical_width(self->format)/8;
self->state = snd_pcm_state(self->handle);
return res; return res;
} }
@@ -1347,7 +1386,6 @@ static PyObject *
alsapcm_read(alsapcm_t *self, PyObject *args) alsapcm_read(alsapcm_t *self, PyObject *args)
{ {
snd_pcm_state_t state; snd_pcm_state_t state;
int res;
int size = self->framesize * self->periodsize; int size = self->framesize * self->periodsize;
int sizeout = 0; int sizeout = 0;
PyObject *buffer_obj, *tuple_obj, *res_obj; PyObject *buffer_obj, *tuple_obj, *res_obj;
@@ -1380,11 +1418,19 @@ alsapcm_read(alsapcm_t *self, PyObject *args)
buffer = PyBytes_AS_STRING(buffer_obj); buffer = PyBytes_AS_STRING(buffer_obj);
#endif #endif
int res = 0;
print_state(self);
// After drop() and drain(), we need to prepare the stream again. // After drop() and drain(), we need to prepare the stream again.
// Note that fresh streams are already prepared by snd_pcm_hw_params(). // Note that fresh streams are already prepared by snd_pcm_hw_params().
state = snd_pcm_state(self->handle); state = snd_pcm_state(self->handle);
if ((state != SND_PCM_STATE_SETUP) || if (state == SND_PCM_STATE_SETUP) {
!(res = snd_pcm_prepare(self->handle))) { res = snd_pcm_prepare(self->handle);
printf("[%s] called snd_pcm_prepare: %d\n", self->cardname, res);
}
if (res == 0) {
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
res = snd_pcm_readi(self->handle, buffer, self->periodsize); res = snd_pcm_readi(self->handle, buffer, self->periodsize);
@@ -1488,6 +1534,8 @@ static PyObject *alsapcm_write(alsapcm_t *self, PyObject *args)
return NULL; return NULL;
} }
print_state(self);
int res; int res;
// After drop() and drain(), we need to prepare the stream again. // After drop() and drain(), we need to prepare the stream again.
// Note that fresh streams are already prepared by snd_pcm_hw_params(). // Note that fresh streams are already prepared by snd_pcm_hw_params().
@@ -1577,6 +1625,9 @@ static PyObject *alsapcm_pause(alsapcm_t *self, PyObject *args)
return NULL; return NULL;
} }
print_state(self);
return PyLong_FromLong(res); return PyLong_FromLong(res);
} }
@@ -1599,6 +1650,8 @@ static PyObject *alsapcm_drop(alsapcm_t *self)
return NULL; return NULL;
} }
print_state(self);
return PyLong_FromLong(res); return PyLong_FromLong(res);
} }
@@ -1623,6 +1676,8 @@ static PyObject *alsapcm_drain(alsapcm_t *self)
return NULL; return NULL;
} }
print_state(self);
return PyLong_FromLong(res); return PyLong_FromLong(res);
} }