Files
gstreamer-test/src/aes67_source.py
2025-07-11 15:17:53 +02:00

175 lines
5.2 KiB
Python

#!/usr/bin/env python3
"""
AES67 Source Example (Microphone to Network RTP @ 48 kHz)
This script enumerates local microphone devices (GStreamer "Audio/Source")
and lets you pick one to publish as an AES67-compatible RTP multicast stream.
It also advertises the stream via SAP so that receivers on the same subnet can
automatically discover and subscribe.
Tested with GStreamer 1.24 and Python 3.12.
Usage:
python aes67_source.py [--multicast 239.255.0.1] [--port 5004]
Requirements:
• PyGObject bindings for GStreamer (pip install pygobject)
• The corresponding GStreamer runtime with "good" plugins.
Copyright 2025 © Patrick Struebi
"""
from __future__ import annotations
import argparse
import sys
from typing import List
from sap_discovery import discover_sinks
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst # noqa: E402
from gi.repository import GLib # noqa: E402
Gst.init(None)
def enumerate_audio_sources() -> List[Gst.Device]:
"""Return a list of microphone devices recognised by GStreamer."""
monitor = Gst.DeviceMonitor()
# Filter for capture sources (microphones)
monitor.add_filter("Audio/Source")
monitor.start()
devices = monitor.get_devices()
monitor.stop()
return devices
def prompt_for_device(devices: List[Gst.Device], prompt: str) -> Gst.Device:
if not devices:
print("No audio input devices found.")
sys.exit(1)
if len(devices) == 1:
print(f"Auto-selected only available input device: {devices[0].get_display_name()}")
return devices[0]
print(prompt)
for idx, dev in enumerate(devices):
print(f"[{idx}] {dev.get_display_name()}")
while True:
try:
choice = int(input("Select device #: "))
if 0 <= choice < len(devices):
return devices[choice]
except ValueError:
pass
print("Invalid selection, try again.")
def build_pipeline(device: Gst.Device, multicast: str, port: int) -> Gst.Pipeline:
"""Construct the GStreamer pipeline string and create a pipeline."""
# Attempt to extract a gst-launch-compatible device string
dev_props = device.get_properties() or {}
device_str = None
if dev_props:
# Common property names depending on backend
for key in ("device", "device.path", "device.string"):
if dev_props.has_field(key):
device_str = dev_props.get_string(key)
break
# Fallback to auto-selection if we cannot determine the exact string
if device_str:
src = f"pulsesrc device={device_str}"
else:
src = "autoaudiosrc"
# AES67 requires 48 kHz, 16-bit, 2-channel PCM
pipeline_description = (
f"{src} ! audioconvert ! audioresample ! audio/x-raw,channels=2,rate=48000 "
"! rtpL16pay ! multiudpsink clients="
f"{multicast}:{port} sync=false"
)
print("GStreamer pipeline:")
print(pipeline_description, "\n")
pipeline = Gst.parse_launch(pipeline_description)
return pipeline # type: ignore[return-value]
def prompt_for_sink(sinks):
if not sinks:
print("No sinks discovered via SAP announcements.")
sys.exit(1)
if len(sinks) == 1:
print(f"Auto-selected only discovered sink: {sinks[0].get('name', 'Unknown')} @ {sinks[0].get('address', '?')}:{sinks[0].get('port', '?')}")
return sinks[0]
print("Discovered sinks:")
for idx, sink in enumerate(sinks):
name = sink.get('name', 'Unknown')
addr = sink.get('address', '?')
port = sink.get('port', '?')
print(f"[{idx}] {name} @ {addr}:{port}")
while True:
try:
sel = int(input("Select sink #: "))
if 0 <= sel < len(sinks):
return sinks[sel]
except ValueError:
pass
print("Invalid selection, try again.")
def main():
parser = argparse.ArgumentParser(description="AES67 Source (Microphone)")
parser.add_argument("--sap-wait", type=float, default=5.0, help="Seconds to listen for SAP sinks")
args = parser.parse_args()
print("Listening for SAP announcements...")
sinks = discover_sinks(timeout=args.sap_wait)
sink = prompt_for_sink(sinks)
multicast = sink['address']
port = int(sink['port'])
devices = enumerate_audio_sources()
mic = prompt_for_device(devices, "Available microphone devices:")
pipeline = build_pipeline(mic, multicast, port)
loop = GLib.MainLoop()
def on_message(_, msg):
t = msg.type
if t == Gst.MessageType.EOS:
print("End-of-Stream reached.")
loop.quit()
elif t == Gst.MessageType.ERROR:
err, dbg = msg.parse_error()
print("Pipeline error:", err, dbg)
loop.quit()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", on_message)
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
print("Failed to start pipeline.")
sys.exit(1)
print(f"Streaming to {multicast}:{port}… Press Ctrl+C to stop.")
try:
loop.run()
except KeyboardInterrupt:
pass
finally:
pipeline.set_state(Gst.State.NULL)
if __name__ == "__main__":
main()