diff --git a/.gitignore b/.gitignore index 6d6cf83..68ac071 100644 --- a/.gitignore +++ b/.gitignore @@ -35,10 +35,10 @@ env/ __pycache__/ # Exclude .env file from all platforms -*/.env +*.env wg_config/wg_confs/ -records/ +records/DISABLE_FRONTEND_PW src/auracast/server/stream_settings.json src/auracast/server/certs/per_device/ src/auracast/.env diff --git a/src/auracast/server/multicast_frontend.py b/src/auracast/server/multicast_frontend.py index c8f726f..1c78900 100644 --- a/src/auracast/server/multicast_frontend.py +++ b/src/auracast/server/multicast_frontend.py @@ -5,14 +5,73 @@ import streamlit as st import requests from auracast import auracast_config import logging as log +from dotenv import load_dotenv +from auracast.utils.frontend_auth import ( + is_pw_disabled, + load_pw_record, + save_pw_record, + hash_password, + verify_password, +) # Set page configuration (tab title and icon) before using other Streamlit APIs st.set_page_config(page_title="Castbox", page_icon="", layout="centered") +# Load environment variables from a .env file if present +load_dotenv() + # Track whether WebRTC stream is active across Streamlit reruns if 'stream_started' not in st.session_state: st.session_state['stream_started'] = False +# Frontend authentication gate is controlled via env using shared utils + +if 'frontend_authenticated' not in st.session_state: + st.session_state['frontend_authenticated'] = False + +if not is_pw_disabled(): + pw_rec = load_pw_record() + + # First-time setup: no password set -> force user to choose one + if pw_rec is None: + st.header("Set up your frontend password") + st.info("For security, you must set a password on first access.") + with st.form("first_setup_form"): + new_pw = st.text_input("New password", type="password") + new_pw2 = st.text_input("Confirm password", type="password") + submitted = st.form_submit_button("Save password") + if submitted: + if len(new_pw) < 6: + st.error("Password should be at least 6 characters.") + elif new_pw != new_pw2: + st.error("Passwords do not match.") + else: + salt, key = hash_password(new_pw) + try: + save_pw_record(salt, key) + st.success("Password saved. You can now sign in.") + st.rerun() + except Exception as e: + st.error(f"Failed to save password: {e}") + st.stop() + + # Normal sign-in gate + if not st.session_state['frontend_authenticated']: + st.header("Sign in") + with st.form("signin_form"): + pw = st.text_input("Password", type="password") + submitted = st.form_submit_button("Sign in") + if submitted: + if verify_password(pw, pw_rec): + st.session_state['frontend_authenticated'] = True + st.success("Signed in.") + st.rerun() + else: + st.error("Incorrect password. Please try again.") + # Stop rendering the rest of the app until authenticated + if not st.session_state['frontend_authenticated']: + st.stop() + # Global: desired packetization time in ms for Opus (should match backend) PTIME = 40 BACKEND_URL = "http://localhost:5000" @@ -444,6 +503,35 @@ else: # else: # st.error("Could not fetch advertised streams.") +############################ +# System expander (collapsed) +############################ +with st.expander("System", expanded=False): + if is_pw_disabled(): + st.info("Frontend password protection is disabled via DISABLE_FRONTEND_PW.") + else: + st.subheader("Change password") + with st.form("change_pw_form"): + cur = st.text_input("Current password", type="password") + new1 = st.text_input("New password", type="password") + new2 = st.text_input("Confirm new password", type="password") + submit_change = st.form_submit_button("Change password") + if submit_change: + rec = load_pw_record() + if not rec or not verify_password(cur, rec): + st.error("Current password is incorrect.") + elif len(new1) < 6: + st.error("New password should be at least 6 characters.") + elif new1 != new2: + st.error("New passwords do not match.") + else: + salt, key = hash_password(new1) + try: + save_pw_record(salt, key) + st.success("Password updated.") + except Exception as e: + st.error(f"Failed to update password: {e}") + log.basicConfig( level=os.environ.get('LOG_LEVEL', log.DEBUG), format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s' diff --git a/src/auracast/utils/frontend_auth.py b/src/auracast/utils/frontend_auth.py new file mode 100644 index 0000000..e1d4111 --- /dev/null +++ b/src/auracast/utils/frontend_auth.py @@ -0,0 +1,96 @@ +import os +import json +import base64 +import hashlib +import hmac +from pathlib import Path +from typing import Optional, Tuple, Dict + +__all__ = [ + "is_pw_disabled", + "state_dir", + "pw_file_path", + "ensure_state_dir", + "hash_password", + "save_pw_record", + "load_pw_record", + "verify_password", +] + + +# Environment-controlled bypass + +def is_pw_disabled() -> bool: + val = os.getenv("DISABLE_FRONTEND_PW", "") + return str(val).strip().lower() in ("1", "true", "yes", "on") + + +# Storage paths and permissions + +def state_dir() -> Path: + custom = os.getenv("AURACAST_STATE_DIR") + if custom: + return Path(custom).expanduser() + return Path.home() / ".config" / "auracast" + + +def pw_file_path() -> Path: + return state_dir() / "frontend_pw.json" + + +def ensure_state_dir() -> None: + d = state_dir() + d.mkdir(parents=True, exist_ok=True) + try: + os.chmod(d, 0o700) + except Exception: + pass + + +# Hashing and verification + +def hash_password(password: str, salt: Optional[bytes] = None) -> Tuple[bytes, bytes]: + if salt is None: + salt = os.urandom(16) + key = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 150_000, dklen=32) + return salt, key + + +def save_pw_record(salt: bytes, key: bytes) -> None: + ensure_state_dir() + rec = { + "salt": base64.b64encode(salt).decode("ascii"), + "key": base64.b64encode(key).decode("ascii"), + "kdf": "pbkdf2_sha256", + "iterations": 150000, + } + p = pw_file_path() + p.write_text(json.dumps(rec)) + try: + os.chmod(p, 0o600) + except Exception: + pass + + +def load_pw_record() -> Optional[Dict]: + p = pw_file_path() + if not p.exists(): + return None + try: + rec = json.loads(p.read_text()) + if "salt" in rec and "key" in rec: + return rec + except Exception: + return None + return None + + +def verify_password(password: str, rec: Dict) -> bool: + try: + salt = base64.b64decode(rec["salt"]) + expected = base64.b64decode(rec["key"]) + iters = int(rec.get("iterations", 150000)) + key = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iters, dklen=32) + return hmac.compare_digest(key, expected) + except Exception: + return False