Files
pyalsaaudio/loopback.py
2023-06-01 15:29:45 +01:00

186 lines
5.7 KiB
Python

#!/usr/bin/env python3
# -*- mode: python; indent-tabs-mode: t; c-basic-offset: 4; tab-width: 4 -*-
import sys
import select
import logging
from collections import namedtuple
from alsaaudio import PCM, pcms, PCM_PLAYBACK, PCM_CAPTURE, PCM_FORMAT_S16_LE, PCM_NONBLOCK, Mixer
from argparse import ArgumentParser
poll_names = {
select.POLLIN: 'POLLIN',
select.POLLPRI: 'POLLPRI',
select.POLLOUT: 'POLLOUT',
select.POLLERR: 'POLLERR',
select.POLLHUP: 'POLLHUP',
select.POLLRDHUP: 'POLLRDHUP',
select.POLLNVAL: 'POLLNVAL'
}
def poll_desc(mask):
return '|'.join([poll_names[bit] for bit, name in poll_names.items() if mask & bit])
class PollDescriptor(object):
'''File Descriptor, event mask and a name for logging'''
def __init__(self, name, fd, mask):
self.name = name
self.fd = fd
self.mask = mask
@classmethod
def fromAlsaObject(cls, name, alsaobject, mask=None):
fd, alsamask = alsaobject.polldescriptors()[0]
if mask is None:
mask = alsamask
return cls(name, fd, mask)
class Loopback(object):
'''Loopback state and event handling'''
def __init__(self, capture, playback):
self.playback = playback
self.playback_pd = PollDescriptor.fromAlsaObject('playback', playback)
self.capture = capture
self.capture_pd = PollDescriptor.fromAlsaObject('capture', capture)
def register(self, reactor):
reactor.register(self.capture_pd, self)
reactor.register(self.playback_pd, self)
def start(self):
# start reading data
self.capture.read()
def handle_playback_event(self, eventmask, name):
pass
def handle_capture_event(self, eventmask, name):
size, data = self.capture.read()
if not size:
logging.warning(f'underrun')
return
written = self.playback.write(data)
if not written:
logging.warning('overrun')
else:
logging.debug(f'wrote {size}: {written}')
def __call__(self, fd, eventmask, name):
if fd == self.capture_pd.fd:
self.handle_capture_event(eventmask, name)
else:
self.handle_playback_event(eventmask, name)
class VolumeForwarder(object):
'''Volume control event handling'''
def __init__(self, capture_control, playback_control):
self.playback_control = playback_control
self.capture_control = capture_control
def __call__(self, fd, eventmask, name):
volume = self.capture_control.getvolume(pcmtype=PCM_CAPTURE)
# indicate that we've handled the event
self.capture_control.handleevents()
logging.info(f'{name} adjusting volume to {volume}')
if volume:
self.playback_control.setvolume(volume[0])
class Reactor(object):
'''A wrapper around select.poll'''
def __init__(self):
self.poll = select.poll()
self.descriptors = {}
def register(self, polldescriptor, callable):
logging.debug(f'registered {polldescriptor.name}: {poll_desc(polldescriptor.mask)}')
self.descriptors[polldescriptor.fd] = (polldescriptor, callable)
self.poll.register(polldescriptor.fd, polldescriptor.mask)
def unregister(self, polldescriptor):
self.poll.unregister(polldescriptor.fd)
del self.descriptors[polldescriptor.fd]
def run(self):
while True:
events = self.poll.poll()
for fd, ev in events:
polldescriptor, handler = self.descriptors[fd]
# warn about unexpected/unhandled events
if ev & (select.POLLERR | select.POLLHUP | select.POLLNVAL | select.POLLRDHUP):
logging.warning(f'{polldescriptor.name}: {poll_desc(ev)} ({ev})')
else:
logging.debug(f'{polldescriptor.name}: {poll_desc(ev)} ({ev})')
handler(fd, ev, polldescriptor.name)
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
parser = ArgumentParser(description='ALSA loopback (with volume forwarding)')
playback_pcms = pcms(pcmtype=PCM_PLAYBACK)
capture_pcms = pcms(pcmtype=PCM_CAPTURE)
if not playback_pcms:
log.error('no playback PCM found')
sys.exit(2)
if not capture_pcms:
log.error('no capture PCM found')
sys.exit(2)
parser.add_argument('-d', '--debug', action='store_true')
parser.add_argument('-i', '--input', default=capture_pcms[0])
parser.add_argument('-o', '--output', default=playback_pcms[0])
parser.add_argument('-r', '--rate', type=int, default=48000)
parser.add_argument('-c', '--channels', type=int, default=2)
parser.add_argument('-p', '--periodsize', type=int, default=480)
parser.add_argument('-P', '--periods', type=int, default=4)
parser.add_argument('-I', '--input-mixer', help='Control of the input mixer')
parser.add_argument('-O', '--output-mixer', help='control of the output mixer')
args = parser.parse_args()
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
playback = PCM(type=PCM_PLAYBACK, mode=PCM_NONBLOCK, device=args.output, rate=args.rate,
channels=args.channels, periodsize=args.periodsize, periods=args.periods)
capture = PCM(type=PCM_CAPTURE, mode=PCM_NONBLOCK, device=args.input, rate=args.rate,
channels=args.channels, periodsize=args.periodsize, periods=args.periods)
loopback = Loopback(capture, playback)
reactor = Reactor()
# If args.input_mixer and args.output_mixer are set, forward the capture volume to the playback volume.
# The usecase is a capture device that is implemented using g_audio, i.e. the Linux USB gadget driver.
# When a USB device (eg. an iPad) is connected to this machine, its volume events will to the volume control
# of the output device
if args.input_mixer and args.output_mixer:
playback_control = Mixer(control=args.output_mixer, cardindex=playback.info()['card_no'])
capture_control = Mixer(control=args.input_mixer, cardindex=capture.info()['card_no'])
volume_handler = VolumeForwarder(capture_control, playback_control)
reactor.register(PollDescriptor.fromAlsaObject('capture_control', capture_control, select.POLLIN), volume_handler)
loopback.register(reactor)
loopback.start()
reactor.run()