"""
multicast_script
=================

Loads environment variables from a .env file located next to this script and
configures the multicast broadcast. Only UPPERCASE keys are read.

USB audio devices are accessed through the ALSA backend, so PipeWire is not
required. PipeWire network (AES67) inputs are preferred when one is available.

Environment variables
---------------------
- LOG_LEVEL: Logging level for the script. Default: INFO.
  Examples: DEBUG, INFO, WARNING, ERROR.
- BROADCAST_NAME: Name of the broadcast (Auracast BIG name).
  Default: "Broadcast0".
- PROGRAM_INFO: Free-text program/broadcast info.
  Default: "Some Announcements".
- LANGUATE: ISO 639-3 language code used by config (intentional key name).
  Default: "deu".
- INPUT_DEVICE: Optional (sub)string matched against input device names to
  select a specific device. Default: unset (first available input is used).

Examples (.env)
---------------
LOG_LEVEL=DEBUG
BROADCAST_NAME=MyBroadcast
PROGRAM_INFO="Live announcements"
LANGUATE=deu
"""
import logging
import os
import time

from dotenv import load_dotenv

from auracast import multicast
from auracast import auracast_config
from auracast.utils.sounddevice_utils import (
    get_alsa_usb_inputs,
    get_network_pw_inputs,  # PipeWire network (AES67) inputs
    refresh_pw_cache,
    resolve_input_device_index,
)

TRANSPORT1 = 'serial:/dev/ttyAMA3,1000000,rtscts'  # transport for raspberry pi gpio header
# Second serial transport; defined for convenience but not referenced below.
TRANSPORT2 = 'serial:/dev/ttyAMA4,1000000,rtscts'  # transport for raspberry pi gpio header

# Capture at 48 kHz to avoid resampler latency; encode LC3 at 24 kHz
CAPTURE_SRATE = 48000
LC3_SRATE = 24000
OCTETS_PER_FRAME = 60

# Seconds to wait between attempts while no audio input is available.
RETRY_INTERVAL_S = 2


def _configure_logging():
    """Configure root logging from LOG_LEVEL (case-insensitive, default INFO).

    Must be called after the .env file has been loaded, otherwise a LOG_LEVEL
    defined there is not visible in ``os.environ`` yet. An unknown level name
    falls back to INFO with a warning instead of raising ``ValueError``.
    """
    # export LOG_LEVEL=DEBUG
    raw_level = os.environ.get('LOG_LEVEL', 'INFO').strip().upper()
    level = getattr(logging, raw_level, None)
    level_is_valid = isinstance(level, int)
    logging.basicConfig(
        level=level if level_is_valid else logging.INFO,
        format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
    )
    if not level_is_valid:
        logging.warning("Unknown LOG_LEVEL %r; falling back to INFO", raw_level)


def _refresh_pw_cache_best_effort():
    """Refresh the PipeWire cache; a failure is logged at DEBUG and ignored.

    Refreshing is deliberately best-effort: the script can still fall back to
    ALSA USB inputs, so an error here must not abort it.
    """
    try:
        refresh_pw_cache()
    except Exception:
        logging.debug("refresh_pw_cache() failed; continuing", exc_info=True)


def _log_available_inputs():
    """Log the currently visible PipeWire network and ALSA USB inputs."""
    _refresh_pw_cache_best_effort()

    # List PipeWire Network inputs
    logging.info("PipeWire Network inputs:")
    for i, d in get_network_pw_inputs():
        logging.info(f"{i}: {d['name']} in={d.get('max_input_channels', 0)}")

    # Also list USB ALSA inputs (fallback path)
    logging.info("USB ALSA inputs:")
    for i, d in get_alsa_usb_inputs():
        logging.info(f"{i}: {d['name']} in={d['max_input_channels']}")


def _try_select_input(device_match):
    """Make one attempt at choosing an input device.

    Priority order:
      1. PipeWire network input whose name contains ``device_match``
      2. first PipeWire network input
      3. ALSA USB input whose name contains ``device_match``
      4. first ALSA USB input

    NOTE(review): with INPUT_DEVICE set, step 2 still wins over step 3, i.e. an
    unrelated network input is preferred over a matching USB device. This
    mirrors the original ordering -- confirm it is intended.

    Args:
        device_match: Substring to look for in device names, or None/"" to
            skip the explicit-match steps.

    Returns:
        ``(input_index, device_info)`` for the chosen device, or ``None`` when
        no usable input is currently available.
    """
    _refresh_pw_cache_best_effort()
    pw_net = get_network_pw_inputs()

    # 1) Try to satisfy explicit .env match on PW Network
    if device_match and pw_net:
        for _, dev in pw_net:
            name = dev.get('name', '')
            if device_match not in name:
                continue
            idx = resolve_input_device_index(name)
            if idx is not None:
                logging.info(f"Selected Network input by match '{device_match}' (PipeWire): index={idx}, device={name}")
                return idx, dev

    # Fallback to first PW Network input, if its index can be resolved
    if pw_net:
        _, first_net = pw_net[0]
        idx = resolve_input_device_index(first_net.get('name', ''))
        if idx is not None:
            logging.info(f"Selected first Network input (PipeWire): index={idx}, device={first_net['name']}")
            return idx, first_net

    current_alsa = get_alsa_usb_inputs()

    # 2) Try to satisfy explicit .env match on ALSA USB
    if device_match and current_alsa:
        for idx_alsa, dev in current_alsa:
            if device_match in dev.get('name', ''):
                logging.info(f"Selected USB input by match '{device_match}' (ALSA): index={idx_alsa}, device={dev['name']}")
                return idx_alsa, dev

    # Fallback to first ALSA USB
    if current_alsa:
        idx_alsa, dev = current_alsa[0]
        logging.info(f"Selected first USB input (ALSA): index={idx_alsa}, device={dev['name']}")
        return idx_alsa, dev

    return None


def main():
    """Load settings, wait for an audio input, and start the broadcast."""
    # abspath() guards against __file__ being a bare relative filename, in
    # which case dirname() would be '' and os.chdir('') would raise.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    os.chdir(script_dir)

    # Load .env located next to this script (only uppercase keys will be
    # referenced). This happens before logging is configured so that a
    # LOG_LEVEL defined in .env is honoured.
    load_dotenv(dotenv_path=os.path.join(script_dir, '.env'))
    _configure_logging()

    # Refresh PipeWire cache and list devices (Network only for PW; USB via ALSA)
    _log_available_inputs()

    # Optional device selection via .env: match by string or substring
    # (uppercase keys only)
    device_match = os.environ.get('INPUT_DEVICE')
    device_match = device_match.strip() if isinstance(device_match, str) else None

    # Loop until a device becomes available (prefer PW Network, then ALSA USB)
    while True:
        selection = _try_select_input(device_match)
        if selection is not None:
            input_sel, selected_dev = selection
            break
        logging.info("Waiting for audio input (prefer PW Network, then ALSA USB)... retrying in 2s")
        time.sleep(RETRY_INTERVAL_S)

    # Read uppercase-only settings from environment/.env
    broadcast_name = os.environ.get('BROADCAST_NAME', 'Broadcast0')
    program_info = os.environ.get('PROGRAM_INFO', 'Some Announcements')
    # Note: 'LANGUATE' (typo) is intentionally used as requested, maps to config.language
    language = os.environ.get('LANGUATE', 'deu')

    # Determine capture channel count based on selected device (prefer up to 2)
    try:
        max_in = int((selected_dev or {}).get('max_input_channels', 1))
    except (AttributeError, TypeError, ValueError):
        max_in = 1
    channels = max(1, min(2, max_in))

    config = auracast_config.AuracastConfigGroup(
        bigs=[
            auracast_config.AuracastBigConfig(
                name=broadcast_name,
                program_info=program_info,
                language=language,
                iso_que_len=1,
                audio_source=f'device:{input_sel}',
                input_format=f"int16le,{CAPTURE_SRATE},{channels}",
                sampling_frequency=LC3_SRATE,
                octets_per_frame=OCTETS_PER_FRAME,
            ),
        ],
        immediate_rendering=False,
        presentation_delay_us=40000,
        qos_config=auracast_config.AuracastQosRobust(),
        auracast_sampling_rate_hz=LC3_SRATE,
        octets_per_frame=OCTETS_PER_FRAME,
        transport=TRANSPORT1,
        enable_adaptive_frame_dropping=True,
    )
    config.debug = False

    logging.info(config.model_dump_json(indent=2))
    multicast.run_async(
        multicast.broadcast(
            config,
            config.bigs
        )
    )


if __name__ == "__main__":
    main()