add basic usb input mode functionality

This commit is contained in:
2025-06-15 17:28:48 +02:00
parent d54d18987a
commit 005c3b550e
5 changed files with 287 additions and 91 deletions

View File

@@ -44,6 +44,7 @@ from bumble.profiles import bass
import bumble.device
import bumble.transport
import bumble.utils
import numpy as np # for audio down-mix
from bumble.device import Host, BIGInfoAdvertisement, AdvertisingChannelMap
from bumble.audio import io as audio_io
@@ -326,19 +327,37 @@ class Streamer():
else:
logging.warning('Streamer is already running')
def stop_streaming(self):
"""Stops the background task if running."""
if self.is_streaming:
self.is_streaming = False
if self.task:
self.task.cancel() # Cancel the task safely
self.task = None
async def stop_streaming(self):
    """Gracefully stop streaming and release audio devices.

    Cancels the background streaming task (if any) and waits for it to end,
    then closes the ``audio_input`` of every BIG and removes it from the BIG
    dict so a fresh input is created on the next start. Returns immediately
    when the streamer is neither flagged as streaming nor holding a task.

    Inputs are closed through ``aclose()`` when available (the method the
    retry and shutdown paths use) and through ``close()`` otherwise. Both
    coroutine and plain synchronous closers are supported.

    Raises:
        Exception: whatever the streaming task itself failed with, other
            than cancellation. Audio inputs are still closed in that case.
    """
    import inspect  # local import: only needed to tell sync closers from async ones

    if not self.is_streaming and self.task is None:
        return

    # Ask the streaming loop to finish
    self.is_streaming = False
    try:
        if self.task is not None:
            self.task.cancel()
            try:
                await self.task
            except asyncio.CancelledError:
                pass
    finally:
        self.task = None
        # Close audio inputs even if the task ended with an error, so the
        # devices are released before the next start.
        pending = []
        for big in self.bigs.values():
            # Remove the reference so a fresh input is created next time
            audio_input = big.pop("audio_input", None)
            closer = (
                getattr(audio_input, "aclose", None)
                or getattr(audio_input, "close", None)
            )
            if not callable(closer):
                # Nothing closable (missing entry or a raw audio source)
                continue
            try:
                result = closer()
            except Exception:
                logging.warning("Failed to close audio input", exc_info=True)
                continue
            # A synchronous close() has already finished; only awaitables
            # need to be gathered.
            if inspect.isawaitable(result):
                pending.append(result)
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
async def stream(self):
bigs = self.bigs
big_config = self.big_config
global_config = self.global_config
# init
for i, big in enumerate(bigs.values()):
audio_source = big_config[i].audio_source
input_format = big_config[i].input_format
@@ -356,6 +375,7 @@ class Streamer():
lc3_frame_samples = encoder.get_frame_samples()
big['pcm_bit_depth'] = 16
big['lc3_frame_samples'] = lc3_frame_samples
big['lc3_bytes_per_frame'] = global_config.octets_per_frame
big['audio_input'] = audio_source
big['encoder'] = encoder
big['precoded'] = False
@@ -371,6 +391,7 @@ class Streamer():
lc3_frame_samples = encoder.get_frame_samples()
big['pcm_bit_depth'] = 16
big['lc3_frame_samples'] = lc3_frame_samples
big['lc3_bytes_per_frame'] = global_config.octets_per_frame
big['encoder'] = encoder
big['precoded'] = False
@@ -405,14 +426,14 @@ class Streamer():
pcm_format = await audio_input.open()
if pcm_format.channels != 1:
print("Only 1 channels PCM configurations are supported")
logging.error("Only 1 channels PCM configurations are supported")
return
if pcm_format.sample_type == audio_io.PcmFormat.SampleType.INT16:
pcm_bit_depth = 16
elif pcm_format.sample_type == audio_io.PcmFormat.SampleType.FLOAT32:
pcm_bit_depth = None
else:
print("Only INT16 and FLOAT32 sample types are supported")
logging.error("Only INT16 and FLOAT32 sample types are supported")
return
encoder = lc3.Encoder(
frame_duration_us=global_config.frame_duration_us,
@@ -437,73 +458,137 @@ class Streamer():
# anything else, e.g. realtime stream from device (bumble)
else:
audio_input = await audio_io.create_audio_input(audio_source, input_format)
audio_input.rewind = big_config[i].loop
pcm_format = await audio_input.open()
#try:
if pcm_format.channels != 1:
print("Only 1 channels PCM configurations are supported")
return
if pcm_format.sample_type == audio_io.PcmFormat.SampleType.INT16:
pcm_bit_depth = 16
elif pcm_format.sample_type == audio_io.PcmFormat.SampleType.FLOAT32:
pcm_bit_depth = None
else:
print("Only INT16 and FLOAT32 sample types are supported")
return
encoder = lc3.Encoder(
frame_duration_us=global_config.frame_duration_us,
sample_rate_hz=global_config.auracast_sampling_rate_hz,
num_channels=1,
input_sample_rate_hz=pcm_format.sample_rate,
)
lc3_frame_samples = encoder.get_frame_samples() # number of the pcm samples per lc3 frame
big['pcm_bit_depth'] = pcm_bit_depth
big['lc3_frame_samples'] = lc3_frame_samples
# Store early so stop_streaming can close even if open() fails
big['audio_input'] = audio_input
big['encoder'] = encoder
big['precoded'] = False
# SoundDeviceAudioInput (used for `mic:<device>` captures) has no `.rewind`.
if hasattr(audio_input, "rewind"):
audio_input.rewind = big_config[i].loop
# Need for coded an uncoded audio
lc3_frame_size = global_config.octets_per_frame #encoder.get_frame_bytes(bitrate)
lc3_bytes_per_frame = lc3_frame_size #* 2 #multiplied by number of channels
big['lc3_bytes_per_frame'] = lc3_bytes_per_frame
# Retry logic ALSA sometimes keeps the device busy for a short time after the
# previous stream has closed. Handle PortAudioError -9985 with back-off retries.
import sounddevice as _sd
max_attempts = 3
for attempt in range(1, max_attempts + 1):
try:
pcm_format = await audio_input.open()
break # success
except _sd.PortAudioError as err:
# -9985 == paDeviceUnavailable
logging.error('Could not open audio device %s with error %s', audio_source, err)
code = None
if hasattr(err, 'errno'):
code = err.errno
elif len(err.args) > 1 and isinstance(err.args[1], int):
code = err.args[1]
if code == -9985 and attempt < max_attempts:
backoff_ms = 200 * attempt
logging.warning("PortAudio device busy (attempt %d/%d). Retrying in %.1f ms…", attempt, max_attempts, backoff_ms)
# ensure device handle and PortAudio context are closed before retrying
try:
if hasattr(audio_input, "aclose"):
await audio_input.aclose()
elif hasattr(audio_input, "close"):
audio_input.close()
except Exception:
pass
# Fully terminate PortAudio to drop lingering handles (sounddevice quirk)
if hasattr(_sd, "_terminate"):
try:
_sd._terminate()
except Exception:
pass
# Small pause then re-initialize PortAudio
await asyncio.sleep(0.1)
if hasattr(_sd, "_initialize"):
try:
_sd._initialize()
except Exception:
pass
# TODO: Maybe do some pre buffering so the stream is stable from the beginning. One half iso queue would be appropriate
logging.info("Streaming audio...")
bigs = self.bigs
self.is_streaming = True
# One streamer fits all
while self.is_streaming:
stream_finished = [False for _ in range(len(bigs))]
for i, big in enumerate(bigs.values()):
# Back-off before next attempt
await asyncio.sleep(backoff_ms / 1000)
# Recreate audio_input fresh for next attempt
audio_input = await audio_io.create_audio_input(audio_source, input_format)
continue
# Other errors or final attempt re-raise so caller can abort gracefully
raise
else:
# Loop exhausted without break
logging.error("Unable to open audio device after %d attempts giving up", max_attempts)
return
if big['precoded']:# everything was already lc3 coded beforehand
lc3_frame = bytes(
itertools.islice(big['lc3_frames'], big['lc3_bytes_per_frame'])
if pcm_format.channels != 1:
logging.info("Input device provides %d channels will down-mix to mono for LC3", pcm_format.channels)
if pcm_format.sample_type == audio_io.PcmFormat.SampleType.INT16:
pcm_bit_depth = 16
elif pcm_format.sample_type == audio_io.PcmFormat.SampleType.FLOAT32:
pcm_bit_depth = None
else:
logging.error("Only INT16 and FLOAT32 sample types are supported")
return
encoder = lc3.Encoder(
frame_duration_us=global_config.frame_duration_us,
sample_rate_hz=global_config.auracast_sampling_rate_hz,
num_channels=1,
input_sample_rate_hz=pcm_format.sample_rate,
)
lc3_frame_samples = encoder.get_frame_samples() # number of the pcm samples per lc3 frame
big['pcm_bit_depth'] = pcm_bit_depth
big['channels'] = pcm_format.channels
big['lc3_frame_samples'] = lc3_frame_samples
big['lc3_bytes_per_frame'] = global_config.octets_per_frame
big['audio_input'] = audio_input
big['encoder'] = encoder
big['precoded'] = False
logging.info("Streaming audio...")
bigs = self.bigs
self.is_streaming = True
# One streamer fits all
while self.is_streaming:
stream_finished = [False for _ in range(len(bigs))]
for i, big in enumerate(bigs.values()):
if big['precoded']:# everything was already lc3 coded beforehand
lc3_frame = bytes(
itertools.islice(big['lc3_frames'], big['lc3_bytes_per_frame'])
)
if lc3_frame == b'': # Not all streams may stop at the same time
stream_finished[i] = True
continue
else: # code lc3 on the fly
pcm_frame = await anext(big['audio_input'].frames(big['lc3_frame_samples']), None)
if pcm_frame is None: # Not all streams may stop at the same time
stream_finished[i] = True
continue
# Down-mix multi-channel PCM to mono for LC3 encoder if needed
if big.get('channels', 1) > 1:
if isinstance(pcm_frame, np.ndarray):
if pcm_frame.ndim > 1:
mono = pcm_frame.mean(axis=1).astype(pcm_frame.dtype)
pcm_frame = mono
else:
# Convert raw bytes to numpy, average channels, convert back
dtype = np.int16 if big['pcm_bit_depth'] == 16 else np.float32
samples = np.frombuffer(pcm_frame, dtype=dtype)
samples = samples.reshape(-1, big['channels']).mean(axis=1)
pcm_frame = samples.astype(dtype).tobytes()
lc3_frame = big['encoder'].encode(
pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth']
)
if lc3_frame == b'': # Not all streams may stop at the same time
stream_finished[i] = True
continue
else: # code lc3 on the fly
pcm_frame = await anext(big['audio_input'].frames(big['lc3_frame_samples']), None)
await big['iso_queue'].write(lc3_frame)
if pcm_frame is None: # Not all streams may stop at the same time
stream_finished[i] = True
continue
lc3_frame = big['encoder'].encode(
pcm_frame, num_bytes=big['lc3_bytes_per_frame'], bit_depth=big['pcm_bit_depth']
)
await big['iso_queue'].write(lc3_frame)
if all(stream_finished): # Take into account that multiple files have different lengths
logging.info('All streams finished, stopping streamer')
self.is_streaming = False
break
if all(stream_finished): # Take into account that multiple files have different lengths
logging.info('All streams finished, stopping streamer')
self.is_streaming = False
break
# -----------------------------------------------------------------------------

View File

@@ -52,13 +52,19 @@ class Multicaster:
self.device = device
self.is_auracast_init = True
def start_streaming(self):
async def start_streaming(self):
    """Start a new streamer, stopping a previous one first so its audio devices are released."""
    had_previous_streamer = self.streamer is not None
    if had_previous_streamer:
        await self.stop_streaming()
        # Short pause so ALSA/PortAudio can fully let go of the input device
        await asyncio.sleep(0.5)

    streamer = multicast.Streamer(self.bigs, self.global_conf, self.big_conf)
    self.streamer = streamer
    streamer.start_streaming()
def stop_streaming(self):
async def stop_streaming(self):
if self.streamer is not None:
self.streamer.stop_streaming()
await self.streamer.stop_streaming()
self.streamer = None
async def reset(self):
@@ -66,13 +72,23 @@ class Multicaster:
self.__init__(self.global_conf, self.big_conf)
async def shutdown(self):
# Ensure streaming is fully stopped before tearing down Bluetooth resources
if self.streamer is not None:
await self.stop_streaming()
self.is_auracast_init = False
self. is_audio_init = False
self.is_audio_init = False
for big in self.bigs.values():
if big.get('audio_input'):
if hasattr(big['audio_input'], 'aclose'):
await big['audio_input'].aclose()
if self.device:
await self.device.stop_advertising()
if self.bigs:
for big in self.bigs.values():
if big['advertising_set']:
if big.get('advertising_set'):
await big['advertising_set'].stop()
await self.device_acm.__aexit__(None, None, None) # Manually triggering teardown

View File

@@ -3,6 +3,7 @@ from itertools import filterfalse
import streamlit as st
import requests
from auracast import auracast_config
import logging as log
# Global: desired packetization time in ms for Opus (should match backend)
PTIME = 40
@@ -43,15 +44,44 @@ if audio_mode in ["Webapp", "USB"]:
quality = "High (48kHz)"
default_name = saved_settings.get('channel_names', ["Broadcast0"])[0]
default_lang = saved_settings.get('languages', ["deu"])[0]
default_input = saved_settings.get('input_device') or 'default'
stream_name = st.text_input("Channel Name", value=default_name)
language = st.text_input("Language (ISO 639-3)", value=default_lang)
# Input device selection for USB mode
if audio_mode == "USB":
    # Build the list of selectable hardware inputs as "<index>:<name>".
    try:
        import sounddevice as sd  # type: ignore
        devs = sd.query_devices()
        log.info('Found audio devices: %s', devs)
        input_options = [
            f"{idx}:{d['name']}"
            for idx, d in enumerate(devs)
            if d.get('max_input_channels', 0) > 0
            and ("(hw:" in d['name'].lower() or "usb" in d['name'].lower())
        ]
    except Exception:
        # Keep the UI alive, but leave a trace of why no devices are offered.
        log.warning("Could not enumerate audio input devices", exc_info=True)
        input_options = []
    if not input_options:
        st.error("No hardware audio input devices found.")
        st.stop()
    # Only the part before ':' is sent to the backend and persisted, so the
    # saved value is a bare index while the options are "<index>:<name>".
    # Match on either form; fall back to the first device when nothing matches.
    saved_device = str(default_input)
    default_index = next(
        (
            pos
            for pos, option in enumerate(input_options)
            if option == saved_device or option.split(":", 1)[0] == saved_device
        ),
        0,
    )
    default_input = input_options[default_index]
    selected_option = st.selectbox("Input Device", input_options, index=default_index)
    # We send only the numeric/card identifier (before :) or 'default'
    input_device = selected_option.split(":", 1)[0]
else:
    input_device = None
start_stream = st.button("Start Auracast")
if start_stream:
# Prepare config using the model (do NOT send qos_config, only relevant fields)
q = quality_map[quality]
config = auracast_config.AuracastConfigGroup(
auracast_sampling_rate_hz=q['rate'],
octets_per_frame=q['octets'],
transport="auto",
bigs = [
auracast_config.AuracastBigConfig(
@@ -59,11 +89,11 @@ if audio_mode in ["Webapp", "USB"]:
program_info=f"{stream_name} {quality}",
language=language,
audio_source=(
"webrtc" if audio_mode == "Webapp" else (
"usb" if audio_mode == "USB" else "network"
f"device:{input_device}" if audio_mode == "USB" else (
"webrtc" if audio_mode == "Webapp" else "network"
)
),
input_format="auto",
input_format=(f"int16le,{q['rate']},1" if audio_mode == "USB" else "auto"),
iso_que_len=1, # TODO: this should be way less to decrease delay
sampling_frequency=q['rate'],
octets_per_frame=q['octets'],
@@ -73,7 +103,7 @@ if audio_mode in ["Webapp", "USB"]:
try:
r = requests.post(f"{BACKEND_URL}/init", json=config.model_dump())
if r.status_code == 200:
st.success("Stream initialized!")
st.success("Stream Started!")
else:
st.error(f"Failed to initialize: {r.text}")
except Exception as e:
@@ -127,3 +157,7 @@ else:
# else:
# st.error("Could not fetch advertised streams.")
log.basicConfig(
level=log.DEBUG,
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)

View File

@@ -15,7 +15,6 @@ import av
import av.audio.layout
from typing import List, Set
import traceback
from auracast.utils.webrtc_audio_input import WebRTCAudioInput
# Path to persist stream settings
STREAM_SETTINGS_FILE = os.path.join(os.path.dirname(__file__), 'stream_settings.json')
@@ -78,26 +77,44 @@ async def initialize(conf: auracast_config.AuracastConfigGroup):
# persist stream settings for later retrieval
# Derive audio_mode from first BIG audio_source
first_source = conf.bigs[0].audio_source if conf.bigs else ''
audio_mode_persist = (
'Webapp' if first_source == 'webrtc' else
'USB' if first_source == 'usb' else
'Network'
)
if first_source.startswith('device:'):
audio_mode_persist = 'USB'
input_device = first_source.split(':', 1)[1] if ':' in first_source else 'default'
elif first_source == 'webrtc':
audio_mode_persist = 'Webapp'
input_device = None
else:
audio_mode_persist = 'Network'
input_device = None
save_stream_settings({
'channel_names': [big.name for big in conf.bigs],
'languages': [big.language for big in conf.bigs],
'audio_mode': audio_mode_persist,
'input_device': input_device,
'timestamp': datetime.utcnow().isoformat()
})
global_config_group = conf
# If there is an existing multicaster, cleanly shut it down first so audio devices are released
if multicaster is not None:
try:
await multicaster.shutdown()
except Exception:
log.warning("Failed to shutdown previous multicaster", exc_info=True)
log.info(
'Initializing multicaster with config:\n %s', conf.model_dump_json(indent=2)
)
# TODO: check if multicaster is already initialized
multicaster = multicast_control.Multicaster(
conf,
conf.bigs,
)
await multicaster.init_broadcast()
# Auto-start streaming for USB microphone mode
if any(big.audio_source.startswith('device:') for big in conf.bigs):
await multicaster.start_streaming()
except Exception as e:
log.error("Exception in /init: %s", traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@@ -115,7 +132,7 @@ async def send_audio(audio_data: dict[str, str]):
big.audio_source = audio_data[big.language].encode('latin-1') # TODO: use base64 encoding
multicaster.big_conf = global_config_group.bigs
multicaster.start_streaming()
await multicaster.start_streaming()
return {"status": "audio_sent"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@@ -125,7 +142,7 @@ async def send_audio(audio_data: dict[str, str]):
async def stop_audio():
"""Stops streaming."""
try:
multicaster.stop_streaming()
await multicaster.stop_streaming()
return {"status": "stopped"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@@ -162,7 +179,7 @@ async def offer(offer: Offer):
# create directory for records - only for testing
os.makedirs("./records", exist_ok=True)
multicaster.start_streaming()
await multicaster.start_streaming()
@pc.on("track")
async def on_track(track: MediaStreamTrack):
log.info(f"{id_}: track {track.kind} received")
@@ -257,9 +274,9 @@ async def offer(offer: Offer):
@app.post("/shutdown")
async def shutdown():
"""Stops broadcasting."""
"""Stops broadcasting and releases all audio/Bluetooth resources."""
try:
await multicaster.reset()
await multicaster.shutdown()
return {"status": "stopped"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -0,0 +1,44 @@
"""Utility to diagnose Bumble SoundDeviceAudioInput compatibility.
Run inside the project venv:
python -m tests.usb_audio_diag [rate]
It enumerates the PortAudio hardware input devices (those whose name contains `(hw:`) and tries to open each with Bumble's
create_audio_input using the URI pattern `device:<index>` with an explicit input_format of `int16le,<rate>,1`.
"""
from __future__ import annotations
import asyncio
import sys
import sounddevice as sd # type: ignore
from bumble.audio import io as audio_io # type: ignore
# Sample rate in Hz used for probing: first command-line argument, or 48000 when none is given.
RATE = int(sys.argv[1]) if len(sys.argv) > 1 else 48000
# Module-level alias for asyncio.run.
aSYNC = asyncio.run
async def try_device(index: int, rate: int = 48000) -> None:
    """Try to open one PortAudio input through Bumble and print the outcome.

    The device is addressed as ``device:<index>`` and opened with the input
    format ``int16le,<rate>,1``. A green line with the reported channel count
    and sample rate is printed on success, a red line with the error
    otherwise. Errors are reported, never raised.

    Args:
        index: PortAudio device index to probe.
        rate: Sample rate in Hz requested in the input format.
    """
    input_uri = f"device:{index}"
    audio_input = None
    try:
        audio_input = await audio_io.create_audio_input(input_uri, f"int16le,{rate},1")
        fmt = await audio_input.open()
        print(f"\033[32m✔ {input_uri} -> {fmt.channels}ch @ {fmt.sample_rate}Hz\033[0m")
    except Exception as exc:  # pylint: disable=broad-except
        print(f"\033[31m✗ {input_uri}: {exc}\033[0m")
    finally:
        # Release the input on every path, including a failed open(), so a
        # half-opened device does not stay busy while later ones are probed.
        if audio_input is not None and hasattr(audio_input, "aclose"):
            try:
                await audio_input.aclose()
            except Exception as exc:  # pylint: disable=broad-except
                print(f"\033[33m! {input_uri}: close failed: {exc}\033[0m")
async def main() -> None:
    """Probe each hardware ("(hw:") PortAudio input device at RATE Hz and print the results."""
    print(f"Trying PortAudio input devices with rate {RATE} Hz\n")
    for position, info in enumerate(sd.query_devices()):
        # Skip anything that cannot capture audio
        if info["max_input_channels"] <= 0:
            continue
        label = info["name"]
        # Only probe devices whose name carries the "(hw:" marker
        if "(hw:" not in label.lower():
            continue
        print(f"[{position}] {label}")
        await try_device(position, RATE)
        print()


if __name__ == "__main__":
    asyncio.run(main())