Adds recording tab to the frontend for debugging audio streams.

This commit is contained in:
pober
2026-01-12 14:35:02 +01:00
committed by pstruebi
parent a88b6703fa
commit 2bc97c26af
3 changed files with 236 additions and 0 deletions

2
.gitignore vendored
View File

@@ -50,3 +50,5 @@ ch2.wav
src/auracast/available_samples.txt
src/auracast/server/stream_settings2.json
src/scripts/temperature_log*
src/auracast/server/recordings/

View File

@@ -1014,6 +1014,77 @@ with st.expander("System control", expanded=False):
except Exception as e:
st.error(f"Error calling reboot: {e}")
############################
# Record expander (collapsed)
############################
with st.expander("Record", expanded=False):
st.subheader("ALSA Device Recording")
# Get ALSA devices from backend
try:
resp = requests.get(f"{BACKEND_URL}/alsa_devices", timeout=2)
if resp.status_code == 200:
alsa_devices = resp.json().get('devices', [])
else:
alsa_devices = []
except Exception:
alsa_devices = []
# ALSA device selection dropdown
device_options = [f"{d['name']} [{d['id']}]" for d in alsa_devices]
device_name_map = {f"{d['name']} [{d['id']}]": d['name'] for d in alsa_devices}
if device_options:
selected_device = st.selectbox(
"Select ALSA Device",
device_options,
help="Choose an ALSA device for recording"
)
selected_device_name = device_name_map.get(selected_device)
else:
st.warning("No ALSA devices found.")
selected_device_name = None
# Recording controls
col_record, col_download = st.columns([1, 1])
with col_record:
if st.button("Start Recording (5s)", disabled=not selected_device_name):
try:
r = requests.post(f"{BACKEND_URL}/start_recording", json={"device": selected_device_name}, timeout=15)
if r.ok:
result = r.json()
if result.get('success'):
st.success(f"Recording completed: {result.get('filename')}")
st.session_state['last_recording'] = result.get('filename')
else:
st.error(f"Recording failed: {result.get('error', 'Unknown error')}")
else:
st.error(f"Failed to start recording: {r.status_code} {r.text}")
except Exception as e:
st.error(f"Error starting recording: {e}")
with col_download:
last_recording = st.session_state.get('last_recording')
if last_recording:
try:
# Get the recorded file for download
file_resp = requests.get(f"{BACKEND_URL}/download_recording/{last_recording}", timeout=5)
if file_resp.status_code == 200:
st.download_button(
label="Download Last Recording",
data=file_resp.content,
file_name=last_recording,
mime="audio/wav"
)
else:
st.warning("Recording file not available")
except Exception as e:
st.warning(f"Could not fetch recording: {e}")
else:
st.button("Download Last Recording", disabled=True, help="No recording available yet")
log.basicConfig(
level=os.environ.get('LOG_LEVEL', log.DEBUG),
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'

View File

@@ -1212,6 +1212,169 @@ async def system_update():
raise HTTPException(status_code=500, detail=str(e))
# Recording functionality
RECORDINGS_DIR = os.path.join(os.path.dirname(__file__), 'recordings')
os.makedirs(RECORDINGS_DIR, exist_ok=True)
def cleanup_old_recordings(keep_latest: str = None):
"""Delete all recordings except the latest one (or specified file)."""
try:
recordings = []
for filename in os.listdir(RECORDINGS_DIR):
if filename.endswith('.wav'):
filepath = os.path.join(RECORDINGS_DIR, filename)
if os.path.isfile(filepath):
recordings.append((filename, os.path.getmtime(filepath)))
# Sort by modification time (newest first)
recordings.sort(key=lambda x: x[1], reverse=True)
# Keep only the latest recording (or the specified one)
if keep_latest and os.path.exists(os.path.join(RECORDINGS_DIR, keep_latest)):
files_to_keep = {keep_latest}
else:
files_to_keep = {recordings[0][0]} if recordings else set()
# Delete old recordings
for filename, _ in recordings:
if filename not in files_to_keep:
filepath = os.path.join(RECORDINGS_DIR, filename)
try:
os.remove(filepath)
log.info("Deleted old recording: %s", filename)
except Exception as e:
log.warning("Failed to delete recording %s: %s", filename, e)
except Exception as e:
log.warning("Error during recording cleanup: %s", e)
@app.get("/alsa_devices")
async def get_alsa_devices():
"""Get list of available ALSA input devices."""
try:
devices = []
dev_list = sd.query_devices()
for idx, dev in enumerate(dev_list):
if dev.get('max_input_channels', 0) > 0:
devices.append({
'id': idx,
'name': dev['name'],
'max_input_channels': dev['max_input_channels']
})
# Add individual Dante ASRC channels if shared device is found
dante_shared_device = None
for device in devices:
if device['name'] == 'dante_asrc_shared6':
dante_shared_device = device
break
if dante_shared_device:
# Add individual Dante ASRC channels as virtual devices
for i in range(1, 7): # ch1 to ch6
devices.append({
'id': f"dante_asrc_ch{i}",
'name': f'dante_asrc_ch{i}',
'max_input_channels': 1,
'parent_device': dante_shared_device['name'],
'parent_id': dante_shared_device['id']
})
return {"devices": devices}
except Exception as e:
log.error("Exception in /alsa_devices: %s", e, exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.post("/start_recording")
async def start_recording(request: dict):
"""Start a 5-second recording from the specified ALSA device."""
try:
device_name = request.get('device')
if not device_name:
raise HTTPException(status_code=400, detail="Device name is required")
# Generate filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"recording_{timestamp}.wav"
filepath = os.path.join(RECORDINGS_DIR, filename)
# Determine channel count based on device type
# For other devices, try to find actual channel count
channels = 1 # Default to mono
try:
devices = sd.query_devices()
for dev in devices:
if dev['name'] == device_name and dev.get('max_input_channels', 0) > 0:
channels = dev.get('max_input_channels', 1)
break
except Exception:
pass
# Build arecord command
cmd = [
"arecord",
"-D", device_name, # Use the device name directly
"-f", "cd", # CD quality (16-bit little-endian, 44100 Hz)
"-c", str(channels), # Channel count
"-d", "5", # Duration in seconds
"-t", "wav", # WAV format
filepath
]
log.info("Starting recording with command: %s", " ".join(cmd))
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
error_msg = stderr.decode(errors="ignore").strip() if stderr else "Unknown error"
log.error("Recording failed: %s", error_msg)
raise HTTPException(status_code=500, detail=f"Recording failed: {error_msg}")
# Verify file was created and has content
if not os.path.exists(filepath) or os.path.getsize(filepath) == 0:
raise HTTPException(status_code=500, detail="Recording file was not created or is empty")
# Clean up old recordings, keeping only this new one
cleanup_old_recordings(keep_latest=filename)
log.info("Recording completed successfully: %s", filename)
return {"success": True, "filename": filename}
except HTTPException:
raise
except Exception as e:
log.error("Exception in /start_recording: %s", e, exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.get("/download_recording/{filename}")
async def download_recording(filename: str):
"""Download a recorded WAV file."""
try:
# Validate filename to prevent directory traversal
if not filename.endswith('.wav') or '/' in filename or '\\' in filename:
raise HTTPException(status_code=400, detail="Invalid filename")
filepath = os.path.join(RECORDINGS_DIR, filename)
if not os.path.exists(filepath):
raise HTTPException(status_code=404, detail="Recording file not found")
return FileResponse(filepath, filename=filename, media_type="audio/wav")
except HTTPException:
raise
except Exception as e:
log.error("Exception in /download_recording: %s", e, exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
if __name__ == '__main__':
import os
os.chdir(os.path.dirname(__file__))