Files
bumble-auracast/src/auracast/utils/sounddevice_utils.py

276 lines
9.6 KiB
Python

import sounddevice as sd
import os, re, json, subprocess
def devices_by_backend(backend_name: str):
hostapis = sd.query_hostapis() # list of host APIs
# find the host API index by (case-insensitive) name match
try:
hostapi_idx = next(
i for i, ha in enumerate(hostapis)
if backend_name.lower() in ha['name'].lower()
)
except StopIteration:
raise ValueError(f"No host API matching {backend_name!r}. "
f"Available: {[ha['name'] for ha in hostapis]}")
# return (global_index, device_dict) pairs filtered by that host API
return [(i, d) for i, d in enumerate(sd.query_devices())
if d['hostapi'] == hostapi_idx]
def _pa_like_hostapi_index():
for i, ha in enumerate(sd.query_hostapis()):
if any(k in ha["name"] for k in ("PipeWire", "PulseAudio")):
return i
raise RuntimeError("PipeWire/PulseAudio host API not present in PortAudio.")
def _pw_dump():
try:
return json.loads(subprocess.check_output(["pw-dump"]))
except (FileNotFoundError, subprocess.CalledProcessError):
# PipeWire not available
return []
def _sd_refresh():
"""Force PortAudio to re-enumerate devices on next query.
sounddevice/PortAudio keeps a static device list after initialization.
Terminating here ensures that subsequent sd.query_* calls re-initialize
and see newly added devices (e.g., AES67 nodes created after start).
"""
sd._terminate() # private API, acceptable for runtime refresh
sd._initialize()
def _sd_matches_from_names(pa_idx, names):
names_l = {n.lower() for n in names if n}
out = []
for i, d in enumerate(sd.query_devices()):
if d["hostapi"] != pa_idx or d["max_input_channels"] <= 0:
continue
dn = d["name"].lower()
# Exclude monitor devices (e.g., "Monitor of ...") to avoid false positives
if "monitor" in dn:
continue
if any(n in dn for n in names_l):
out.append((i, d))
return out
def get_device_index_by_name(name: str, backend: str | None = None):
"""Return the sounddevice index for an input device with exact name.
If backend is provided, restrict to that backend:
- 'ALSA' uses the ALSA host API
- 'PipeWire' or 'PulseAudio' use the PipeWire/Pulse host API index
Returns None if not found.
"""
try:
devices = sd.query_devices()
hostapi_filter = None
if backend:
if backend.lower() == 'alsa':
try:
alsa = devices_by_backend('ALSA')
alsa_indices = {idx for idx, _ in alsa}
hostapi_filter = ('indices', alsa_indices)
except Exception:
return None
else:
try:
pa_idx = _pa_like_hostapi_index()
hostapi_filter = ('hostapi', pa_idx)
except Exception:
return None
for idx, d in enumerate(devices):
if d.get('max_input_channels', 0) <= 0:
continue
if d.get('name') != name:
continue
if hostapi_filter:
kind, val = hostapi_filter
if kind == 'indices' and idx not in val:
continue
if kind == 'hostapi' and d.get('hostapi') != val:
continue
return idx
except Exception:
return None
return None
def resolve_input_device_index(name: str):
"""Resolve device index by exact name preferring backend by device type.
- If name is known PipeWire Network device, use PipeWire backend.
- Else if name is known ALSA USB device, use ALSA backend.
- Else fallback to any backend.
Returns None if not found.
"""
try:
net_names = {d.get('name') for _, d in get_network_pw_inputs()}
except Exception:
net_names = set()
try:
alsa_usb_names = {d.get('name') for _, d in get_alsa_usb_inputs()}
except Exception:
alsa_usb_names = set()
if name in net_names:
idx = get_device_index_by_name(name, backend='PipeWire')
if idx is not None:
return idx
if name in alsa_usb_names:
idx = get_device_index_by_name(name, backend='ALSA')
if idx is not None:
return idx
return get_device_index_by_name(name, backend=None)
# Module-level caches for device lists
_usb_inputs_cache = []
_network_inputs_cache = []
def get_usb_pw_inputs():
"""Return cached list of USB PipeWire inputs."""
return _usb_inputs_cache
def get_network_pw_inputs():
"""Return cached list of Network/AES67 PipeWire inputs."""
return _network_inputs_cache
def refresh_pw_cache():
"""
Performs a full device scan and updates the internal caches for both USB
and Network audio devices. This is a heavy operation and should not be
called frequently or during active streams.
If PipeWire is not available, caches will remain empty.
"""
global _usb_inputs_cache, _network_inputs_cache
# Force PortAudio to re-enumerate devices
_sd_refresh()
try:
pa_idx = _pa_like_hostapi_index()
except RuntimeError:
# PipeWire/PulseAudio not available - reset caches and return
_usb_inputs_cache = []
_network_inputs_cache = []
return
pw = _pw_dump()
# --- Pass 1: Map device.id to device.bus ---
device_bus = {}
for obj in pw:
if obj.get("type") == "PipeWire:Interface:Device":
props = (obj.get("info") or {}).get("props") or {}
device_bus[obj["id"]] = (props.get("device.bus") or "").lower()
# --- Pass 2: Identify all USB and Network nodes ---
usb_input_names = set()
network_input_names = set()
for obj in pw:
if obj.get("type") != "PipeWire:Interface:Node":
continue
props = (obj.get("info") or {}).get("props") or {}
media = (props.get("media.class") or "").lower()
if "source" not in media and "stream/input" not in media:
continue
nname = (props.get("node.name") or "")
ndesc = (props.get("node.description") or "")
# Skip all monitor sources
if ".monitor" in nname.lower() or "monitor" in ndesc.lower():
continue
# Check for USB
bus = (props.get("device.bus") or device_bus.get(props.get("device.id")) or "").lower()
if bus == "usb":
usb_input_names.add(ndesc or nname)
continue # A device is either USB or Network, not both
# Heuristics for Network/AES67/RTP
text = (nname + " " + ndesc).lower()
media_name = (props.get("media.name") or "").lower()
node_group = (props.get("node.group") or "").lower()
node_network_flag = bool(props.get("node.network"))
has_rtp_keys = any(k in props for k in ("rtp.session", "rtp.source.ip"))
has_sess_keys = any(k in props for k in ("sess.name", "sess.media"))
is_network = (
bus == "network" or
node_network_flag or
"rtp" in media_name or
any(k in text for k in ("rtp", "sap", "aes67", "network", "raop", "airplay")) or
has_rtp_keys or
has_sess_keys or
("pipewire.ptp" in node_group)
)
if is_network:
network_input_names.add(ndesc or nname)
# --- Final Step: Update caches ---
_usb_inputs_cache = _sd_matches_from_names(pa_idx, usb_input_names)
_network_inputs_cache = _sd_matches_from_names(pa_idx, network_input_names)
def get_alsa_usb_inputs():
"""
Return USB audio input devices using the ALSA backend.
Filters for devices that appear to be USB hardware (hw:X,Y pattern or USB in name).
Returns list of (index, device_dict) tuples.
"""
try:
alsa_devices = devices_by_backend('ALSA')
except ValueError:
# ALSA backend not available
return []
usb_inputs = []
for idx, dev in alsa_devices:
# Only include input devices
if dev.get('max_input_channels', 0) <= 0:
continue
name = dev.get('name', '').lower()
# Filter for USB devices based on common patterns:
# - Contains 'usb' in the name
# - hw:X or hw:X,Y pattern present anywhere in name (ALSA hardware devices)
# - dsnoop/ch1/ch2 convenience entries from asound.conf
# Exclude: default, dmix, pulse, pipewire, sysdefault
if any(exclude in name for exclude in ['default', 'dmix', 'pulse', 'pipewire', 'sysdefault']):
continue
# Include if it has 'usb' or contains an hw:* token, or matches common dsnoop/mono aliases
if (
'usb' in name or
re.search(r'hw:\d+(?:,\d+)?', name) or
name.startswith('dsnoop') or
name in ('ch1', 'ch2') or
name.startswith('dante_asrc_ch')
):
usb_inputs.append((idx, dev))
return usb_inputs
def get_alsa_inputs():
"""
Return all ALSA audio input devices.
Returns list of (index, device_dict) tuples.
"""
try:
alsa_devices = devices_by_backend('ALSA')
except ValueError:
# ALSA backend not available
return []
return [(idx, dev) for idx, dev in alsa_devices
if dev.get('max_input_channels', 0) > 0]
# Populate cache on initial module load
refresh_pw_cache()
# Example usage:
# for i, d in list_usb_pw_inputs():
# print(f"USB IN {i}: {d['name']} in={d['max_input_channels']}")
# for i, d in list_network_pw_inputs():
# print(f"NET IN {i}: {d['name']} in={d['max_input_channels']}")