analog_input_gain (#21)

- add input boost slider
- add level meter

Reviewed-on: https://gitea.pstruebi.xyz/auracaster/bumble-auracast/pulls/21
This commit was merged in pull request #21.
This commit is contained in:
2026-02-12 17:09:46 +01:00
parent 3322b9edf4
commit f5f93b4b8e
4 changed files with 95 additions and 7 deletions

View File

@@ -638,6 +638,12 @@ class Streamer():
except Exception:
pass
def get_audio_levels(self) -> list[float]:
"""Return current RMS audio levels (0.0-1.0) for each BIG."""
if not self.bigs:
return []
return [big.get('_audio_level_rms', 0.0) for big in self.bigs.values()]
async def stream(self):
bigs = self.bigs
@@ -867,6 +873,11 @@ class Streamer():
np.clip(pcm_arr, -32768, 32767, out=pcm_arr)
pcm_frame = pcm_arr.astype(np.int16).tobytes()
# Compute RMS audio level (normalized 0.0-1.0) for level monitoring
pcm_samples = np.frombuffer(pcm_frame, dtype=np.int16).astype(np.float32)
rms = np.sqrt(np.mean(pcm_samples ** 2)) / 32768.0 if len(pcm_samples) > 0 else 0.0
big['_audio_level_rms'] = float(rms)
# Measure LC3 encoding time
t1 = time.perf_counter()
num_bis = big.get('num_bis', 1)

View File

@@ -37,6 +37,12 @@ class Multicaster:
'is_initialized': self.is_auracast_init,
'is_streaming': streaming,
}
def get_audio_levels(self) -> list[float]:
"""Return current RMS audio levels (0.0-1.0) for each BIG."""
if self.streamer is not None and self.streamer.is_streaming:
return self.streamer.get_audio_levels()
return []
async def init_broadcast(self):
self.device_acm = multicast.create_device(self.global_conf)

View File

@@ -1,6 +1,7 @@
# frontend/app.py
import os
import time
import math
import logging as log
from PIL import Image
@@ -197,22 +198,28 @@ else:
start_stream, stop_stream = render_stream_controls(is_streaming, "Start Auracast", "Stop Auracast", running_mode, secondary_is_streaming)
# Analog gain control (only for Analog mode, placed below start button)
analog_gain_value = 50 # default
analog_gain_value = 50 # default (ALSA 10-60 range)
software_boost_db = 0 # default
if audio_mode == "Analog":
saved_analog_gain = saved_settings.get('analog_gain', 50)
analog_gain_value = st.slider(
# Convert persisted ALSA value (10-60) to display value (0-100)
saved_display = int(round((saved_analog_gain - 10) * 100 / 50))
saved_display = max(0, min(100, saved_display))
analog_gain_display = st.slider(
"Analog Input Gain",
min_value=10,
max_value=60,
value=min(saved_analog_gain, 60),
min_value=0,
max_value=100,
value=saved_display,
step=5,
disabled=is_streaming,
help="ADC gain level for both analog inputs (10-60%). Default is 50%."
format="%d%%",
help="ADC gain level for both analog inputs. Default is 80%."
)
# Map display value (0-100) back to ALSA range (10-60)
analog_gain_value = int(round(10 + analog_gain_display * 50 / 100))
saved_boost = saved_settings.get('software_boost_db', 0)
software_boost_db = st.slider(
"Software Boost (dB)",
"Boost",
min_value=0,
max_value=20,
value=min(int(saved_boost), 20),
@@ -221,6 +228,56 @@ if audio_mode == "Analog":
help="Digital gain boost applied before encoding (0-20 dB). Use this when the line-level signal is too quiet even at max ADC gain. Higher values may cause clipping on loud signals."
)
# Audio level monitor (checkbox, not persisted across reloads)
show_level_monitor = st.checkbox("Audio level monitor", value=False, disabled=not is_streaming,
help="Show real-time audio level meters for active radios. Only works while streaming.")
if show_level_monitor and is_streaming:
@st.fragment(run_every=0.2)
def _audio_level_fragment():
cols = st.columns(2)
# Radio 1
with cols[0]:
try:
r = requests.get(f"{BACKEND_URL}/audio_level", timeout=0.2)
levels = r.json().get("levels", []) if r.ok else []
except Exception:
levels = []
if levels:
rms = max(levels)
db = max(-60.0, 20.0 * (math.log10(rms) if rms > 0 else -3.0))
pct = int(max(0, min(100, (db + 60) * 100 / 60)))
st.markdown(
f"**Radio 1**"
f'<div style="background:#333;border-radius:4px;height:18px;width:100%;margin-top:4px;">'
f'<div style="background:#2ecc71;height:100%;width:{pct}%;border-radius:4px;transition:width 0.15s;"></div>'
f'</div>',
unsafe_allow_html=True,
)
else:
st.markdown("**Radio 1** &nbsp; --")
# Radio 2
with cols[1]:
try:
r2 = requests.get(f"{BACKEND_URL}/audio_level2", timeout=0.2)
levels2 = r2.json().get("levels", []) if r2.ok else []
except Exception:
levels2 = []
if levels2:
rms2 = max(levels2)
db2 = max(-60.0, 20.0 * (math.log10(rms2) if rms2 > 0 else -3.0))
pct2 = int(max(0, min(100, (db2 + 60) * 100 / 60)))
st.markdown(
f"**Radio 2**"
f'<div style="background:#333;border-radius:4px;height:18px;width:100%;margin-top:4px;">'
f'<div style="background:#2ecc71;height:100%;width:{pct2}%;border-radius:4px;transition:width 0.15s;"></div>'
f'</div>',
unsafe_allow_html=True,
)
else:
st.markdown("**Radio 2** &nbsp; --")
_audio_level_fragment()
# Placeholder for validation errors (will be filled in later)
validation_error_placeholder = st.empty()

View File

@@ -594,6 +594,20 @@ async def get_status():
return status
@app.get("/audio_level")
async def get_audio_level():
"""Return current RMS audio levels for primary radio (lightweight, for polling)."""
if multicaster1 is None:
return {"levels": []}
return {"levels": multicaster1.get_audio_levels()}
@app.get("/audio_level2")
async def get_audio_level2():
"""Return current RMS audio levels for secondary radio (lightweight, for polling)."""
if multicaster2 is None:
return {"levels": []}
return {"levels": multicaster2.get_audio_levels()}
async def _autostart_from_settings():
settings1 = load_stream_settings() or {}
settings2 = load_stream_settings2() or {}