chore: move examples to dedicated directory

This commit is contained in:
Matteo Bernardini
2025-08-06 12:59:57 +08:00
committed by Lars Immisch
parent 0b3f1f41c7
commit df5c2f4685
7 changed files with 0 additions and 0 deletions

97
examples/isine.py Normal file
View File

@@ -0,0 +1,97 @@
''' Use this example from an interactive python session, for example:
>>> from isine import change
>>> change(880)
'''
from __future__ import print_function
import sys
import time
from threading import Thread
from multiprocessing import Queue
if sys.version_info[0] < 3:
from Queue import Empty
else:
from queue import Empty
from math import pi, sin, ceil
import struct
import itertools
import alsaaudio
sampling_rate = 48000
format = alsaaudio.PCM_FORMAT_S16_LE
pack_format = 'h' # short int, matching S16
channels = 2
def nearest_frequency(frequency):
# calculate the nearest frequency where the wave form fits into the buffer
# in other words, select f so that sampling_rate/f is an integer
return float(sampling_rate)/int(sampling_rate/frequency)
def generate(frequency, duration = 0.125):
# generate a buffer with a sine wave of `frequency`
# that is approximately `duration` seconds long
# the length of a full sine wave at the frequency
cycle_size = int(sampling_rate / frequency)
# number of full cycles we can fit into the duration
factor = int(ceil(duration * frequency))
# total number of frames
size = cycle_size * factor
sine = [ int(32767 * sin(2 * pi * frequency * i / sampling_rate)) \
for i in range(size)]
if channels > 1:
sine = list(itertools.chain.from_iterable(itertools.repeat(x, channels) for x in sine))
return struct.pack(str(size * channels) + pack_format, *sine)
class SinePlayer(Thread):
def __init__(self, frequency = 440.0):
Thread.__init__(self, daemon=True)
self.device = alsaaudio.PCM(channels=channels, format=format, rate=sampling_rate)
self.queue = Queue()
self.change(frequency)
def change(self, frequency):
'''This is called outside of the player thread'''
# we generate the buffer in the calling thread for less
# latency when switching frequencies
if frequency > sampling_rate / 2:
raise ValueError('maximum frequency is %d' % (sampling_rate / 2))
f = nearest_frequency(frequency)
print('nearest frequency: %f' % f)
buf = generate(f)
self.queue.put(buf)
def run(self):
buffer = None
while True:
try:
buffer = self.queue.get(False)
except Empty:
pass
if buffer:
if self.device.write(buffer) < 0:
print("Playback buffer underrun! Continuing nonetheless ...")
isine = SinePlayer()
isine.start()
time.sleep(1)
isine.change(1000)
time.sleep(1)

403
examples/loopback.py Normal file
View File

@@ -0,0 +1,403 @@
#!/usr/bin/env python3
# -*- mode: python; indent-tabs-mode: t; c-basic-offset: 4; tab-width: 4 -*-
import sys
import select
import logging
import re
import struct
import subprocess
from datetime import datetime, timedelta
from alsaaudio import (PCM, pcms, PCM_PLAYBACK, PCM_CAPTURE, PCM_NONBLOCK, Mixer,
PCM_STATE_OPEN, PCM_STATE_SETUP, PCM_STATE_PREPARED, PCM_STATE_RUNNING, PCM_STATE_XRUN, PCM_STATE_DRAINING,
PCM_STATE_PAUSED, PCM_STATE_SUSPENDED, ALSAAudioError)
from argparse import ArgumentParser
poll_names = {
select.POLLIN: 'POLLIN',
select.POLLPRI: 'POLLPRI',
select.POLLOUT: 'POLLOUT',
select.POLLERR: 'POLLERR',
select.POLLHUP: 'POLLHUP',
select.POLLRDHUP: 'POLLRDHUP',
select.POLLNVAL: 'POLLNVAL'
}
state_names = {
PCM_STATE_OPEN: 'PCM_STATE_OPEN',
PCM_STATE_SETUP: 'PCM_STATE_SETUP',
PCM_STATE_PREPARED: 'PCM_STATE_PREPARED',
PCM_STATE_RUNNING: 'PCM_STATE_RUNNING',
PCM_STATE_XRUN: 'PCM_STATE_XRUN',
PCM_STATE_DRAINING: 'PCM_STATE_DRAINING',
PCM_STATE_PAUSED: 'PCM_STATE_PAUSED',
PCM_STATE_SUSPENDED: 'PCM_STATE_SUSPENDED'
}
def poll_desc(mask):
return '|'.join([poll_names[bit] for bit, name in poll_names.items() if mask & bit])
class PollDescriptor(object):
'''File Descriptor, event mask and a name for logging'''
def __init__(self, name, fd, mask):
self.name = name
self.fd = fd
self.mask = mask
def as_tuple(self):
return (self.fd, self.mask)
@classmethod
def from_alsa_object(cls, name, alsaobject, mask=None):
# TODO maybe refactor: we ignore objects that have more then one polldescriptor
fd, alsamask = alsaobject.polldescriptors()[0]
if mask is None:
mask = alsamask
return cls(name, fd, mask)
class Loopback(object):
'''Loopback state and event handling'''
def __init__(self, capture, playback_args, volume_handler, run_after_stop=None, run_before_start=None):
self.playback_args = playback_args
self.playback = None
self.volume_handler = volume_handler
self.capture_started = None
self.last_capture_event = None
self.capture = capture
self.capture_pd = PollDescriptor.from_alsa_object('capture', capture)
self.run_after_stop = None
if run_after_stop:
self.run_after_stop = run_after_stop.split(' ')
self.run_before_start = None
if run_before_start:
self.run_before_start = run_before_start.split(' ')
self.run_after_stop_did_run = False
self.waitBeforeOpen = False
self.queue = []
self.period_size = 0
self.silent_periods = 0
@staticmethod
def compute_energy(data):
values = struct.unpack(f'{len(data)//2}h', data)
e = 0
for v in values:
e = e + v * v
return e
@staticmethod
def run_command(cmd):
if cmd:
rc = subprocess.run(cmd)
if rc.returncode:
logging.warning(f'run {cmd}, return code {rc.returncode}')
else:
logging.info(f'run {cmd}, return code {rc.returncode}')
def register(self, reactor):
reactor.register_timeout_handler(self.timeout_handler)
reactor.register(self.capture_pd, self)
def start(self):
# start reading data
size, data = self.capture.read()
if size:
self.queue.append(data)
def timeout_handler(self):
if self.playback and self.capture_started:
if self.last_capture_event:
if datetime.now() - self.last_capture_event > timedelta(seconds=2):
logging.info('timeout - closing playback device')
self.playback.close()
self.playback = None
self.capture_started = None
if self.volume_handler:
self.volume_handler.stop()
self.run_command(self.run_after_stop)
return
self.waitBeforeOpen = False
if not self.run_after_stop_did_run and not self.playback:
if self.volume_handler:
self.volume_handler.stop()
self.run_command(self.run_after_stop)
self.run_after_stop_did_run = True
def pop(self):
if len(self.queue):
return self.queue.pop()
else:
return None
def handle_capture_event(self, eventmask, name):
'''called when data is available for reading'''
self.last_capture_event = datetime.now()
size, data = self.capture.read()
if not size:
logging.warning(f'capture event but no data')
return False
energy = self.compute_energy(data)
logging.debug(f'energy: {energy}')
# the usecase is a USB capture device where we get perfect silence when it's idle
if energy == 0:
self.silent_periods = self.silent_periods + 1
# turn off playback after two seconds of silence
# 2 channels * 2 seconds * 2 bytes per sample
fps = self.playback_args['rate'] * 8 // (self.playback_args['periodsize'] * self.playback_args['periods'])
logging.debug(f'{self.silent_periods} of {fps} silent periods: {self.playback}')
if self.silent_periods > fps and self.playback:
logging.info(f'closing playback due to silence')
self.playback.close()
self.playback = None
if self.volume_handler:
self.volume_handler.stop()
self.run_command(self.run_after_stop)
self.run_after_stop_did_run = True
if not self.playback:
return
else:
self.silent_periods = 0
if not self.playback:
if self.waitBeforeOpen:
return False
try:
if self.volume_handler:
self.volume_handler.start()
self.run_command(self.run_before_start)
self.playback = PCM(**self.playback_args)
self.period_size = self.playback.info()['period_size']
logging.info(f'opened playback device with period_size {self.period_size}')
except ALSAAudioError as e:
logging.info('opening PCM playback device failed: %s', e)
self.waitBeforeOpen = True
return False
self.capture_started = datetime.now()
logging.info(f'{self.playback} capture started: {self.capture_started}')
self.queue.append(data)
if len(self.queue) <= 2:
logging.info(f'buffering: {len(self.queue)}')
return False
try:
data = self.pop()
if data:
space = self.playback.avail()
written = self.playback.write(data)
logging.debug(f'wrote {written} bytes while space was {space}')
except ALSAAudioError:
logging.error('underrun', exc_info=1)
return True
def __call__(self, fd, eventmask, name):
if fd == self.capture_pd.fd:
real_mask = self.capture.polldescriptors_revents([self.capture_pd.as_tuple()])
if real_mask:
return self.handle_capture_event(real_mask, name)
else:
logging.debug('null capture event')
return False
else:
real_mask = self.playback.polldescriptors_revents([self.playback_pd.as_tuple()])
if real_mask:
return self.handle_playback_event(real_mask, name)
else:
logging.debug('null playback event')
return False
class VolumeForwarder(object):
'''Volume control event handling'''
def __init__(self, capture_control, playback_control):
self.playback_control = playback_control
self.capture_control = capture_control
self.active = True
self.volume = None
def start(self):
self.active = True
if self.volume:
self.volume = playback_control.setvolume(self.volume)
def stop(self):
self.active = False
self.volume = self.playback_control.getvolume(pcmtype=PCM_CAPTURE)[0]
def __call__(self, fd, eventmask, name):
if not self.active:
return
volume = self.capture_control.getvolume(pcmtype=PCM_CAPTURE)
# indicate that we've handled the event
self.capture_control.handleevents()
logging.info(f'{name} adjusting volume to {volume}')
if volume:
self.playback_control.setvolume(volume[0])
class Reactor(object):
'''A wrapper around select.poll'''
def __init__(self):
self.poll = select.poll()
self.descriptors = {}
self.timeout_handlers = set()
def register(self, polldescriptor, callable):
logging.debug(f'registered {polldescriptor.name}: {poll_desc(polldescriptor.mask)}')
self.descriptors[polldescriptor.fd] = (polldescriptor, callable)
self.poll.register(polldescriptor.fd, polldescriptor.mask)
def unregister(self, polldescriptor):
self.poll.unregister(polldescriptor.fd)
del self.descriptors[polldescriptor.fd]
def register_timeout_handler(self, callable):
self.timeout_handlers.add(callable)
def unregister_timeout_handler(self, callable):
self.timeout_handlers.remove(callable)
def run(self):
last_timeout_ev = datetime.now()
while True:
# poll for a bit, then send a timeout to registered handlers
events = self.poll.poll(0.25)
for fd, ev in events:
polldescriptor, handler = self.descriptors[fd]
# very chatty - log all events
# logging.debug(f'{polldescriptor.name}: {poll_desc(ev)} ({ev})')
handler(fd, ev, polldescriptor.name)
if datetime.now() - last_timeout_ev > timedelta(seconds=0.25):
for t in self.timeout_handlers:
t()
last_timeout_ev = datetime.now()
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
parser = ArgumentParser(description='ALSA loopback (with volume forwarding)')
playback_pcms = pcms(pcmtype=PCM_PLAYBACK)
capture_pcms = pcms(pcmtype=PCM_CAPTURE)
if not playback_pcms:
logging.error('no playback PCM found')
sys.exit(2)
if not capture_pcms:
logging.error('no capture PCM found')
sys.exit(2)
parser.add_argument('-d', '--debug', action='store_true')
parser.add_argument('-i', '--input', default=capture_pcms[0])
parser.add_argument('-o', '--output', default=playback_pcms[0])
parser.add_argument('-r', '--rate', type=int, default=44100)
parser.add_argument('-c', '--channels', type=int, default=2)
parser.add_argument('-p', '--periodsize', type=int, default=444) # must be divisible by 6 for 44k1
parser.add_argument('-P', '--periods', type=int, default=2)
parser.add_argument('-I', '--input-mixer', help='Control of the input mixer, can contain the card index, e.g. Digital:2')
parser.add_argument('-O', '--output-mixer', help='Control of the output mixer, can contain the card index, e.g. PCM:1')
parser.add_argument('-A', '--run-after-stop', help='command to run when the capture device is idle/silent')
parser.add_argument('-B', '--run-before-start', help='command to run when the capture device becomes active')
parser.add_argument('-V', '--volume', help='Initial volume (default is leave unchanged)')
args = parser.parse_args()
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
playback_args = {
'type': PCM_PLAYBACK,
'mode': PCM_NONBLOCK,
'device': args.output,
'rate': args.rate,
'channels': args.channels,
'periodsize': args.periodsize,
'periods': args.periods
}
reactor = Reactor()
# If args.input_mixer and args.output_mixer are set, forward the capture volume to the playback volume.
# The usecase is a capture device that is implemented using g_audio, i.e. the Linux USB gadget driver.
# When a USB device (eg. an iPad) is connected to this machine, its volume events will go to the volume control
# of the output device
capture = None
playback = None
volume_handler = None
if args.input_mixer and args.output_mixer:
re_mixer = re.compile(r'([a-zA-Z0-9]+):?([0-9+])?')
input_mixer_card = None
m = re_mixer.match(args.input_mixer)
if m:
input_mixer = m.group(1)
if m.group(2):
input_mixer_card = int(m.group(2))
else:
parser.print_usage()
sys.exit(1)
output_mixer_card = None
m = re_mixer.match(args.output_mixer)
if m:
output_mixer = m.group(1)
if m.group(2):
output_mixer_card = int(m.group(2))
else:
parser.print_usage()
sys.exit(1)
if input_mixer_card is None:
capture = PCM(type=PCM_CAPTURE, mode=PCM_NONBLOCK, device=args.input, rate=args.rate,
channels=args.channels, periodsize=args.periodsize, periods=args.periods)
input_mixer_card = capture.info()['card_no']
if output_mixer_card is None:
playback = PCM(**playback_args)
output_mixer_card = playback.info()['card_no']
playback.close()
playback_control = Mixer(control=output_mixer, cardindex=int(output_mixer_card))
capture_control = Mixer(control=input_mixer, cardindex=int(input_mixer_card))
volume_handler = VolumeForwarder(capture_control, playback_control)
reactor.register(PollDescriptor.from_alsa_object('capture_control', capture_control, select.POLLIN), volume_handler)
if args.volume and playback_control:
playback_control.setvolume(int(args.volume))
loopback = Loopback(capture, playback_args, volume_handler, args.run_after_stop, args.run_before_start)
loopback.register(reactor)
loopback.start()
reactor.run()

159
examples/mixertest.py Executable file
View File

@@ -0,0 +1,159 @@
#!/usr/bin/env python
## mixertest.py
##
## This is an example of using the ALSA mixer API
##
## The script will set the volume or mute switch of the specified Mixer
## depending on command line options.
##
## Examples:
## python mixertest.py # list available mixers
## python mixertest.py Master # show Master mixer settings
## python mixertest.py Master 80 # set the master volume to 80%
## python mixertest.py Master 1,90 # set channel 1 volume to 90%
## python mixertest.py Master [un]mute # [un]mute the master mixer
## python mixertest.py Capture [un]rec # [dis/en]able capture
## python mixertest.py Master 0,[un]mute # [un]mute channel 0
## python mixertest.py Capture 0,[un]rec # [dis/en]able capture on channel 0
from __future__ import print_function
import sys
import getopt
import alsaaudio
def list_cards():
print("Available sound cards:")
for i in alsaaudio.card_indexes():
(name, longname) = alsaaudio.card_name(i)
print(" %d: %s (%s)" % (i, name, longname))
def list_mixers(kwargs):
print("Available mixer controls:")
for m in alsaaudio.mixers(**kwargs):
print(" '%s'" % m)
def show_mixer(name, kwargs):
# Demonstrates how mixer settings are queried.
try:
mixer = alsaaudio.Mixer(name, **kwargs)
except alsaaudio.ALSAAudioError:
print("No such mixer", file=sys.stderr)
sys.exit(1)
print("Mixer name: '%s'" % mixer.mixer())
volcap = mixer.volumecap()
print("Capabilities: %s %s" % (' '.join(volcap),
' '.join(mixer.switchcap())))
if "Volume" in volcap or "Joined Volume" in volcap or "Playback Volume" in volcap:
pmin, pmax = mixer.getrange(alsaaudio.PCM_PLAYBACK)
pmin_keyword, pmax_keyword = mixer.getrange(pcmtype=alsaaudio.PCM_PLAYBACK, units=alsaaudio.VOLUME_UNITS_RAW)
pmin_default, pmax_default = mixer.getrange()
assert pmin == pmin_keyword
assert pmax == pmax_keyword
assert pmin == pmin_default
assert pmax == pmax_default
print("Raw playback volume range {}-{}".format(pmin, pmax))
pmin_dB, pmax_dB = mixer.getrange(units=alsaaudio.VOLUME_UNITS_DB)
print("dB playback volume range {}-{}".format(pmin_dB / 100.0, pmax_dB / 100.0))
if "Capture Volume" in volcap or "Joined Capture Volume" in volcap:
# Check that `getrange` works with keyword and positional arguments
cmin, cmax = mixer.getrange(alsaaudio.PCM_CAPTURE)
cmin_keyword, cmax_keyword = mixer.getrange(pcmtype=alsaaudio.PCM_CAPTURE, units=alsaaudio.VOLUME_UNITS_RAW)
assert cmin == cmin_keyword
assert cmax == cmax_keyword
print("Raw capture volume range {}-{}".format(cmin, cmax))
cmin_dB, cmax_dB = mixer.getrange(pcmtype=alsaaudio.PCM_CAPTURE, units=alsaaudio.VOLUME_UNITS_DB)
print("dB capture volume range {}-{}".format(cmin_dB / 100.0, cmax_dB / 100.0))
volumes = mixer.getvolume()
volumes_dB = mixer.getvolume(units=alsaaudio.VOLUME_UNITS_DB)
for i in range(len(volumes)):
print("Channel %i playback volume: %i%% (%.1f dB)" % (i, volumes[i], volumes_dB[i] / 100.0))
volumes = mixer.getvolume(pcmtype=alsaaudio.PCM_CAPTURE)
volumes_dB = mixer.getvolume(pcmtype=alsaaudio.PCM_CAPTURE, units=alsaaudio.VOLUME_UNITS_DB)
for i in range(len(volumes)):
print("Channel %i capture volume: %i%% (%.1f dB)" % (i, volumes[i], volumes_dB[i] / 100.0))
try:
mutes = mixer.getmute()
for i in range(len(mutes)):
if mutes[i]:
print("Channel %i is muted" % i)
except alsaaudio.ALSAAudioError:
# May not support muting
pass
try:
recs = mixer.getrec()
for i in range(len(recs)):
if recs[i]:
print("Channel %i is recording" % i)
except alsaaudio.ALSAAudioError:
# May not support recording
pass
def set_mixer(name, args, kwargs):
# Demonstrates how to set mixer settings
try:
mixer = alsaaudio.Mixer(name, **kwargs)
except alsaaudio.ALSAAudioError:
print("No such mixer", file=sys.stderr)
sys.exit(1)
if args.find(',') != -1:
args_array = args.split(',')
channel = int(args_array[0])
args = ','.join(args_array[1:])
else:
channel = alsaaudio.MIXER_CHANNEL_ALL
if args in ['mute', 'unmute']:
# Mute/unmute the mixer
if args == 'mute':
mixer.setmute(1, channel)
else:
mixer.setmute(0, channel)
elif args in ['rec','unrec']:
# Enable/disable recording
if args == 'rec':
mixer.setrec(1, channel)
else:
mixer.setrec(0, channel)
else:
# Set volume for specified channel. MIXER_CHANNEL_ALL means set
# volume for all channels
volume = int(args)
mixer.setvolume(volume, channel)
def usage():
print('usage: mixertest.py [-c <card>] [ <control>[,<value>]]',
file=sys.stderr)
sys.exit(2)
if __name__ == '__main__':
kwargs = {}
opts, args = getopt.getopt(sys.argv[1:], 'c:d:?h')
for o, a in opts:
if o == '-c':
kwargs = { 'cardindex': int(a) }
elif o == '-d':
kwargs = { 'device': a }
else:
usage()
list_cards()
if not len(args):
list_mixers(kwargs)
elif len(args) == 1:
show_mixer(args[0], kwargs)
else:
set_mixer(args[0], args[1], kwargs)

25
examples/play_rusage.py Normal file
View File

@@ -0,0 +1,25 @@
#!/usr/bin/env python
# Contributed by Stein Magnus Jodal
from __future__ import print_function
import resource
import time
import alsaaudio
seconds = 0
max_rss = 0
device = alsaaudio.PCM()
while True:
device.write(b'\x00' * 44100)
time.sleep(1)
seconds += 1
if seconds % 10 == 0:
prev_rss = max_rss
max_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
diff_rss = max_rss - prev_rss
print('After %ds: max RSS %d kB, increased %d kB' % (
seconds, max_rss, diff_rss))

54
examples/playbacktest.py Executable file
View File

@@ -0,0 +1,54 @@
#!/usr/bin/env python3
# -*- mode: python; indent-tabs-mode: t; c-basic-offset: 4; tab-width: 4 -*-
## playbacktest.py
##
## This is an example of a simple sound playback script.
##
## The script opens an ALSA pcm for sound playback. Set
## various attributes of the device. It then reads data
## from stdin and writes it to the device.
##
## To test it out do the following:
## python recordtest.py out.raw # talk to the microphone
## python playbacktest.py out.raw
from __future__ import print_function
import sys
import time
import getopt
import alsaaudio
def usage():
print('usage: playbacktest.py [-d <device>] <file>', file=sys.stderr)
sys.exit(2)
if __name__ == '__main__':
device = 'default'
opts, args = getopt.getopt(sys.argv[1:], 'd:')
for o, a in opts:
if o == '-d':
device = a
if not args:
usage()
f = open(args[0], 'rb')
# Open the device in playback mode in Mono, 44100 Hz, 16 bit little endian frames
# The period size controls the internal number of frames per period.
# The significance of this parameter is documented in the ALSA api.
out = alsaaudio.PCM(alsaaudio.PCM_PLAYBACK, channels=1, rate=44100, format=alsaaudio.PCM_FORMAT_S16_LE, periodsize=160, device=device)
# Read data from stdin
data = f.read(320)
while data:
if out.write(data) < 0:
print("Playback buffer underrun! Continuing nonetheless ...")
data = f.read(320)
out.close()

64
examples/playwav.py Executable file
View File

@@ -0,0 +1,64 @@
#!/usr/bin/env python3
# -*- mode: python; indent-tabs-mode: t; c-basic-offset: 4; tab-width: 4 -*-
# Simple test script that plays (some) wav files
from __future__ import print_function
import sys
import wave
import getopt
import alsaaudio
def play(device, f):
format = None
# 8bit is unsigned in wav files
if f.getsampwidth() == 1:
format = alsaaudio.PCM_FORMAT_U8
# Otherwise we assume signed data, little endian
elif f.getsampwidth() == 2:
format = alsaaudio.PCM_FORMAT_S16_LE
elif f.getsampwidth() == 3:
format = alsaaudio.PCM_FORMAT_S24_3LE
elif f.getsampwidth() == 4:
format = alsaaudio.PCM_FORMAT_S32_LE
else:
raise ValueError('Unsupported format')
periodsize = f.getframerate() // 8
print('%d channels, %d sampling rate, format %d, periodsize %d\n' % (f.getnchannels(),
f.getframerate(),
format,
periodsize))
device = alsaaudio.PCM(channels=f.getnchannels(), rate=f.getframerate(), format=format, periodsize=periodsize, device=device)
data = f.readframes(periodsize)
while data:
# Read data from stdin
if device.write(data) < 0:
print("Playback buffer underrun! Continuing nonetheless ...")
data = f.readframes(periodsize)
def usage():
print('usage: playwav.py [-d <device>] <file>', file=sys.stderr)
sys.exit(2)
if __name__ == '__main__':
device = 'default'
opts, args = getopt.getopt(sys.argv[1:], 'd:')
for o, a in opts:
if o == '-d':
device = a
if not args:
usage()
with wave.open(args[0], 'rb') as f:
play(device, f)

66
examples/recordtest.py Executable file
View File

@@ -0,0 +1,66 @@
#!/usr/bin/env python3
# -*- mode: python; indent-tabs-mode: t; c-basic-offset: 4; tab-width: 4 -*-
## recordtest.py
##
## This is an example of a simple sound capture script.
##
## The script opens an ALSA pcm device for sound capture, sets
## various attributes of the capture, and reads in a loop,
## writing the data to standard out.
##
## To test it out do the following:
## python recordtest.py out.raw # talk to the microphone
## aplay -r 8000 -f S16_LE -c 1 out.raw
#!/usr/bin/env python
from __future__ import print_function
import sys
import time
import getopt
import alsaaudio
def usage():
print('usage: recordtest.py [-d <device>] <file>', file=sys.stderr)
sys.exit(2)
if __name__ == '__main__':
device = 'default'
opts, args = getopt.getopt(sys.argv[1:], 'd:')
for o, a in opts:
if o == '-d':
device = a
if not args:
usage()
f = open(args[0], 'wb')
# Open the device in nonblocking capture mode in mono, with a sampling rate of 44100 Hz
# and 16 bit little endian samples
# The period size controls the internal number of frames per period.
# The significance of this parameter is documented in the ALSA api.
# For our purposes, it is suficcient to know that reads from the device
# will return this many frames. Each frame being 2 bytes long.
# This means that the reads below will return either 320 bytes of data
# or 0 bytes of data. The latter is possible because we are in nonblocking
# mode.
inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NONBLOCK,
channels=1, rate=44100, format=alsaaudio.PCM_FORMAT_S16_LE,
periodsize=160, device=device)
loops = 1000000
while loops > 0:
loops -= 1
# Read data from device
l, data = inp.read()
if l < 0:
print("Capture buffer overrun! Continuing nonetheless ...")
elif l:
f.write(data)
time.sleep(.001)