Files
sw_pyalsaaudio/isine.py
Oswald Buddenhagen ae5c4aad9b add xrun handling to the examples
it's very primitive, but it shows adequately what can happen and what to
do about it minimally (that is, complain and move on).
2024-02-06 00:39:15 +01:00

98 lines
2.7 KiB
Python

''' Use this example from an interactive python session, for example:
>>> from isine import change
>>> change(880)
'''
from __future__ import print_function
import sys
import time
from threading import Thread
from multiprocessing import Queue
if sys.version_info[0] < 3:
from Queue import Empty
else:
from queue import Empty
from math import pi, sin, ceil
import struct
import itertools
import alsaaudio
# Playback parameters shared by the buffer generator and the PCM device.

# Frames per second; passed as `rate` when the PCM device is opened and used
# to convert between frequencies and wave lengths in frames.
sampling_rate = 48000
# Sample format handed to alsaaudio.PCM (signed 16-bit, little-endian
# according to the constant's name).
format = alsaaudio.PCM_FORMAT_S16_LE
# struct type code used to serialise one sample; must stay in sync with `format`.
pack_format = 'h' # short int, matching S16
# Number of interleaved channels; every generated sample is repeated once
# per channel so all channels carry the same signal.
channels = 2
def nearest_frequency(frequency):
    """Return a frequency close to `frequency` whose period is a whole
    number of frames.

    The period `sampling_rate / frequency` is truncated to an integer frame
    count and the frequency belonging to that truncated period is returned,
    so a buffer holding full cycles of the result can be looped seamlessly.
    """
    frames_per_cycle = int(sampling_rate / frequency)
    return float(sampling_rate) / frames_per_cycle
def generate(frequency, duration = 0.125):
    """Build a playback buffer containing a sine wave of `frequency` Hz.

    The buffer holds a whole number of cycles and is approximately (at
    least) `duration` seconds long. Samples are signed 16-bit integers at
    full scale, repeated once per channel (interleaved), and serialised
    in little-endian byte order.

    Returns the packed samples as a bytes object.
    """
    # the length of a full sine wave at the frequency, in frames
    cycle_size = int(sampling_rate / frequency)
    # number of full cycles needed to cover the duration
    factor = int(ceil(duration * frequency))
    # total number of frames
    size = cycle_size * factor

    sine = [int(32767 * sin(2 * pi * frequency * i / sampling_rate))
            for i in range(size)]
    if channels > 1:
        # interleave: repeat every sample once per channel
        sine = [sample for sample in sine for _ in range(channels)]

    # The device is opened with an explicitly little-endian sample format,
    # so force little-endian packing ('<') instead of relying on the host's
    # native byte order, which would produce swapped samples on big-endian
    # machines. On little-endian hosts the output is unchanged.
    return struct.pack('<' + str(len(sine)) + pack_format, *sine)
class SinePlayer(Thread):
    """Daemon thread that keeps writing a sine-wave buffer to a PCM device.

    The buffer currently being played is repeated until a new one arrives
    through `self.queue`; new buffers are produced by `change()`.
    """

    def __init__(self, frequency = 440.0):
        """Open the PCM device and queue the initial tone of `frequency` Hz."""
        Thread.__init__(self)
        # Set the flag as an attribute instead of passing `daemon=True` to
        # the constructor: the keyword argument only exists on Python 3,
        # while the rest of this file still caters for Python 2.
        self.daemon = True
        self.device = alsaaudio.PCM(channels=channels, format=format, rate=sampling_rate)
        self.queue = Queue()
        self.change(frequency)

    def change(self, frequency):
        '''Queue a new tone. This is called outside of the player thread.

        Raises ValueError if `frequency` is not positive or exceeds half
        the sampling rate.
        '''
        # A zero frequency would divide by zero further down, and negative
        # ones can yield an empty buffer; reject both up front.
        if frequency <= 0:
            raise ValueError('frequency must be positive')
        if frequency > sampling_rate / 2:
            raise ValueError('maximum frequency is %d' % (sampling_rate / 2))
        # we generate the buffer in the calling thread for less
        # latency when switching frequencies
        f = nearest_frequency(frequency)
        print('nearest frequency: %f' % f)
        buf = generate(f)
        self.queue.put(buf)

    def run(self):
        """Player loop: pick up new buffers and keep writing the current one."""
        buffer = None
        while True:
            try:
                # Poll without blocking while a buffer is playing, so the
                # tone keeps looping; block only when there is nothing to
                # play, instead of spinning on an empty queue.
                buffer = self.queue.get(not buffer)
            except Empty:
                pass
            if buffer:
                # A negative return value from write() is treated as an
                # underrun; report it and carry on.
                if self.device.write(buffer) < 0:
                    print("Playback buffer underrun! Continuing nonetheless ...")
# Create the module-level player and start it; this runs at import time so
# that an interactive session hears the tone right away.
isine = SinePlayer()
isine.start()


def change(frequency):
    """Switch the module-level player to `frequency` Hz.

    Convenience wrapper so that `from isine import change` works as shown
    in the module docstring.
    """
    isine.change(frequency)


# Short demonstration: play the default tone for a second, then switch to
# 1000 Hz and let it sound for another second.
time.sleep(1)
isine.change(1000)
time.sleep(1)