140 lines
5.1 KiB
Python
140 lines
5.1 KiB
Python
import sounddevice as sd
|
|
import os, re, json, subprocess
|
|
|
|
def devices_by_backend(backend_name: str):
    """Return the devices that belong to the host API matching *backend_name*.

    The host API is chosen by a case-insensitive substring match against the
    names reported by ``sd.query_hostapis()``; the first match wins.

    Args:
        backend_name: Full or partial host API name to look for.

    Returns:
        A list of ``(global_index, device_dict)`` pairs taken from
        ``sd.query_devices()`` whose ``'hostapi'`` field equals the index of
        the matched host API.

    Raises:
        ValueError: If no host API name contains *backend_name*.
    """
    hostapis = sd.query_hostapis()  # list of host APIs
    wanted = backend_name.lower()  # hoisted: invariant across the search

    hostapi_idx = next(
        (i for i, ha in enumerate(hostapis) if wanted in ha["name"].lower()),
        None,
    )
    if hostapi_idx is None:
        # Raised outside any ``except`` block so the traceback does not carry
        # a misleading StopIteration context.
        raise ValueError(
            f"No host API matching {backend_name!r}. "
            f"Available: {[ha['name'] for ha in hostapis]}"
        )

    # Keep only devices owned by the matched host API, tagged with their
    # global index.
    return [
        (i, d)
        for i, d in enumerate(sd.query_devices())
        if d["hostapi"] == hostapi_idx
    ]
|
|
|
|
def _pa_like_hostapi_index():
    """Return the index of the first PipeWire or PulseAudio host API.

    Host API names from ``sd.query_hostapis()`` are checked in order with a
    case-sensitive substring test for ``"PipeWire"`` or ``"PulseAudio"``.

    Raises:
        RuntimeError: If no host API name contains either marker.
    """
    found = next(
        (
            index
            for index, hostapi in enumerate(sd.query_hostapis())
            if "PipeWire" in hostapi["name"] or "PulseAudio" in hostapi["name"]
        ),
        None,
    )
    if found is None:
        raise RuntimeError("PipeWire/PulseAudio host API not present in PortAudio.")
    return found
|
|
|
|
def _pw_dump():
    """Run the ``pw-dump`` command and return its output parsed as JSON.

    Any exception raised by ``subprocess.check_output`` or ``json.loads``
    propagates to the caller unchanged.
    """
    raw_output = subprocess.check_output(["pw-dump"])
    return json.loads(raw_output)
|
|
|
|
def _sd_refresh():
    """Restart the PortAudio session so the next query re-enumerates devices.

    sounddevice/PortAudio keeps a static device list once initialized.
    Terminating and initializing again makes subsequent ``sd.query_*`` calls
    see devices that appeared after start-up (e.g. AES67 nodes created
    later).
    """
    # Both calls are private sounddevice API; used here deliberately for a
    # runtime refresh.
    sd._terminate()
    sd._initialize()
|
|
|
|
def _sd_matches_from_names(pa_idx, names):
    """Return the input devices on host API *pa_idx* that match *names*.

    Args:
        pa_idx: Host API index a device's ``"hostapi"`` field must equal.
        names: Iterable of name fragments; falsy entries are ignored and the
            rest are compared case-insensitively.

    Returns:
        A list of ``(global_index, device_dict)`` pairs from
        ``sd.query_devices()`` for devices that have at least one input
        channel, whose name does not contain "monitor", and whose lower-cased
        name contains at least one of the lower-cased fragments.
    """
    wanted = {name.lower() for name in names if name}

    def _is_match(device):
        # Wrong backend or no capture channels: not a candidate at all.
        if device["hostapi"] != pa_idx or device["max_input_channels"] <= 0:
            return False
        lowered = device["name"].lower()
        # Monitor devices (e.g. "Monitor of ...") are excluded to avoid
        # false positives.
        if "monitor" in lowered:
            return False
        return any(fragment in lowered for fragment in wanted)

    return [
        (index, device)
        for index, device in enumerate(sd.query_devices())
        if _is_match(device)
    ]
|
|
|
|
# Module-level caches for device lists. Both start empty and are rebound by
# refresh_pw_cache(); the get_*_pw_inputs() accessors return them as-is.
_usb_inputs_cache: list = []
_network_inputs_cache: list = []
|
|
|
|
def get_usb_pw_inputs():
    """Return the USB PipeWire inputs found by the last cache refresh.

    No scan is performed here; the module-level cache list is returned
    directly (not a copy).
    """
    cached_inputs = _usb_inputs_cache
    return cached_inputs
|
|
|
|
def get_network_pw_inputs():
    """Return the Network/AES67 PipeWire inputs found by the last cache refresh.

    No scan is performed here; the module-level cache list is returned
    directly (not a copy).
    """
    cached_inputs = _network_inputs_cache
    return cached_inputs
|
|
|
|
def _pw_props(obj):
    """Return the ``info.props`` mapping of a pw-dump object, or ``{}`` if absent."""
    return (obj.get("info") or {}).get("props") or {}


def _pw_is_network_node(props, bus, nname, ndesc):
    """Heuristically decide whether a PipeWire node is a Network/AES67/RTP input.

    Args:
        props: The node's ``info.props`` mapping.
        bus: Lower-cased bus string already resolved for the node.
        nname: The node's ``node.name`` (may be empty).
        ndesc: The node's ``node.description`` (may be empty).

    Returns:
        True if any of the network indicators below is present.
    """
    text = (nname + " " + ndesc).lower()
    media_name = (props.get("media.name") or "").lower()
    node_group = (props.get("node.group") or "").lower()
    node_network_flag = bool(props.get("node.network"))
    has_rtp_keys = any(k in props for k in ("rtp.session", "rtp.source.ip"))
    has_sess_keys = any(k in props for k in ("sess.name", "sess.media"))

    return (
        bus == "network"
        or node_network_flag
        or "rtp" in media_name
        or any(k in text for k in ("rtp", "sap", "aes67", "network", "raop", "airplay"))
        or has_rtp_keys
        or has_sess_keys
        or "pipewire.ptp" in node_group
    )


def _pw_classify_input_names(pw):
    """Split the capture nodes of a pw-dump listing into USB and network names.

    Args:
        pw: List of pw-dump objects (dicts read via their ``"type"``,
            ``"id"`` and ``"info"`` keys).

    Returns:
        ``(usb_names, network_names)``: two sets holding the
        ``node.description`` (or ``node.name`` when the description is empty)
        of every non-monitor input node classified as USB or as network. A
        node is never placed in both sets; USB takes precedence.
    """
    # Pass 1: map device id -> lower-cased device.bus, so nodes that lack
    # their own device.bus can inherit it through their device.id.
    device_bus = {
        obj["id"]: (_pw_props(obj).get("device.bus") or "").lower()
        for obj in pw
        if obj.get("type") == "PipeWire:Interface:Device"
    }

    # Pass 2: walk the nodes and classify each candidate input.
    usb_names = set()
    network_names = set()
    for obj in pw:
        if obj.get("type") != "PipeWire:Interface:Node":
            continue
        props = _pw_props(obj)

        media = (props.get("media.class") or "").lower()
        if "source" not in media and "stream/input" not in media:
            continue

        nname = props.get("node.name") or ""
        ndesc = props.get("node.description") or ""
        # Skip all monitor sources.
        if ".monitor" in nname.lower() or "monitor" in ndesc.lower():
            continue

        bus = (
            props.get("device.bus")
            or device_bus.get(props.get("device.id"))
            or ""
        ).lower()
        label = ndesc or nname

        # A device is either USB or Network, not both.
        if bus == "usb":
            usb_names.add(label)
        elif _pw_is_network_node(props, bus, nname, ndesc):
            network_names.add(label)

    return usb_names, network_names


def refresh_pw_cache():
    """Rescan audio devices and rebuild the USB and Network input caches.

    Steps, in order: force PortAudio to re-enumerate devices, locate the
    PipeWire/PulseAudio host API, read the PipeWire graph via ``pw-dump``,
    classify its input nodes, then look the resulting names up among the
    PortAudio devices of that host API.

    This is a heavy operation and should not be called frequently or during
    active streams.

    Raises:
        RuntimeError: From ``_pa_like_hostapi_index`` when no
            PipeWire/PulseAudio host API is present. Exceptions raised by the
            other helpers (e.g. a failing ``pw-dump`` call) propagate
            unchanged.
    """
    global _usb_inputs_cache, _network_inputs_cache

    # Force PortAudio to re-enumerate devices before anything is queried.
    _sd_refresh()
    pa_idx = _pa_like_hostapi_index()
    usb_names, network_names = _pw_classify_input_names(_pw_dump())

    # Compute both results before rebinding either cache, so a failure in the
    # second lookup cannot leave the two caches from different scans.
    usb_inputs = _sd_matches_from_names(pa_idx, usb_names)
    network_inputs = _sd_matches_from_names(pa_idx, network_names)
    _usb_inputs_cache, _network_inputs_cache = usb_inputs, network_inputs
|
|
|
|
|
|
# Populate cache on initial module load
|
|
# NOTE: this runs the full PortAudio/PipeWire scan as an import-time side
# effect; any exception raised inside refresh_pw_cache() propagates to the
# importer.
refresh_pw_cache()
|
|
|
|
# Example usage:
|
|
# for i, d in get_usb_pw_inputs():
#     print(f"USB IN {i}: {d['name']} in={d['max_input_channels']}")
# for i, d in get_network_pw_inputs():
#     print(f"NET IN {i}: {d['name']} in={d['max_input_channels']}")
|