Files
castbox-provisioning/src/provision.py

545 lines
19 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import ipaddress, os, re, subprocess, tempfile, json, datetime, shlex
import secrets
from pathlib import Path
from dotenv import load_dotenv
from utils.wg_easy import get_env_auth, ensure_client_and_config
load_dotenv()
# --- wg-easy connectivity -------------------------------------------------
# Base URL is normalized so it never ends with a slash.
BASE = os.environ.get("WG_EASY_BASE_URL", "").rstrip("/")
USER = os.environ.get("WG_EASY_USERNAME", "")
PASS = os.environ.get("WG_EASY_PASSWORD", "")
# CIDR written into AllowedIPs by rewrite_allowed_ips.
POOL_CIDR = os.environ.get("POOL_CIDR", "10.8.0.0/24")
# Interface name used for the config file name and the wg-quick@ unit.
WG_IFACE = os.environ.get("WG_INTERFACE", "wg0")

# --- SSH access to the IoT device -----------------------------------------
SSH_USER = os.environ.get("IOT_SSH_USER", "caster")
SSH_PORT = int(os.environ.get("IOT_SSH_PORT", "22"))
# Path to an identity file; an unset or empty variable becomes None.
SSH_KEY = os.environ.get("SSH_KEY") or None

# JSON-lines log file; defaults to provision.log next to this script.
PROVISION_LOG = os.environ.get("PROVISION_LOG") or str(
    Path(__file__).resolve().with_name("provision.log")
)
def rewrite_allowed_ips(config_text: str, allowed_cidr: str = None) -> str:
"""Rewrite AllowedIPs in a WireGuard config to only route VPN traffic.
By default, wg-easy generates configs with AllowedIPs = 0.0.0.0/0 which routes
ALL traffic through the VPN, making the device unreachable on the local network.
This rewrites it to only route traffic destined for the VPN network.
"""
if allowed_cidr is None:
allowed_cidr = POOL_CIDR
# Replace AllowedIPs = 0.0.0.0/0 (and optional ::/0 for IPv6) with just the VPN CIDR
# Handles various formats: "0.0.0.0/0", "0.0.0.0/0, ::/0", "0.0.0.0/0,::/0"
config_text = re.sub(
r"(AllowedIPs\s*=\s*)0\.0\.0\.0/0[^\n]*",
rf"\g<1>{allowed_cidr}",
config_text
)
return config_text
def scp_and_enable(ssh_host, config_text):
    """Copy a WireGuard config to the device and (re)start wg-quick for it.

    The config is staged locally in a private temporary directory (the file is
    created with mode 0600 and removed afterwards) because it contains the
    client's private key. It is then copied with scp to /tmp on the device,
    installed to /etc/wireguard/<WG_IFACE>.conf with mode 600, and the
    wg-quick@<WG_IFACE> service is stopped, re-enabled and started. The
    staged copy in the device's /tmp is removed when the remote script exits.

    Args:
        ssh_host: Host name or IP of the device reachable over SSH.
        config_text: Full WireGuard configuration text to install.

    Raises:
        subprocess.CalledProcessError: If scp or the remote ssh script fails.
    """
    remote_tmp = f"/tmp/{WG_IFACE}.conf"

    # TemporaryDirectory is private to this user and is deleted on exit, so the
    # private key never lingers in a predictable, shared temp path.
    with tempfile.TemporaryDirectory() as workdir:
        tmp = Path(workdir) / f"{WG_IFACE}.conf"
        tmp.touch(mode=0o600)
        tmp.write_text(config_text)
        scp_cmd = ["scp", "-P", str(SSH_PORT)]
        if SSH_KEY:
            scp_cmd += ["-i", SSH_KEY]
        scp_cmd += [str(tmp), f"{SSH_USER}@{ssh_host}:{remote_tmp}"]
        subprocess.run(scp_cmd, check=True)

    remote = f"""set -e
# Always remove the staged copy (it holds the private key), even on failure
trap 'rm -f {remote_tmp}' EXIT
sudo mkdir -p /etc/wireguard
sudo install -m 600 {remote_tmp} /etc/wireguard/{WG_IFACE}.conf
# If interface exists already, bring it down first to reload config cleanly
sudo systemctl stop wg-quick@{WG_IFACE} || sudo wg-quick down {WG_IFACE} || true
sudo ip link del {WG_IFACE} || true
sudo systemctl reset-failed wg-quick@{WG_IFACE} || true
sudo systemctl daemon-reload || true
# Bring interface up and ensure service is enabled+active
sudo systemctl enable --now wg-quick@{WG_IFACE}
sudo systemctl is-enabled wg-quick@{WG_IFACE} || true
sudo systemctl is-active wg-quick@{WG_IFACE} || true
sudo wg show {WG_IFACE} || true
"""
    ssh_cmd = ["ssh", "-p", str(SSH_PORT)]
    if SSH_KEY:
        ssh_cmd += ["-i", SSH_KEY]
    ssh_cmd += [f"{SSH_USER}@{ssh_host}", remote]
    subprocess.run(ssh_cmd, check=True)
def ssh_capture(ssh_host: str, command: str) -> str:
    """Execute ``command`` on ``ssh_host`` via ssh and return its stripped stdout.

    Best-effort: a non-zero exit status from ssh yields an empty string rather
    than an exception.
    """
    identity_args = ["-i", SSH_KEY] if SSH_KEY else []
    argv = ["ssh", "-p", str(SSH_PORT), *identity_args, f"{SSH_USER}@{ssh_host}", command]
    result = subprocess.run(argv, check=False, capture_output=True, text=True)
    if result.returncode == 0:
        return (result.stdout or "").strip()
    return ""
def get_device_facts(ssh_host: str) -> dict:
    """Query the device over SSH for its serial, hostname and eth0 MAC.

    Every lookup is best-effort; a value that cannot be read is returned as
    an empty string.

    Returns:
        Dict with keys "serial", "hostname" and "mac".
    """
    # Serial: device-tree first, /proc/cpuinfo as the fallback.
    raw_serial = ssh_capture(
        ssh_host, "cat /proc/device-tree/serial-number 2>/dev/null || true"
    ) or ssh_capture(
        ssh_host, "awk -F ': *' '/Serial/ {print $2}' /proc/cpuinfo 2>/dev/null || true"
    )
    # The device-tree value may carry a trailing NUL byte; drop it.
    cleaned_serial = raw_serial.replace("\x00", "").strip() if raw_serial else ""

    return {
        "serial": cleaned_serial,
        "hostname": ssh_capture(ssh_host, "hostname 2>/dev/null || true"),
        # MAC is read from eth0 only.
        "mac": ssh_capture(ssh_host, "cat /sys/class/net/eth0/address 2>/dev/null || true"),
    }
def write_provision_log(entry: dict):
    """Append ``entry`` as one JSON line, prefixed with a UTC "ts" timestamp.

    The record is written to PROVISION_LOG. Failures while opening, encoding
    or writing are reported on stdout and otherwise ignored.
    """
    # Timezone-aware "now" rendered as ISO-8601 with a trailing "Z".
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    record = {"ts": now_utc.isoformat().replace("+00:00", "Z")}
    record.update(entry)
    try:
        with open(PROVISION_LOG, "a", encoding="utf-8") as log_file:
            line = json.dumps(record, ensure_ascii=False)
            log_file.write(line + "\n")
    except Exception as e:
        print(f"⚠️ Failed to write log {PROVISION_LOG}: {e}")
def step_wireguard_provision(iot_host: str, client_name: str):
    """Provision WireGuard on the device for the wg-easy client ``client_name``.

    Obtains the client config through ``ensure_client_and_config``, narrows its
    AllowedIPs to POOL_CIDR so local-network access is preserved, then copies
    it to the device and enables the interface.

    Returns:
        Dict with "wg_name" (the client name) and "wg_iface" (the interface
        value returned by ``ensure_client_and_config``).
    """
    base_url, auth = get_env_auth()
    _client_id, iface, raw_cfg = ensure_client_and_config(base_url, auth, client_name)
    narrowed_cfg = rewrite_allowed_ips(raw_cfg, POOL_CIDR)
    scp_and_enable(iot_host, narrowed_cfg)
    return {"wg_name": client_name, "wg_iface": iface}
def step_set_eth1_mac(iot_host: str):
    """Write a systemd .link file on the device that pins a random MAC address.

    The address is 02:xx:xx:xx:xx:xx with five random octets. The generated
    /etc/systemd/network/10-eth1-mac.link matches on Driver=smsc95xx. A reboot
    is needed before the new address takes effect.

    Returns:
        Dict with "rc" (ssh exit status), "mac" (the generated address) and
        the last 500 characters of stdout/stderr as "out"/"err".
    """
    octets = [0x02] + [secrets.randbelow(256) for _ in range(5)]
    mac = ":".join(f"{octet:02x}" for octet in octets)

    script_lines = [
        "set -e",
        "sudo mkdir -p /etc/systemd/network",
        "sudo tee /etc/systemd/network/10-eth1-mac.link >/dev/null <<'EOF'",
        "[Match]",
        "Driver=smsc95xx",
        "",
        "[Link]",
        f"MACAddress={mac}",
        "EOF",
    ]
    remote = "\n".join(script_lines) + "\n"

    identity_args = ["-i", SSH_KEY] if SSH_KEY else []
    argv = ["ssh", "-p", str(SSH_PORT), *identity_args, f"{SSH_USER}@{iot_host}", remote]
    proc = subprocess.run(argv, check=False, capture_output=True, text=True)
    out_text = (proc.stdout or "").strip()
    err_text = (proc.stderr or "").strip()

    if proc.returncode == 0:
        print(f"✅ set eth1 mac: configured {mac} at /etc/systemd/network/10-eth1-mac.link")
        print(" Reboot is required for the MAC change to take effect.")
    else:
        print(f"❌ set eth1 mac: failed rc={proc.returncode}: {err_text}")

    return {
        "rc": proc.returncode,
        "mac": mac,
        "out": out_text[-500:],
        "err": err_text[-500:],
    }
def step_set_hostname(iot_host: str, hostname: str | None):
    """Run the device's provision_domain_hostname.sh with ``hostname`` and 'local'.

    The remote snippet picks the first existing candidate script path and runs
    it with ``sudo bash`` (so the executable bit/shebang does not matter).
    Afterwards the device hostname is read back via ``get_device_facts``.

    Returns:
        ``{"hostname": None, "changed": False}`` when no hostname is given.
        Otherwise a dict with "hostname", "domain", "device_hostname",
        "changed" (True when the device now reports the requested hostname),
        "rc", and the last 500 characters of stdout/stderr as "out"/"err".
    """
    if not hostname:
        print("⏭️ hostname: no hostname provided, skipping")
        return {"hostname": None, "changed": False}

    domain = "local"
    # Paths on the device where the script may live.
    candidates = [
        "/home/caster/bumble-auracast/src/auracast/server/provision_domain_hostname.sh",
    ]
    candidate_words = " ".join(map(shlex.quote, candidates))
    name_arg = shlex.quote(hostname)
    domain_arg = shlex.quote(domain)

    script_lines = [
        "set -e",
        f"for p in {candidate_words}; do",
        ' if [ -f "$p" ]; then sp="$p"; break; fi',
        "done",
        "if [ -z \"${sp:-}\" ]; then echo 'script not found in any candidate path' >&2; exit 1; fi",
        f'sudo bash "$sp" {name_arg} {domain_arg}',
        "hostname 2>/dev/null || true",
    ]
    remote = "\n".join(script_lines) + "\n"

    identity_args = ["-i", SSH_KEY] if SSH_KEY else []
    argv = ["ssh", "-p", str(SSH_PORT), *identity_args, f"{SSH_USER}@{iot_host}", remote]
    proc = subprocess.run(argv, check=False, capture_output=True, text=True)
    out_text = (proc.stdout or "").strip()
    err_text = (proc.stderr or "").strip()

    # Read the hostname back from the device to see what it reports now.
    new_hostname = get_device_facts(iot_host).get("hostname")
    changed = bool(new_hostname and new_hostname == hostname)

    if proc.returncode == 0:
        print(f"✅ hostname: script executed. device hostname now '{new_hostname}' (rc={proc.returncode})")
    else:
        print(f"❌ hostname: remote script failed rc={proc.returncode}: {err_text}")

    return {
        "hostname": hostname,
        "domain": domain,
        "device_hostname": new_hostname,
        "changed": changed,
        "rc": proc.returncode,
        "out": out_text[-500:],  # only the tail, to keep log lines short
        "err": err_text[-500:],
    }
def step_git_pull(iot_host: str, branch: str = "main"):
    """Update ~/bumble-auracast on the device to the newest tag.

    The remote script resets the origin URL, fetches ``branch`` with tags and
    checks out the highest tag by version sort. When the repository has no
    tags it checks out and pulls ``branch`` instead.

    Returns:
        Dict with "rc" (ssh exit status) and the last 500 characters of
        stdout/stderr as "out"/"err".
    """
    branch_arg = shlex.quote(branch)
    script_lines = [
        "set -e",
        "cd ~/bumble-auracast",
        "git remote set-url origin https://gitea.summitwave.work/auracaster/bumble-auracast",
        f"git fetch origin {branch_arg} --tags",
        "LATEST_TAG=$(git tag --sort=-v:refname | head -n 1)",
        'if [ -z "$LATEST_TAG" ]; then',
        " echo 'No tags found, falling back to main branch' >&2",
        f" git checkout {branch_arg}",
        f" git pull origin {branch_arg}",
        "else",
        ' echo "Checking out latest tag: $LATEST_TAG"',
        ' git checkout "$LATEST_TAG"',
        "fi",
    ]
    remote = "\n".join(script_lines) + "\n"

    identity_args = ["-i", SSH_KEY] if SSH_KEY else []
    argv = ["ssh", "-p", str(SSH_PORT), *identity_args, f"{SSH_USER}@{iot_host}", remote]
    proc = subprocess.run(argv, check=False, capture_output=True, text=True)
    out_text = (proc.stdout or "").strip()
    err_text = (proc.stderr or "").strip()

    if proc.returncode == 0:
        print("✅ git pull: completed (latest tag checked out)")
    else:
        print(f"❌ git pull: failed rc={proc.returncode}: {err_text}")

    return {
        "rc": proc.returncode,
        "out": out_text[-500:],
        "err": err_text[-500:],
    }
def step_update_app(iot_host: str):
    """Run ``poetry install`` in ~/bumble-auracast on the device.

    Poetry is first configured to create the virtualenv inside the project.
    The code is installed as it is currently checked out; this step does not
    fetch or switch versions.

    Returns:
        Dict with "rc" (ssh exit status) and the last 500 characters of
        stdout/stderr as "out"/"err".
    """
    script_lines = [
        "set -e",
        "cd ~/bumble-auracast",
        "/home/caster/.local/bin/poetry config virtualenvs.in-project true",
        "/home/caster/.local/bin/poetry install",
    ]
    remote = "\n".join(script_lines) + "\n"

    identity_args = ["-i", SSH_KEY] if SSH_KEY else []
    argv = ["ssh", "-p", str(SSH_PORT), *identity_args, f"{SSH_USER}@{iot_host}", remote]
    proc = subprocess.run(argv, check=False, capture_output=True, text=True)
    out_text = (proc.stdout or "").strip()
    err_text = (proc.stderr or "").strip()

    if proc.returncode == 0:
        print("✅ update app: poetry install completed")
    else:
        print(f"❌ update app: failed rc={proc.returncode}: {err_text}")

    return {
        "rc": proc.returncode,
        "out": out_text[-500:],
        "err": err_text[-500:],
    }
def step_start_app(iot_host: str, app: str):
    """Run the service setup scripts on the device and enable the services.

    Over SSH this enables lingering for the login user, runs each script in
    ``scripts`` that exists under ~/bumble-auracast (missing ones only produce
    a warning on stderr), then enables and starts auracast-server.service
    (user unit) and auracast-frontend.service (system unit) and prints their
    enabled/active state.

    ``app`` does not influence what is executed; it is only echoed back in the
    result as "app_mode".

    Returns:
        Dict with "app_mode", "rc" (ssh exit status) and the last 1000
        characters of stdout/stderr as "out"/"err".
    """
    scripts = [
        "src/service/update_and_run_pw_aes67.sh",
        "src/service/update_and_run_server_and_frontend.sh",
    ]
    script_words = " ".join(map(shlex.quote, scripts))
    script_lines = [
        "set -e",
        'sudo loginctl enable-linger "$USER"',
        "cd ~/bumble-auracast",
        "for s in " + script_words + "; do",
        ' if [ -f "$s" ]; then',
        ' echo "▶ running $s"',
        ' bash "$s"',
        " else",
        ' echo "⚠️ missing $s" 1>&2',
        " fi",
        "done",
        "# Ensure services are enabled and started",
        "systemctl --user daemon-reload",
        "systemctl --user enable --now auracast-server.service",
        "sudo systemctl daemon-reload",
        "sudo systemctl enable --now auracast-frontend.service",
        "systemctl --user is-enabled auracast-server.service || true",
        "systemctl --user is-active auracast-server.service || true",
        "sudo systemctl is-enabled auracast-frontend.service || true",
        "sudo systemctl is-active auracast-frontend.service || true",
    ]
    remote = "\n".join(script_lines) + "\n"

    identity_args = ["-i", SSH_KEY] if SSH_KEY else []
    argv = ["ssh", "-p", str(SSH_PORT), *identity_args, f"{SSH_USER}@{iot_host}", remote]
    proc = subprocess.run(argv, check=False, capture_output=True, text=True)
    out_text = (proc.stdout or "").strip()
    err_text = (proc.stderr or "").strip()
    out_tail = out_text[-1000:]
    err_tail = err_text[-1000:]

    if proc.returncode == 0:
        print("✅ start app: scripts executed")
        if out_text:
            print(f"Output:\n{out_tail}")
    else:
        print(f"❌ start app: failed rc={proc.returncode}")
        if err_text:
            print(f"stderr: {err_tail}")
        if out_text:
            print(f"stdout: {out_tail}")

    return {
        "app_mode": app,
        "rc": proc.returncode,
        "out": out_tail,
        "err": err_tail,
    }
def step_add_ssh_key(iot_host: str):
    """Add Paul's SSH key to the device's authorized_keys.

    Ensures the hard-coded public key for user 'paul' is present in the SSH
    user's ~/.ssh/authorized_keys. The operation is idempotent: the key is
    appended only when no identical line exists yet, so re-running the
    provisioning does not accumulate duplicates. If the existing file does not
    end with a newline, one is added first so the new key starts on its own
    line instead of being glued to the previous entry.

    Returns:
        Dict with "rc" (ssh exit status) and the last 500 characters of
        stdout/stderr as "out"/"err".
    """
    ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDDg4R0lZEGAlaJnMBYi0ZuX9tZ7aJtpeYTY0JcffYZjU3ynY/GEvonvMcQq2pdO1OY1awqZQ4drAhQc195MDZCFS6iof6AsGU17MEIEEmFvIANbLwGFYFv0fDwDAZLdY4HZtEIyNZZkfX32O0v1xSSrueFM8N6PkCYQBjhRFLZpBi5jkwk1nnnATN/mGpaDBbvKpWU2FS+PlwKRhm/bF6pKuQ/eXgO7k4fvM6aegtdHNARfMR9yK6/5s5vo45o1NbSbJ4sK3Vf0TdSjlWQSyu2e9D+Xomt0+fBpvGL+yl/7bc9AKq5ZlJNEA3XMjuihNlDoIglvSAYiDOTq09pocVq+myLwDKCfobX8cfHNDTrsWevuZKKTolP6BGfcX3MEWyc/md8ndsSJi49XakdzBhMqVzXmLq9CKBw0QyZID3CuWG8NeRuqZZMGSs0GCdlYF4YqHBhH1icoNgysZ4g7kQLstnTh8ZDcNHEWTxM1ZKCh12XOPvtq506/DTN1aMM0H0= paul@paul-Yoga-Pro-7-14APH8"
    quoted_key = shlex.quote(ssh_key)
    remote = (
        "set -e\n"
        "mkdir -p ~/.ssh\n"
        "chmod 700 ~/.ssh\n"
        # Make sure the file exists so the checks below operate on a real file.
        "touch ~/.ssh/authorized_keys\n"
        # Only append when no identical line (-x whole line, -F fixed string) exists.
        f"if ! grep -qxF -- {quoted_key} ~/.ssh/authorized_keys; then\n"
        # $(...) strips a trailing newline, so non-empty output means the
        # file's last line is unterminated and needs a newline first.
        "  if [ -n \"$(tail -c 1 ~/.ssh/authorized_keys)\" ]; then echo >> ~/.ssh/authorized_keys; fi\n"
        f"  echo {quoted_key} >> ~/.ssh/authorized_keys\n"
        "fi\n"
        "chmod 600 ~/.ssh/authorized_keys\n"
        "echo 'SSH key for paul added successfully'\n"
    )
    ssh_cmd = ["ssh", "-p", str(SSH_PORT)]
    if SSH_KEY:
        ssh_cmd += ["-i", SSH_KEY]
    ssh_cmd += [f"{SSH_USER}@{iot_host}", remote]
    proc = subprocess.run(ssh_cmd, check=False, capture_output=True, text=True)
    stdout = (proc.stdout or "").strip()
    stderr = (proc.stderr or "").strip()
    if proc.returncode != 0:
        print(f"❌ add ssh key: failed rc={proc.returncode}: {stderr}")
    else:
        print("✅ add ssh key: Paul's SSH key added successfully")
    return {
        "rc": proc.returncode,
        "out": stdout[-500:],
        "err": stderr[-500:],
    }
def step_finish(iot_host: str):
    """Finish provisioning by rebooting the device with ``sudo reboot``.

    The remote script consists only of ``set -e`` and ``sudo reboot``; no
    other finalization is performed here.

    Returns:
        Dict with "rc" (ssh exit status) and the last 500 characters of
        stdout/stderr as "out"/"err".
    """
    remote = "\n".join(["set -e", "sudo reboot"]) + "\n"

    identity_args = ["-i", SSH_KEY] if SSH_KEY else []
    argv = ["ssh", "-p", str(SSH_PORT), *identity_args, f"{SSH_USER}@{iot_host}", remote]
    proc = subprocess.run(argv, check=False, capture_output=True, text=True)
    out_text = (proc.stdout or "").strip()
    err_text = (proc.stderr or "").strip()

    if proc.returncode == 0:
        print("✅ finish: reboot initiated")
    else:
        print(f"❌ finish: failed rc={proc.returncode}: {err_text}")

    return {
        "rc": proc.returncode,
        "out": out_text[-500:],
        "err": err_text[-500:],
    }
def main():
    """Command-line entry point: parse arguments and run the selected steps.

    Steps always execute in the fixed order pull, add_ssh_key, hostname, wg,
    mac, update_app, start_app, finish, regardless of the order in which they
    were listed. After each step a JSON line is appended to the provisioning
    log, combining the action name, device facts and the step's result dict.
    Exits with status 2 when --name is missing but a step needs it.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description=(
            "Provision IoT device: wg-easy client + optional hostname, services, and app start. "
            "Run all steps or select individual ones."
        )
    )
    parser.add_argument("iot_host", help="Local/LAN IP or hostname of the IoT device reachable via SSH")
    parser.add_argument("-n", "--name", required=False, help="Device name to use for both hostname and WireGuard client")
    parser.add_argument(
        "--steps",
        nargs="+",
        choices=["pull", "wg", "hostname", "mac", "update_app", "start_app", "add_ssh_key", "finish", "all"],
        default=["all"],
        help="Which steps to run. Default: all",
    )
    # The hostname is derived from --name; there is no separate option for it.
    parser.add_argument("--app", choices=["ui", "script"], default="ui", help="Application mode to start")
    parser.add_argument("--branch", default="main", help="Git branch to checkout and pull (default: main)")
    args = parser.parse_args()

    # Called up front for every invocation, whichever steps were selected.
    get_env_auth()

    # "all" expands to every step.
    selected = args.steps
    if "all" in selected:
        selected = [
            "pull",
            "add_ssh_key",
            "hostname",
            "mac",
            "wg",
            "update_app",
            "start_app",
            "finish",
        ]

    # Both the wg and hostname steps need a device name.
    name = args.name
    needs_name = "wg" in selected or "hostname" in selected
    if needs_name and not name:
        print("error: --name is required for steps: wg, hostname")
        raise SystemExit(2)

    # A purely numeric name is turned into 'summitwave-beacon<number>'.
    if name and re.fullmatch(r"\d+", name):
        name = f"summitwave-beacon{name}"

    host = args.iot_host
    # Snapshot taken before any step runs; the wg log entry uses this one.
    initial_facts = get_device_facts(host)

    def record(action, info, facts=None, **extra):
        """Log one step; device facts are re-read unless a snapshot is given."""
        if facts is None:
            facts = get_device_facts(host)
        # Later mappings win on key clashes: step results override facts.
        write_provision_log({"action": action, **extra, **facts, **info})

    if "pull" in selected:
        record("pull", step_git_pull(host, args.branch), branch=args.branch)
    if "add_ssh_key" in selected:
        record("add_ssh_key", step_add_ssh_key(host))
    if "hostname" in selected:
        # Facts are read after the step, so they show the post-step hostname.
        record("hostname", step_set_hostname(host, name))
    if "wg" in selected:
        record("wg", step_wireguard_provision(host, name), facts=initial_facts)
    if "mac" in selected:
        record("mac", step_set_eth1_mac(host))
    if "update_app" in selected:
        record("update_app", step_update_app(host))
    if "start_app" in selected:
        record("start_app", step_start_app(host, args.app))
    if "finish" in selected:
        record("finish", step_finish(host))
if __name__ == "__main__":
main()