This commit is contained in:
Gilles Boccon-Gibod
2025-02-05 16:23:47 -05:00
parent 9756572c93
commit efae307b3d
6 changed files with 27 additions and 186 deletions
+4 -164
View File
@@ -25,9 +25,7 @@ import dataclasses
import functools
import logging
import os
import pathlib
import struct
import sys
from typing import (
cast,
Any,
@@ -80,164 +78,6 @@ AURACAST_DEFAULT_SAMPLE_RATE = 48000
AURACAST_DEFAULT_TRANSMIT_BITRATE = 80000
# -----------------------------------------------------------------------------
# Audio I/O Support
# -----------------------------------------------------------------------------
def check_audio_output(output: str) -> bool:
if output == 'device' or output.startswith('device:'):
try:
import sounddevice # type: ignore[import-untyped]
except ImportError as exc:
raise ValueError(
'audio output not available (sounddevice python module not installed)'
) from exc
if output == 'device':
# Default device
return True
# Specific device
device = output[7:]
if device == '?':
print(color('Audio Devices:', 'yellow'))
for device_info in [
device_info
for device_info in sounddevice.query_devices()
if device_info['max_output_channels'] > 0
]:
device_index = device_info['index']
is_default = (
color(' [default]', 'green')
if sounddevice.default.device[1] == device_index
else ''
)
print(
f'{color(device_index, "cyan")}: {device_info["name"]}{is_default}'
)
return False
try:
device_info = sounddevice.query_devices(int(device))
except sounddevice.PortAudioError as exc:
raise ValueError('No such audio device') from exc
if device_info['max_output_channels'] < 1:
raise ValueError(
f'Device {device} ({device_info["name"]}) does not have an output'
)
return True
async def create_audio_output(output: str) -> audio_io.AudioOutput:
if output == 'stdout':
return audio_io.StreamAudioOutput(sys.stdout.buffer)
if output == 'device' or output.startswith('device:'):
device_name = '' if output == 'device' else output[7:]
return audio_io.SoundDeviceAudioOutput(device_name)
if output == 'ffplay':
return audio_io.SubprocessAudioOutput(
command=(
'ffplay -probesize 32 -fflags nobuffer -analyzeduration 0 '
'-ar {sample_rate} '
'-ch_layout {channel_layout} '
'-f f32le pipe:0'
)
)
if output.startswith('file:'):
return audio_io.FileAudioOutput(output[5:])
raise ValueError('unsupported audio output')
def check_audio_input(input: str) -> bool:
if input == 'device' or input.startswith('device:'):
try:
import sounddevice
except ImportError as exc:
raise ValueError(
'audio input not available (sounddevice python module not installed)'
) from exc
if input == 'device':
# Default device
return True
# Specific device
device = input[7:]
if device == '?':
print(color('Audio Devices:', 'yellow'))
for device_info in [
device_info
for device_info in sounddevice.query_devices()
if device_info['max_input_channels'] > 0
]:
device_index = device_info["index"]
is_mono = device_info['max_input_channels'] == 1
max_channels = color(f'[{"mono" if is_mono else "stereo"}]', 'cyan')
is_default = (
color(' [default]', 'green')
if sounddevice.default.device[0] == device_index
else ''
)
print(
f'{color(device_index, "cyan")}: {device_info["name"]}'
f' {max_channels}{is_default}'
)
return False
try:
device_info = sounddevice.query_devices(int(device))
except sounddevice.PortAudioError as exc:
raise ValueError('No such audio device') from exc
if device_info['max_input_channels'] < 1:
raise ValueError(
f'Device {device} ({device_info["name"]}) does not have an input'
)
return True
async def create_audio_input(input: str, input_format: str) -> audio_io.AudioInput:
pcm_format: audio_io.PcmFormat | None
if input_format == 'auto':
pcm_format = None
else:
pcm_format = audio_io.PcmFormat.from_str(input_format)
if input == 'stdin':
if not pcm_format:
raise ValueError('input format details required for stdin')
return audio_io.StreamAudioInput(sys.stdin.buffer, pcm_format)
if input == 'device' or input.startswith('device:'):
if not pcm_format:
raise ValueError('input format details required for device')
device_name = '' if input == 'device' else input[7:]
return audio_io.SoundDeviceAudioInput(device_name, pcm_format)
# If there's no file: prefix, check if we can assume it is a file.
if pathlib.Path(input).is_file():
input = 'file:' + input
if input.startswith('file:'):
filename = input[5:]
if filename.endswith('.wav'):
if input_format != 'auto':
raise ValueError(".wav file only supported with 'auto' format")
return audio_io.WaveAudioInput(filename)
if pcm_format is None:
raise ValueError('input format details required for raw PCM files')
return audio_io.FileAudioInput(filename, pcm_format)
raise ValueError('input not supported')
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
@@ -833,7 +673,7 @@ async def run_receive(
) -> None:
# Run a pre-flight check for the output.
try:
if not check_audio_output(output):
if not audio_io.check_audio_output(output):
return
except ValueError as error:
print(error)
@@ -907,7 +747,7 @@ async def run_receive(
packet_stats = [0, 0]
async with contextlib.aclosing(
await create_audio_output(output)
await audio_io.create_audio_output(output)
) as audio_output:
await audio_output.open(
audio_io.PcmFormat(
@@ -978,7 +818,7 @@ async def run_transmit(
) -> None:
# Run a pre-flight check for the input.
try:
if not check_audio_input(input):
if not audio_io.check_audio_input(input):
return
except ValueError as error:
print(error)
@@ -1073,7 +913,7 @@ async def run_transmit(
print('Start Periodic Advertising')
await advertising_set.start_periodic()
audio_input = await create_audio_input(input, input_format)
audio_input = await audio_io.create_audio_input(input, input_format)
pcm_format = await audio_input.open()
if pcm_format.channels != 2:
print("Only 2 channels PCM configurations are supported")