add passwort for the frontend
This commit is contained in:
+2
-2
@@ -35,10 +35,10 @@ env/
|
||||
__pycache__/
|
||||
|
||||
# Exclude .env file from all platforms
|
||||
*/.env
|
||||
*.env
|
||||
|
||||
wg_config/wg_confs/
|
||||
records/
|
||||
records/DISABLE_FRONTEND_PW
|
||||
src/auracast/server/stream_settings.json
|
||||
src/auracast/server/certs/per_device/
|
||||
src/auracast/.env
|
||||
|
||||
@@ -5,14 +5,73 @@ import streamlit as st
|
||||
import requests
|
||||
from auracast import auracast_config
|
||||
import logging as log
|
||||
from dotenv import load_dotenv
|
||||
from auracast.utils.frontend_auth import (
|
||||
is_pw_disabled,
|
||||
load_pw_record,
|
||||
save_pw_record,
|
||||
hash_password,
|
||||
verify_password,
|
||||
)
|
||||
|
||||
# Set page configuration (tab title and icon) before using other Streamlit APIs
|
||||
st.set_page_config(page_title="Castbox", page_icon="", layout="centered")
|
||||
|
||||
# Load environment variables from a .env file if present
|
||||
load_dotenv()
|
||||
|
||||
# Track whether WebRTC stream is active across Streamlit reruns
|
||||
if 'stream_started' not in st.session_state:
|
||||
st.session_state['stream_started'] = False
|
||||
|
||||
# Frontend authentication gate is controlled via env using shared utils
|
||||
|
||||
if 'frontend_authenticated' not in st.session_state:
|
||||
st.session_state['frontend_authenticated'] = False
|
||||
|
||||
if not is_pw_disabled():
|
||||
pw_rec = load_pw_record()
|
||||
|
||||
# First-time setup: no password set -> force user to choose one
|
||||
if pw_rec is None:
|
||||
st.header("Set up your frontend password")
|
||||
st.info("For security, you must set a password on first access.")
|
||||
with st.form("first_setup_form"):
|
||||
new_pw = st.text_input("New password", type="password")
|
||||
new_pw2 = st.text_input("Confirm password", type="password")
|
||||
submitted = st.form_submit_button("Save password")
|
||||
if submitted:
|
||||
if len(new_pw) < 6:
|
||||
st.error("Password should be at least 6 characters.")
|
||||
elif new_pw != new_pw2:
|
||||
st.error("Passwords do not match.")
|
||||
else:
|
||||
salt, key = hash_password(new_pw)
|
||||
try:
|
||||
save_pw_record(salt, key)
|
||||
st.success("Password saved. You can now sign in.")
|
||||
st.rerun()
|
||||
except Exception as e:
|
||||
st.error(f"Failed to save password: {e}")
|
||||
st.stop()
|
||||
|
||||
# Normal sign-in gate
|
||||
if not st.session_state['frontend_authenticated']:
|
||||
st.header("Sign in")
|
||||
with st.form("signin_form"):
|
||||
pw = st.text_input("Password", type="password")
|
||||
submitted = st.form_submit_button("Sign in")
|
||||
if submitted:
|
||||
if verify_password(pw, pw_rec):
|
||||
st.session_state['frontend_authenticated'] = True
|
||||
st.success("Signed in.")
|
||||
st.rerun()
|
||||
else:
|
||||
st.error("Incorrect password. Please try again.")
|
||||
# Stop rendering the rest of the app until authenticated
|
||||
if not st.session_state['frontend_authenticated']:
|
||||
st.stop()
|
||||
|
||||
# Global: desired packetization time in ms for Opus (should match backend)
|
||||
PTIME = 40
|
||||
BACKEND_URL = "http://localhost:5000"
|
||||
@@ -444,6 +503,35 @@ else:
|
||||
# else:
|
||||
# st.error("Could not fetch advertised streams.")
|
||||
|
||||
############################
|
||||
# System expander (collapsed)
|
||||
############################
|
||||
with st.expander("System", expanded=False):
|
||||
if is_pw_disabled():
|
||||
st.info("Frontend password protection is disabled via DISABLE_FRONTEND_PW.")
|
||||
else:
|
||||
st.subheader("Change password")
|
||||
with st.form("change_pw_form"):
|
||||
cur = st.text_input("Current password", type="password")
|
||||
new1 = st.text_input("New password", type="password")
|
||||
new2 = st.text_input("Confirm new password", type="password")
|
||||
submit_change = st.form_submit_button("Change password")
|
||||
if submit_change:
|
||||
rec = load_pw_record()
|
||||
if not rec or not verify_password(cur, rec):
|
||||
st.error("Current password is incorrect.")
|
||||
elif len(new1) < 6:
|
||||
st.error("New password should be at least 6 characters.")
|
||||
elif new1 != new2:
|
||||
st.error("New passwords do not match.")
|
||||
else:
|
||||
salt, key = hash_password(new1)
|
||||
try:
|
||||
save_pw_record(salt, key)
|
||||
st.success("Password updated.")
|
||||
except Exception as e:
|
||||
st.error(f"Failed to update password: {e}")
|
||||
|
||||
log.basicConfig(
|
||||
level=os.environ.get('LOG_LEVEL', log.DEBUG),
|
||||
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
|
||||
|
||||
@@ -0,0 +1,96 @@
|
||||
import os
|
||||
import json
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple, Dict
|
||||
|
||||
__all__ = [
|
||||
"is_pw_disabled",
|
||||
"state_dir",
|
||||
"pw_file_path",
|
||||
"ensure_state_dir",
|
||||
"hash_password",
|
||||
"save_pw_record",
|
||||
"load_pw_record",
|
||||
"verify_password",
|
||||
]
|
||||
|
||||
|
||||
# Environment-controlled bypass
|
||||
|
||||
def is_pw_disabled() -> bool:
|
||||
val = os.getenv("DISABLE_FRONTEND_PW", "")
|
||||
return str(val).strip().lower() in ("1", "true", "yes", "on")
|
||||
|
||||
|
||||
# Storage paths and permissions
|
||||
|
||||
def state_dir() -> Path:
|
||||
custom = os.getenv("AURACAST_STATE_DIR")
|
||||
if custom:
|
||||
return Path(custom).expanduser()
|
||||
return Path.home() / ".config" / "auracast"
|
||||
|
||||
|
||||
def pw_file_path() -> Path:
|
||||
return state_dir() / "frontend_pw.json"
|
||||
|
||||
|
||||
def ensure_state_dir() -> None:
|
||||
d = state_dir()
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
try:
|
||||
os.chmod(d, 0o700)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# Hashing and verification
|
||||
|
||||
def hash_password(password: str, salt: Optional[bytes] = None) -> Tuple[bytes, bytes]:
|
||||
if salt is None:
|
||||
salt = os.urandom(16)
|
||||
key = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 150_000, dklen=32)
|
||||
return salt, key
|
||||
|
||||
|
||||
def save_pw_record(salt: bytes, key: bytes) -> None:
|
||||
ensure_state_dir()
|
||||
rec = {
|
||||
"salt": base64.b64encode(salt).decode("ascii"),
|
||||
"key": base64.b64encode(key).decode("ascii"),
|
||||
"kdf": "pbkdf2_sha256",
|
||||
"iterations": 150000,
|
||||
}
|
||||
p = pw_file_path()
|
||||
p.write_text(json.dumps(rec))
|
||||
try:
|
||||
os.chmod(p, 0o600)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def load_pw_record() -> Optional[Dict]:
|
||||
p = pw_file_path()
|
||||
if not p.exists():
|
||||
return None
|
||||
try:
|
||||
rec = json.loads(p.read_text())
|
||||
if "salt" in rec and "key" in rec:
|
||||
return rec
|
||||
except Exception:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def verify_password(password: str, rec: Dict) -> bool:
|
||||
try:
|
||||
salt = base64.b64decode(rec["salt"])
|
||||
expected = base64.b64decode(rec["key"])
|
||||
iters = int(rec.get("iterations", 150000))
|
||||
key = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iters, dklen=32)
|
||||
return hmac.compare_digest(key, expected)
|
||||
except Exception:
|
||||
return False
|
||||
Reference in New Issue
Block a user