7 Commits

Author SHA1 Message Date
Lars Immisch
a34bdd7a02 Debugging underruns, needs cleanup 2025-12-01 22:33:30 +00:00
Lars Immisch
5a4111dd8c Small cleanup 2025-12-01 22:33:30 +00:00
Lars Immisch
2a8f2a6699 Cleanup 2025-12-01 22:33:30 +00:00
Lars Immisch
a4168d7525 The capture device does not need to be reopened; better logging/comments 2025-12-01 22:33:30 +00:00
Lars Immisch
ac97e641bc Restarting the capture device looks ok 2025-12-01 22:33:30 +00:00
Lars Immisch
caafa8ae21 Make commands optional 2025-12-01 22:33:30 +00:00
Lars Immisch
2fae2c0ed7 Experment with state machine 2025-12-01 22:33:29 +00:00
2 changed files with 201 additions and 98 deletions

View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python3
# -*- mode: python; indent-tabs-mode: t; c-basic-offset: 4; tab-width: 4 -*-
# -*- mode: python; indent-tabs-mode: t; c-basic-offset: 4; tab-width: 4; python-indent: 4 -*-
import sys
import select
@@ -7,12 +7,21 @@ import logging
import re
import struct
import subprocess
import errno
from enum import Enum, auto
from datetime import datetime, timedelta
from alsaaudio import (PCM, pcms, PCM_PLAYBACK, PCM_CAPTURE, PCM_NONBLOCK, Mixer,
PCM_STATE_OPEN, PCM_STATE_SETUP, PCM_STATE_PREPARED, PCM_STATE_RUNNING, PCM_STATE_XRUN, PCM_STATE_DRAINING,
PCM_STATE_PAUSED, PCM_STATE_SUSPENDED, ALSAAudioError)
from argparse import ArgumentParser
# wake up at least after *idle_timer_period* seconds
idle_timer_period = 0.25
# grace period for opening the device in seconds
open_grace_period = 0.5
# close the device after *idle_close_timeout* seconds silence
idle_close_timeout = 2.0
poll_names = {
select.POLLIN: 'POLLIN',
select.POLLPRI: 'POLLPRI',
@@ -34,6 +43,10 @@ state_names = {
PCM_STATE_SUSPENDED: 'PCM_STATE_SUSPENDED'
}
type NamedProcess = tuple[str, subprocess.Popen]
cmd_process : NamedProcess = None
def poll_desc(mask):
return '|'.join([poll_names[bit] for bit, name in poll_names.items() if mask & bit])
@@ -57,14 +70,19 @@ class PollDescriptor(object):
return cls(name, fd, mask)
class LoopbackState(Enum):
LISTENING = auto()
PLAYING = auto()
DEVICE_BUSY = auto()
class Loopback(object):
'''Loopback state and event handling'''
def __init__(self, capture, playback_args, volume_handler, run_after_stop=None, run_before_start=None):
self.playback_args = playback_args
self.playback = None
self.volume_handler = volume_handler
self.capture_started = None
self.last_capture_event = None
self.capture = capture
@@ -77,13 +95,10 @@ class Loopback(object):
self.run_before_start = None
if run_before_start:
self.run_before_start = run_before_start.split(' ')
self.run_after_stop_did_run = False
self.waitBeforeOpen = False
self.queue = []
self.period_size = 0
self.silent_periods = 0
self.state = None
self.last_state_change = None
self.silence_start = None
@staticmethod
def compute_energy(data):
@@ -97,43 +112,84 @@ class Loopback(object):
@staticmethod
def run_command(cmd):
if cmd:
rc = subprocess.run(cmd)
if rc.returncode:
logging.warning(f'run {cmd}, return code {rc.returncode}')
else:
logging.info(f'run {cmd}, return code {rc.returncode}')
global cmd_process
cmd_process = (cmd, subprocess.Popen(cmd))
@staticmethod
def check_command_idle_handler():
# an idle handler to watch the process created above
if cmd_process:
rc = cmd_process[1].poll()
if rc:
if rc.returncode:
logging.warning(f'run {cmd_process[0]}, return code {rc.returncode}')
else:
logging.info(f'run {cmd_process[0]}, return code {rc.returncode}')
cmd_process = None
def register(self, reactor):
reactor.register_timeout_handler(self.timeout_handler)
reactor.register_idle_handler(self.check_command_idle_handler)
reactor.register_idle_handler(self.idle_handler)
reactor.register(self.capture_pd, self)
def start(self):
assert self.state == None, "start must only be called once"
# start reading data
size, data = self.capture.read()
size, _ = self.capture.read()
if size:
self.queue.append(data)
logging.warning(f'initial data discarded ({size} bytes)')
def timeout_handler(self):
if self.playback and self.capture_started:
if self.last_capture_event:
if datetime.now() - self.last_capture_event > timedelta(seconds=2):
logging.info('timeout - closing playback device')
self.playback.close()
self.playback = None
self.capture_started = None
self.state = LoopbackState.LISTENING
def set_state(self, new_state: LoopbackState) -> LoopbackState:
'''Implement the Loopback state as a state machine'''
if self.state == new_state:
return self.state
logging.info(f'{self.state} -> {new_state}')
self.last_state_change = datetime.now()
if new_state == LoopbackState.LISTENING:
if self.state == LoopbackState.PLAYING:
self.playback.close()
self.playback = None
self.last_capture_event = None
if self.volume_handler:
self.volume_handler.stop()
self.run_command(self.run_after_stop)
elif self.state == LoopbackState.DEVICE_BUSY:
pass
elif new_state == LoopbackState.PLAYING:
if self.state == LoopbackState.LISTENING:
try:
self.run_command(self.run_before_start)
self.playback = PCM(**self.playback_args)
period_size = self.playback.info()['period_size']
logging.info(f'opened playback device with period_size {period_size}')
if self.volume_handler:
self.volume_handler.stop()
self.run_command(self.run_after_stop)
self.volume_handler.start()
except ALSAAudioError as e:
logging.warning('opening PCM playback device failed: %s', e)
self.state = LoopbackState.DEVICE_BUSY
return self.state
elif self.state == LoopbackState.DEVICE_BUSY:
# Only try to reopen the device after the grace period
if datetime.now() - self.last_state_change > timedelta(seconds=open_grace_period):
return self.set_state(LoopbackState.PLAYING)
elif new_state == LoopbackState.DEVICE_BUSY:
logging.error(f'{new_state} is internal and cannot be set directly')
self.state = new_state
return self.state
def idle_handler(self):
if self.state == LoopbackState.PLAYING:
if datetime.now() - self.last_capture_event > timedelta(seconds=idle_close_timeout):
logging.info('timeout - closing playback device')
self.set_state(LoopbackState.LISTENING)
return
self.waitBeforeOpen = False
if not self.run_after_stop_did_run and not self.playback:
if self.volume_handler:
self.volume_handler.stop()
self.run_command(self.run_after_stop)
self.run_after_stop_did_run = True
def pop(self):
if len(self.queue):
return self.queue.pop()
@@ -141,6 +197,15 @@ class Loopback(object):
return None
def handle_capture_event(self, eventmask, name):
if eventmask & select.POLLERR == select.POLLERR:
# This is typically an underrun caused by the external command being run synchronously
# (on the same thread)
state = self.capture.state()
if state == PCM_STATE_XRUN:
self.capture.drop()
logging.warning(f'POLLERR for capture device: {state_names[state]}')
'''called when data is available for reading'''
self.last_capture_event = datetime.now()
size, data = self.capture.read()
@@ -148,65 +213,37 @@ class Loopback(object):
logging.warning(f'capture event but no data')
return False
energy = self.compute_energy(data)
logging.debug(f'energy: {energy}')
# the usecase is a USB capture device where we get perfect silence when it's idle
# compute the energy and go back to LISTENING if nothing is captured
energy = self.compute_energy(data)
if energy == 0:
self.silent_periods = self.silent_periods + 1
if self.silence_start is None:
self.silence_start = datetime.now()
# turn off playback after two seconds of silence
# 2 channels * 2 seconds * 2 bytes per sample
fps = self.playback_args['rate'] * 8 // (self.playback_args['periodsize'] * self.playback_args['periods'])
logging.debug(f'{self.silent_periods} of {fps} silent periods: {self.playback}')
if self.silent_periods > fps and self.playback:
logging.info(f'closing playback due to silence')
self.playback.close()
self.playback = None
if self.volume_handler:
self.volume_handler.stop()
self.run_command(self.run_after_stop)
self.run_after_stop_did_run = True
if not self.playback:
return
# turn off playback after idle_close_timeout when there was only silence
if datetime.now() - self.silence_start > timedelta(seconds=idle_close_timeout):
logging.debug('silence')
self.set_state(LoopbackState.LISTENING)
return False
else:
self.silent_periods = 0
self.silence_start = None
if not self.playback:
if self.waitBeforeOpen:
return False
try:
if self.volume_handler:
self.volume_handler.start()
self.run_command(self.run_before_start)
self.playback = PCM(**self.playback_args)
self.period_size = self.playback.info()['period_size']
logging.info(f'opened playback device with period_size {self.period_size}')
except ALSAAudioError as e:
logging.info('opening PCM playback device failed: %s', e)
self.waitBeforeOpen = True
return False
self.capture_started = datetime.now()
logging.info(f'{self.playback} capture started: {self.capture_started}')
self.queue.append(data)
if len(self.queue) <= 2:
logging.info(f'buffering: {len(self.queue)}')
loop_state = self.set_state(LoopbackState.PLAYING)
if loop_state != LoopbackState.PLAYING:
logging.warning(f'setting state PLAYING failed: {str(loop_state)}')
return False
try:
data = self.pop()
if data:
space = self.playback.avail()
if data:
space = self.playback.avail()
if space > 0:
written = self.playback.write(data)
logging.debug(f'wrote {written} bytes while space was {space}')
except ALSAAudioError:
logging.error('underrun', exc_info=1)
if written == -errno.EPIPE:
logging.warning('playback underrun')
self.playback.write(data)
silence = ''
if energy == 0:
silence = '(silence)'
logging.debug(f'wrote {written} bytes while space was {space} {silence}')
return True
@@ -239,11 +276,13 @@ class VolumeForwarder(object):
def start(self):
self.active = True
if self.volume:
logging.info(f'start volume is {self.volume}')
self.volume = playback_control.setvolume(self.volume)
def stop(self):
self.active = False
self.volume = self.playback_control.getvolume(pcmtype=PCM_CAPTURE)[0]
logging.info(f'stop volume is {self.volume}')
def __call__(self, fd, eventmask, name):
if not self.active:
@@ -263,7 +302,7 @@ class Reactor(object):
def __init__(self):
self.poll = select.poll()
self.descriptors = {}
self.timeout_handlers = set()
self.idle_handlers = set()
def register(self, polldescriptor, callable):
logging.debug(f'registered {polldescriptor.name}: {poll_desc(polldescriptor.mask)}')
@@ -274,17 +313,17 @@ class Reactor(object):
self.poll.unregister(polldescriptor.fd)
del self.descriptors[polldescriptor.fd]
def register_timeout_handler(self, callable):
self.timeout_handlers.add(callable)
def register_idle_handler(self, callable):
self.idle_handlers.add(callable)
def unregister_timeout_handler(self, callable):
self.timeout_handlers.remove(callable)
def unregister_idle_handler(self, callable):
self.idle_handlers.remove(callable)
def run(self):
last_timeout_ev = datetime.now()
while True:
# poll for a bit, then send a timeout to registered handlers
events = self.poll.poll(0.25)
events = self.poll.poll(idle_timer_period)
for fd, ev in events:
polldescriptor, handler = self.descriptors[fd]
@@ -293,8 +332,8 @@ class Reactor(object):
handler(fd, ev, polldescriptor.name)
if datetime.now() - last_timeout_ev > timedelta(seconds=0.25):
for t in self.timeout_handlers:
if datetime.now() - last_timeout_ev > timedelta(seconds=idle_timer_period):
for t in self.idle_handlers:
t()
last_timeout_ev = datetime.now()
@@ -344,6 +383,16 @@ if __name__ == '__main__':
'periods': args.periods
}
capture_args = {
'type': PCM_CAPTURE,
'mode': PCM_NONBLOCK,
'device': args.input,
'rate': args.rate,
'channels': args.channels,
'periodsize': args.periodsize,
'periods': args.periods
}
reactor = Reactor()
# If args.input_mixer and args.output_mixer are set, forward the capture volume to the playback volume.
@@ -378,8 +427,7 @@ if __name__ == '__main__':
sys.exit(1)
if input_mixer_card is None:
capture = PCM(type=PCM_CAPTURE, mode=PCM_NONBLOCK, device=args.input, rate=args.rate,
channels=args.channels, periodsize=args.periodsize, periods=args.periods)
capture = PCM(**capture_args)
input_mixer_card = capture.info()['card_no']
if output_mixer_card is None:

View File

@@ -106,6 +106,9 @@ typedef struct {
snd_pcm_t *handle;
// Debug logging
int state;
// Configurable parameters
unsigned int channels;
unsigned int rate;
@@ -360,6 +363,40 @@ alsapcm_list(PyObject *self, PyObject *args, PyObject *kwds)
return result;
}
const char* state_name(int state) {
switch (state) {
case SND_PCM_STATE_OPEN:
return "SND_PCM_STATE_OPEN";
case SND_PCM_STATE_SETUP:
return "SND_PCM_STATE_SETUP";
case SND_PCM_STATE_PREPARED:
return "SND_PCM_STATE_PREPARED";
case SND_PCM_STATE_RUNNING:
return "SND_PCM_STATE_RUNNING";
case SND_PCM_STATE_XRUN:
return "SND_PCM_STATE_XRUN";
case SND_PCM_STATE_DISCONNECTED:
return "SND_PCM_STATE_DISCONNECTED";
case SND_PCM_STATE_DRAINING:
return "SND_PCM_STATE_DRAINING";
case SND_PCM_STATE_PAUSED:
return "SND_PCM_STATE_PAUSED";
case SND_PCM_STATE_SUSPENDED:
return "SND_PCM_STATE_SUSPENDED";
default:
return "invalid PCM state";
}
}
void print_state(alsapcm_t *self)
{
int state = snd_pcm_state(self->handle);
if (state != self->state) {
printf("[%s %s] %s->%s\n", self->cardname, self->pcmtype == SND_PCM_STREAM_CAPTURE ? "capture" : "playback", state_name(self->state), state_name(state));
self->state = state;
}
}
static int alsapcm_setup(alsapcm_t *self)
{
int res,dir;
@@ -405,6 +442,8 @@ static int alsapcm_setup(alsapcm_t *self)
self->framesize = self->channels * snd_pcm_format_physical_width(self->format)/8;
self->state = snd_pcm_state(self->handle);
return res;
}
@@ -1347,7 +1386,6 @@ static PyObject *
alsapcm_read(alsapcm_t *self, PyObject *args)
{
snd_pcm_state_t state;
int res;
int size = self->framesize * self->periodsize;
int sizeout = 0;
PyObject *buffer_obj, *tuple_obj, *res_obj;
@@ -1380,11 +1418,19 @@ alsapcm_read(alsapcm_t *self, PyObject *args)
buffer = PyBytes_AS_STRING(buffer_obj);
#endif
int res = 0;
print_state(self);
// After drop() and drain(), we need to prepare the stream again.
// Note that fresh streams are already prepared by snd_pcm_hw_params().
state = snd_pcm_state(self->handle);
if ((state != SND_PCM_STATE_SETUP) ||
!(res = snd_pcm_prepare(self->handle))) {
if (state == SND_PCM_STATE_SETUP) {
res = snd_pcm_prepare(self->handle);
printf("[%s] called snd_pcm_prepare: %d\n", self->cardname, res);
}
if (res == 0) {
Py_BEGIN_ALLOW_THREADS
res = snd_pcm_readi(self->handle, buffer, self->periodsize);
@@ -1488,6 +1534,8 @@ static PyObject *alsapcm_write(alsapcm_t *self, PyObject *args)
return NULL;
}
print_state(self);
int res;
// After drop() and drain(), we need to prepare the stream again.
// Note that fresh streams are already prepared by snd_pcm_hw_params().
@@ -1577,6 +1625,9 @@ static PyObject *alsapcm_pause(alsapcm_t *self, PyObject *args)
return NULL;
}
print_state(self);
return PyLong_FromLong(res);
}
@@ -1599,6 +1650,8 @@ static PyObject *alsapcm_drop(alsapcm_t *self)
return NULL;
}
print_state(self);
return PyLong_FromLong(res);
}
@@ -1623,6 +1676,8 @@ static PyObject *alsapcm_drain(alsapcm_t *self)
return NULL;
}
print_state(self);
return PyLong_FromLong(res);
}