feature/autostart (#7)
implement auto restart of last stream fix aes67 streaming basic webinterface and many more Co-authored-by: pstruebi <struebin.patrick.com> Reviewed-on: https://gitea.pstruebi.xyz/auracaster/bumble-auracast/pulls/7
This commit was merged in pull request #7.
This commit is contained in:
96
src/auracast/utils/frontend_auth.py
Normal file
96
src/auracast/utils/frontend_auth.py
Normal file
@@ -0,0 +1,96 @@
|
||||
import os
|
||||
import json
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple, Dict
|
||||
|
||||
__all__ = [
|
||||
"is_pw_disabled",
|
||||
"state_dir",
|
||||
"pw_file_path",
|
||||
"ensure_state_dir",
|
||||
"hash_password",
|
||||
"save_pw_record",
|
||||
"load_pw_record",
|
||||
"verify_password",
|
||||
]
|
||||
|
||||
|
||||
# Environment-controlled bypass
|
||||
|
||||
def is_pw_disabled() -> bool:
|
||||
val = os.getenv("DISABLE_FRONTEND_PW", "")
|
||||
return str(val).strip().lower() in ("1", "true", "yes", "on")
|
||||
|
||||
|
||||
# Storage paths and permissions
|
||||
|
||||
def state_dir() -> Path:
|
||||
custom = os.getenv("AURACAST_STATE_DIR")
|
||||
if custom:
|
||||
return Path(custom).expanduser()
|
||||
return Path.home() / ".config" / "auracast"
|
||||
|
||||
|
||||
def pw_file_path() -> Path:
|
||||
return state_dir() / "frontend_pw.json"
|
||||
|
||||
|
||||
def ensure_state_dir() -> None:
|
||||
d = state_dir()
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
try:
|
||||
os.chmod(d, 0o700)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# Hashing and verification
|
||||
|
||||
def hash_password(password: str, salt: Optional[bytes] = None) -> Tuple[bytes, bytes]:
|
||||
if salt is None:
|
||||
salt = os.urandom(16)
|
||||
key = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 150_000, dklen=32)
|
||||
return salt, key
|
||||
|
||||
|
||||
def save_pw_record(salt: bytes, key: bytes) -> None:
|
||||
ensure_state_dir()
|
||||
rec = {
|
||||
"salt": base64.b64encode(salt).decode("ascii"),
|
||||
"key": base64.b64encode(key).decode("ascii"),
|
||||
"kdf": "pbkdf2_sha256",
|
||||
"iterations": 150000,
|
||||
}
|
||||
p = pw_file_path()
|
||||
p.write_text(json.dumps(rec))
|
||||
try:
|
||||
os.chmod(p, 0o600)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def load_pw_record() -> Optional[Dict]:
|
||||
p = pw_file_path()
|
||||
if not p.exists():
|
||||
return None
|
||||
try:
|
||||
rec = json.loads(p.read_text())
|
||||
if "salt" in rec and "key" in rec:
|
||||
return rec
|
||||
except Exception:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def verify_password(password: str, rec: Dict) -> bool:
|
||||
try:
|
||||
salt = base64.b64decode(rec["salt"])
|
||||
expected = base64.b64decode(rec["key"])
|
||||
iters = int(rec.get("iterations", 150000))
|
||||
key = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iters, dklen=32)
|
||||
return hmac.compare_digest(key, expected)
|
||||
except Exception:
|
||||
return False
|
||||
Reference in New Issue
Block a user