From 8f7e8fbdfe54f37ba55f792123e67cd40c596f69 Mon Sep 17 00:00:00 2001 From: Lars Immisch Date: Tue, 30 May 2023 14:02:36 +0100 Subject: [PATCH] Add a naive loopback implementation using select.poll() It does work, though. --- loopback.py | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 loopback.py diff --git a/loopback.py b/loopback.py new file mode 100644 index 0000000..0b592d2 --- /dev/null +++ b/loopback.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 +# -*- mode: python; indent-tabs-mode: t; c-basic-offset: 4; tab-width: 4 -*- + +import sys +import select +import logging +from alsaaudio import PCM, pcms, PCM_PLAYBACK, PCM_CAPTURE, PCM_FORMAT_S16_LE, PCM_NONBLOCK +from argparse import ArgumentParser + +poll_names = { + select.POLLIN: 'POLLIN', + select.POLLPRI: 'POLLPRI', + select.POLLOUT: 'POLLOUT', + select.POLLERR: 'POLLERR', + select.POLLHUP: 'POLLHUP', + select.POLLRDHUP: 'POLLRDHUP', + select.POLLNVAL: 'POLLNVAL' +} + +def poll_desc(mask): + return '|'.join([poll_names[bit] for bit, name in poll_names.items() if mask & bit]) + +class Loopback(object): + + def __init__(self, playback, capture): + + self.playback = playback + self.playback_fd, self.playback_mask = playback.polldescriptors()[0] + + self.capture = capture + self.capture_fd, self.capture_mask = capture.polldescriptors()[0] + + self.poll = select.poll() + + self.poll.register(self.playback_fd, self.playback_mask) + logging.debug(f'registered playback: {poll_desc(self.playback_mask)}') + + self.poll.register(self.capture_fd, self.capture_mask) + logging.debug(f'registered capture: {poll_desc(self.capture_mask)}') + + def fd_desc(self, fd): + if fd == self.capture_fd: + return 'capture' + + if fd == self.playback_fd: + return 'playback' + + return 'unknown' + + def run(self): + + # start reading + self.capture.read() + + while True: + events = self.poll.poll() + for fd, ev in events: + logging.debug(f'{self.fd_desc(fd)}: {poll_desc(ev)} ({ev})') + + # This is very basic. We just write data as soon as we read it + # and don't care for errors or the playback device not being ready + if fd == self.capture_fd: + size, data = self.capture.read() + written = self.playback.write(data) + logging.debug(f'wrote {size}: {written}') + +if __name__ == '__main__': + + logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO) + + parser = ArgumentParser(description='ALSA loopback') + + playback_pcms = pcms(pcmtype=PCM_PLAYBACK) + capture_pcms = pcms(pcmtype=PCM_CAPTURE) + + if not playback_pcms: + log.error('no playback PCM found') + sys.exit(2) + + if not capture_pcms: + log.error('no capture PCM found') + sys.exit(2) + + parser.add_argument('-d', '--debug', action='store_true') + parser.add_argument('-i', '--input', default=capture_pcms[0]) + parser.add_argument('-o', '--output', default=playback_pcms[0]) + parser.add_argument('-r', '--rate', type=int, default=48000) + parser.add_argument('-c', '--channels', type=int, default=2) + parser.add_argument('-p', '--periodsize', type=int, default=480) + parser.add_argument('-P', '--periods', type=int, default=4) + + args = parser.parse_args() + + if args.debug: + logging.getLogger().setLevel(logging.DEBUG) + + playback = PCM(type=PCM_PLAYBACK, mode=PCM_NONBLOCK, device=args.output, rate=args.rate, + channels=args.channels, periodsize=args.periodsize, periods=args.periods) + + capture = PCM(type=PCM_CAPTURE, mode=PCM_NONBLOCK, device=args.input, rate=args.rate, + channels=args.channels, periodsize=args.periodsize, periods=args.periods) + + loopback = Loopback(playback, capture) + + loopback.run()