From 42ca8acbad9ca7bf5552a5e6b1ef34d8b8b4a1ee Mon Sep 17 00:00:00 2001 From: Lars Immisch Date: Tue, 12 Sep 2023 21:04:02 +0200 Subject: [PATCH] Add a (naive) loopback implementation (#132) * WIP * Open/close the playback device when idle. It takes a long time until it's stopped, though. * open/close logic of playback device * Fix opening logic, make period size divisible by 6 * Be less verbose in level info * Extra argument for output mixer card index Sometimes, this cannot be deduced from the output device * Better silence detection * Run run_after_stop when idle on startup --- loopback.py | 292 ++++++++++++++++++++++++++++++++++++++++++---------- test.py | 27 +++++ 2 files changed, 266 insertions(+), 53 deletions(-) diff --git a/loopback.py b/loopback.py index a4a0cef..0efafa3 100644 --- a/loopback.py +++ b/loopback.py @@ -4,8 +4,13 @@ import sys import select import logging -from collections import namedtuple -from alsaaudio import PCM, pcms, PCM_PLAYBACK, PCM_CAPTURE, PCM_FORMAT_S16_LE, PCM_NONBLOCK, Mixer +import re +import struct +import subprocess +from datetime import datetime, timedelta +from alsaaudio import (PCM, pcms, PCM_PLAYBACK, PCM_CAPTURE, PCM_FORMAT_S16_LE, PCM_NONBLOCK, Mixer, + PCM_STATE_OPEN, PCM_STATE_SETUP, PCM_STATE_PREPARED, PCM_STATE_RUNNING, PCM_STATE_XRUN, PCM_STATE_DRAINING, + PCM_STATE_PAUSED, PCM_STATE_SUSPENDED, ALSAAudioError) from argparse import ArgumentParser poll_names = { @@ -18,6 +23,17 @@ poll_names = { select.POLLNVAL: 'POLLNVAL' } +state_names = { + PCM_STATE_OPEN: 'PCM_STATE_OPEN', + PCM_STATE_SETUP: 'PCM_STATE_SETUP', + PCM_STATE_PREPARED: 'PCM_STATE_PREPARED', + PCM_STATE_RUNNING: 'PCM_STATE_RUNNING', + PCM_STATE_XRUN: 'PCM_STATE_XRUN', + PCM_STATE_DRAINING: 'PCM_STATE_DRAINING', + PCM_STATE_PAUSED: 'PCM_STATE_PAUSED', + PCM_STATE_SUSPENDED: 'PCM_STATE_SUSPENDED' +} + def poll_desc(mask): return '|'.join([poll_names[bit] for bit, name in poll_names.items() if mask & bit]) @@ -28,8 +44,12 @@ class PollDescriptor(object): self.fd = fd self.mask = mask + def as_tuple(self): + return (self.fd, self.mask) + @classmethod - def fromAlsaObject(cls, name, alsaobject, mask=None): + def from_alsa_object(cls, name, alsaobject, mask=None): + # TODO maybe refactor: we ignore objects that have more then one polldescriptor fd, alsamask = alsaobject.polldescriptors()[0] if mask is None: @@ -40,42 +60,158 @@ class PollDescriptor(object): class Loopback(object): '''Loopback state and event handling''' - def __init__(self, capture, playback): - self.playback = playback - self.playback_pd = PollDescriptor.fromAlsaObject('playback', playback) + def __init__(self, capture, playback_args, run_after_stop=None, run_before_start=None): + self.playback_args = playback_args + self.playback = None + self.capture_started = None + self.last_capture_event = None self.capture = capture - self.capture_pd = PollDescriptor.fromAlsaObject('capture', capture) + self.capture_pd = PollDescriptor.from_alsa_object('capture', capture) + + self.run_after_stop = run_after_stop.split(' ') + self.run_before_start = run_before_start.split(' ') + self.run_after_stop_did_run = False + + self.waitBeforeOpen = False + self.queue = [] + + self.period_size = 0 + self.silent_periods = 0 + + @staticmethod + def compute_energy(data): + values = struct.unpack(f'{len(data)//2}h', data) + e = 0 + for v in values: + e = e + v * v + + return e + + @staticmethod + def run_command(cmd): + if cmd: + rc = subprocess.run(cmd) + if rc.returncode: + logging.warning(f'run {cmd}, return code {rc.returncode}') + else: + logging.info(f'run {cmd}, return code {rc.returncode}') def register(self, reactor): + reactor.register_timeout_handler(self.timeout_handler) reactor.register(self.capture_pd, self) - reactor.register(self.playback_pd, self) def start(self): # start reading data - self.capture.read() - - def handle_playback_event(self, eventmask, name): - pass - - def handle_capture_event(self, eventmask, name): size, data = self.capture.read() - if not size: - logging.warning(f'underrun') + if size: + self.queue.append(data) + + def timeout_handler(self): + if self.playback and self.capture_started: + if self.last_capture_event: + if datetime.now() - self.last_capture_event > timedelta(seconds=2): + logging.info('timeout - closing playback device') + self.playback.close() + self.playback = None + self.capture_started = None + self.run_command(self.run_after_stop) return - written = self.playback.write(data) - if not written: - logging.warning('overrun') + self.waitBeforeOpen = False + + if not self.run_after_stop_did_run and not self.playback: + self.run_command(self.run_after_stop) + self.run_after_stop_did_run = True + + def pop(self): + if len(self.queue): + return self.queue.pop() else: - logging.debug(f'wrote {size}: {written}') + return None + + def handle_capture_event(self, eventmask, name): + '''called when data is available for reading''' + self.last_capture_event = datetime.now() + size, data = self.capture.read() + if not size: + logging.warning(f'capture event but no data') + return False + + energy = self.compute_energy(data) + logging.debug(f'energy: {energy}') + + # the usecase is a USB capture device where we get perfect silence when it's idle + if energy == 0: + self.silent_periods = self.silent_periods + 1 + + # turn off playback after two seconds of silence + # 2 channels * 2 seconds * 2 bytes per sample + fps = self.playback_args['rate'] * 8 // (self.playback_args['periodsize'] * self.playback_args['periods']) + + logging.debug(f'{self.silent_periods} of {fps} silent periods: {self.playback}') + + if self.silent_periods > fps and self.playback: + logging.info(f'closing playback due to silence') + self.playback.close() + self.playback = None + self.run_command(self.run_after_stop) + self.run_after_stop_did_run = True + + if not self.playback: + return + else: + self.silent_periods = 0 + + if not self.playback: + if self.waitBeforeOpen: + return False + try: + self.run_command(self.run_before_start) + self.playback = PCM(**self.playback_args) + self.period_size = self.playback.info()['period_size'] + logging.info(f'opened playback device with period_size {self.period_size}') + except ALSAAudioError as e: + logging.info('opening PCM playback device failed: %s', e) + self.waitBeforeOpen = True + return False + + self.capture_started = datetime.now() + logging.info(f'{self.playback} capture started: {self.capture_started}') + + self.queue.append(data) + + if len(self.queue) <= 2: + logging.info(f'buffering: {len(self.queue)}') + return False + + try: + data = self.pop() + if data: + space = self.playback.avail() + written = self.playback.write(data) + logging.debug(f'wrote {written} bytes while space was {space}') + except ALSAAudioError: + logging.error('underrun', exc_info=1) + + return True def __call__(self, fd, eventmask, name): - if fd == self.capture_pd.fd: - self.handle_capture_event(eventmask, name) - else: - self.handle_playback_event(eventmask, name) + if fd == self.capture_pd.fd: + real_mask = self.capture.polldescriptors_revents([self.capture_pd.as_tuple()]) + if real_mask: + return self.handle_capture_event(real_mask, name) + else: + logging.debug('null capture event') + return False + else: + real_mask = self.playback.polldescriptors_revents([self.playback_pd.as_tuple()]) + if real_mask: + return self.handle_playback_event(real_mask, name) + else: + logging.debug('null playback event') + return False class VolumeForwarder(object): '''Volume control event handling''' @@ -99,35 +235,45 @@ class Reactor(object): def __init__(self): self.poll = select.poll() self.descriptors = {} + self.timeout_handlers = set() def register(self, polldescriptor, callable): logging.debug(f'registered {polldescriptor.name}: {poll_desc(polldescriptor.mask)}') self.descriptors[polldescriptor.fd] = (polldescriptor, callable) - self.poll.register(polldescriptor.fd, polldescriptor.mask) def unregister(self, polldescriptor): self.poll.unregister(polldescriptor.fd) del self.descriptors[polldescriptor.fd] + def register_timeout_handler(self, callable): + self.timeout_handlers.add(callable) + + def unregister_timeout_handler(self, callable): + self.timeout_handlers.remove(callable) + def run(self): + last_timeout_ev = datetime.now() while True: - events = self.poll.poll() + # poll for a bit, then send a timeout to registered handlers + events = self.poll.poll(0.25) for fd, ev in events: polldescriptor, handler = self.descriptors[fd] - # warn about unexpected/unhandled events - if ev & (select.POLLERR | select.POLLHUP | select.POLLNVAL | select.POLLRDHUP): - logging.warning(f'{polldescriptor.name}: {poll_desc(ev)} ({ev})') - else: - logging.debug(f'{polldescriptor.name}: {poll_desc(ev)} ({ev})') + # very chatty - log all events + # logging.debug(f'{polldescriptor.name}: {poll_desc(ev)} ({ev})') handler(fd, ev, polldescriptor.name) + if datetime.now() - last_timeout_ev > timedelta(seconds=0.25): + for t in self.timeout_handlers: + t() + last_timeout_ev = datetime.now() + if __name__ == '__main__': - logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO) + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) parser = ArgumentParser(description='ALSA loopback (with volume forwarding)') @@ -135,51 +281,91 @@ if __name__ == '__main__': capture_pcms = pcms(pcmtype=PCM_CAPTURE) if not playback_pcms: - log.error('no playback PCM found') + logging.error('no playback PCM found') sys.exit(2) if not capture_pcms: - log.error('no capture PCM found') + logging.error('no capture PCM found') sys.exit(2) parser.add_argument('-d', '--debug', action='store_true') parser.add_argument('-i', '--input', default=capture_pcms[0]) parser.add_argument('-o', '--output', default=playback_pcms[0]) - parser.add_argument('-r', '--rate', type=int, default=48000) + parser.add_argument('-r', '--rate', type=int, default=44100) parser.add_argument('-c', '--channels', type=int, default=2) - parser.add_argument('-p', '--periodsize', type=int, default=480) - parser.add_argument('-P', '--periods', type=int, default=4) - parser.add_argument('-I', '--input-mixer', help='Control of the input mixer') - parser.add_argument('-O', '--output-mixer', help='control of the output mixer') + parser.add_argument('-p', '--periodsize', type=int, default=444) # must be divisible by 6 for 44k1 + parser.add_argument('-P', '--periods', type=int, default=2) + parser.add_argument('-I', '--input-mixer', help='Control of the input mixer, can contain the card index, e.g. Digital:2') + parser.add_argument('-O', '--output-mixer', help='Control of the output mixer, can contain the card index, e.g. PCM:1') + parser.add_argument('-A', '--run-after-stop', help='command to run when the capture device is idle/silent') + parser.add_argument('-B', '--run-before-start', help='command to run when the capture device becomes active') args = parser.parse_args() if args.debug: logging.getLogger().setLevel(logging.DEBUG) - playback = PCM(type=PCM_PLAYBACK, mode=PCM_NONBLOCK, device=args.output, rate=args.rate, - channels=args.channels, periodsize=args.periodsize, periods=args.periods) - - capture = PCM(type=PCM_CAPTURE, mode=PCM_NONBLOCK, device=args.input, rate=args.rate, - channels=args.channels, periodsize=args.periodsize, periods=args.periods) - - loopback = Loopback(capture, playback) + playback_args = { + 'type': PCM_PLAYBACK, + 'mode': PCM_NONBLOCK, + 'device': args.output, + 'rate': args.rate, + 'channels': args.channels, + 'periodsize': args.periodsize, + 'periods': args.periods + } reactor = Reactor() # If args.input_mixer and args.output_mixer are set, forward the capture volume to the playback volume. # The usecase is a capture device that is implemented using g_audio, i.e. the Linux USB gadget driver. - # When a USB device (eg. an iPad) is connected to this machine, its volume events will to the volume control + # When a USB device (eg. an iPad) is connected to this machine, its volume events will go to the volume control # of the output device + + capture = None + playback = None if args.input_mixer and args.output_mixer: - playback_control = Mixer(control=args.output_mixer, cardindex=playback.info()['card_no']) - capture_control = Mixer(control=args.input_mixer, cardindex=capture.info()['card_no']) + re_mixer = re.compile(r'([a-zA-Z0-9]+):?([0-9+])?') + + input_mixer_card = None + m = re_mixer.match(args.input_mixer) + if m: + input_mixer = m.group(1) + if m.group(2): + input_mixer_card = int(m.group(2)) + else: + parser.print_usage() + sys.exit(1) + + output_mixer_card = None + m = re_mixer.match(args.output_mixer) + if m: + output_mixer = m.group(1) + if m.group(2): + output_mixer_card = int(m.group(2)) + else: + parser.print_usage() + sys.exit(1) + + if input_mixer_card is None: + capture = PCM(type=PCM_CAPTURE, mode=PCM_NONBLOCK, device=args.input, rate=args.rate, + channels=args.channels, periodsize=args.periodsize, periods=args.periods) + input_mixer_card = capture.info()['card_no'] + + if output_mixer_card is None: + playback = PCM(**playback_args) + output_mixer_card = playback.info()['card_no'] + playback.close() + + playback_control = Mixer(control=output_mixer, cardindex=int(output_mixer_card)) + capture_control = Mixer(control=input_mixer, cardindex=int(input_mixer_card)) volume_handler = VolumeForwarder(capture_control, playback_control) - reactor.register(PollDescriptor.fromAlsaObject('capture_control', capture_control, select.POLLIN), volume_handler) + reactor.register(PollDescriptor.from_alsa_object('capture_control', capture_control, select.POLLIN), volume_handler) - loopback.register(reactor) - - loopback.start() + if capture and playback: + loopback = Loopback(capture, playback_args, args.run_after_stop, args.run_before_start) + loopback.register(reactor) + loopback.start() reactor.run() diff --git a/test.py b/test.py index 1a6858a..0251e7b 100755 --- a/test.py +++ b/test.py @@ -11,6 +11,7 @@ import unittest import alsaaudio import warnings +from contextlib import closing # we can't test read and write well - these are tested otherwise PCMMethods = [ @@ -156,5 +157,31 @@ class PCMTest(unittest.TestCase): self.assertEqual(len(w), 1, method + " expected a warning") self.assertTrue(issubclass(w[-1].category, DeprecationWarning), method + " expected a DeprecationWarning") +class PollDescriptorArgsTest(unittest.TestCase): + '''Test invalid args for polldescriptors_revents (takes a list of tuples of 2 integers)''' + def testArgsNoList(self): + with closing(alsaaudio.PCM()) as pcm: + with self.assertRaises(TypeError): + pcm.polldescriptors_revents('foo') + + def testArgsListButNoTuples(self): + with closing(alsaaudio.PCM()) as pcm: + with self.assertRaises(TypeError): + pcm.polldescriptors_revents(['foo', 1]) + + def testArgsListButInvalidTuples(self): + with closing(alsaaudio.PCM()) as pcm: + with self.assertRaises(TypeError): + pcm.polldescriptors_revents([('foo', 'bar')]) + + def testArgsListTupleWrongLength(self): + with closing(alsaaudio.PCM()) as pcm: + with self.assertRaises(TypeError): + pcm.polldescriptors_revents([(1, )]) + + with self.assertRaises(TypeError): + pcm.polldescriptors_revents([(1, 2, 3)]) + + if __name__ == '__main__': unittest.main()