mage automated wg provisioning work

This commit is contained in:
2025-08-27 14:22:02 +02:00
parent 2d902c0265
commit 06d16a6b03
11 changed files with 1381 additions and 1 deletions
+209
View File
@@ -0,0 +1,209 @@
#!/usr/bin/env python3
import ipaddress, os, re, subprocess, tempfile, json, datetime
from pathlib import Path
from dotenv import load_dotenv
from utils.wg_easy import get_env_auth, ensure_client_and_config
load_dotenv()
# wg-easy connectivity
BASE = os.getenv("WG_EASY_BASE_URL", "").rstrip("/")
USER = os.getenv("WG_EASY_USERNAME", "")
PASS = os.getenv("WG_EASY_PASSWORD", "")
POOL_CIDR = os.getenv("POOL_CIDR", "10.8.0.0/24")
WG_IFACE = os.getenv("WG_INTERFACE", "wg0")
# SSH to IoT device
SSH_USER = os.getenv("IOT_SSH_USER", "caster")
SSH_PORT = int(os.getenv("IOT_SSH_PORT", "22"))
SSH_KEY = os.getenv("SSH_KEY") or None # path or None
PROVISION_LOG = os.getenv("PROVISION_LOG") or str((Path(__file__).resolve().parent / "provision.log"))
def _get(obj, *names):
"""Safe attribute/dict getter trying several candidate names."""
for n in names:
if hasattr(obj, n):
return getattr(obj, n)
if isinstance(obj, dict) and n in obj:
return obj[n]
return None
def scp_and_enable(ssh_host, config_text):
tmp = Path(tempfile.gettempdir()) / f"{WG_IFACE}.conf"
tmp.write_text(config_text)
scp_cmd = ["scp", "-P", str(SSH_PORT)]
if SSH_KEY:
scp_cmd += ["-i", SSH_KEY]
scp_cmd += [str(tmp), f"{SSH_USER}@{ssh_host}:/tmp/{WG_IFACE}.conf"]
subprocess.run(scp_cmd, check=True)
remote = f"""set -e
sudo mkdir -p /etc/wireguard
sudo install -m 600 /tmp/{WG_IFACE}.conf /etc/wireguard/{WG_IFACE}.conf
# If interface exists already, bring it down first to reload config cleanly
sudo wg-quick down {WG_IFACE} || true
# Bring interface up and ensure service is enabled+active
sudo wg-quick up {WG_IFACE} || true
sudo systemctl enable --now wg-quick@{WG_IFACE}
sudo systemctl is-enabled wg-quick@{WG_IFACE} || true
sudo systemctl is-active wg-quick@{WG_IFACE} || true
sudo wg show {WG_IFACE} || true
"""
ssh_cmd = ["ssh", "-p", str(SSH_PORT)]
if SSH_KEY:
ssh_cmd += ["-i", SSH_KEY]
ssh_cmd += [f"{SSH_USER}@{ssh_host}", remote]
subprocess.run(ssh_cmd, check=True)
def ssh_capture(ssh_host: str, command: str) -> str:
"""Run a command on the remote host over SSH and capture stdout (stripped)."""
ssh_cmd = ["ssh", "-p", str(SSH_PORT)]
if SSH_KEY:
ssh_cmd += ["-i", SSH_KEY]
ssh_cmd += [f"{SSH_USER}@{ssh_host}", command]
out = subprocess.run(ssh_cmd, check=False, capture_output=True, text=True)
if out.returncode != 0:
return ""
return (out.stdout or "").strip()
def get_device_facts(ssh_host: str) -> dict:
"""Collect basic device facts (serial, mac, hostname) via SSH best-effort."""
# serial number (prefer device-tree)
serial = ssh_capture(ssh_host, "cat /proc/device-tree/serial-number 2>/dev/null || true")
if not serial:
serial = ssh_capture(ssh_host, "awk -F ': *' '/Serial/ {print $2}' /proc/cpuinfo 2>/dev/null || true")
# Some systems expose a trailing NUL in DTB serial; strip it explicitly
serial = serial.replace("\x00", "").strip() if serial else ""
# hostname
hostname = ssh_capture(ssh_host, "hostname 2>/dev/null || true")
# MAC: try eth0
mac = ssh_capture(ssh_host, "cat /sys/class/net/eth0/address 2>/dev/null || true")
return {"serial": serial, "hostname": hostname, "mac": mac}
def write_provision_log(entry: dict):
"""Append a JSON line to the provisioning log with timestamp."""
# Use timezone-aware UTC timestamp to avoid DeprecationWarning
ts = datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")
enriched = {"ts": ts, **entry}
try:
with open(PROVISION_LOG, "a", encoding="utf-8") as f:
f.write(json.dumps(enriched, ensure_ascii=False) + "\n")
except Exception as e:
print(f"⚠️ Failed to write log {PROVISION_LOG}: {e}")
def step_wireguard_provision(iot_host: str, client_name: str):
"""Create or reuse a wg-easy client, fetch config, copy to device and enable it.
Returns a dict with wg_name.
"""
name = client_name
base, auth = get_env_auth()
cid, iface, cfg = ensure_client_and_config(base, auth, name)
scp_and_enable(iot_host, cfg)
return {"wg_name": name, "wg_iface": iface}
def step_set_hostname(iot_host: str, hostname: str | None):
"""Placeholder: set hostname on the device via custom script under /home/caster/bumble-auracast/src/server.
Intention: SSH into device and run a script, e.g. /home/caster/bumble-auracast/src/server/set-hostname <hostname>.
Currently does nothing.
"""
print("⏭️ [placeholder] Skipping hostname change (no-op). Intended hostname:", hostname)
return {"hostname": hostname}
def step_update_app(iot_host: str, services: list[str] | None = None):
"""Placeholder: start/enable required system services on the device.
Intention: SSH into the device and run systemctl enable --now <service> for required services.
Currently does nothing.
"""
# TODO: run the app update script
services = services or []
print("⏭️ [placeholder] Skipping start services (no-op).")
return {"services": services}
def step_start_app(iot_host: str, app: str):
"""Placeholder: start the application flow (script or UI) on the device.
Intention: depending on --app, run an app launcher over SSH.
Currently does nothing.
"""
# TODO: start with pulling the latest main from the repo
print(f"⏭️ [placeholder] Skipping start app (no-op). --app={app}")
return {"app_mode": app}
def main():
import argparse
ap = argparse.ArgumentParser(
description=(
"Provision IoT device: wg-easy client + optional hostname, services, and app start. "
"Run all steps or select individual ones."
)
)
ap.add_argument("iot_host", help="Local/LAN IP or hostname of the IoT device reachable via SSH")
ap.add_argument("--name", required=True, help="Device name to use for both hostname and WireGuard client")
ap.add_argument(
"--steps",
nargs="+",
choices=["wg", "hostname", "services", "app", "all"],
default=["all"],
help="Which steps to run. Default: all",
)
# Hostname will be taken from --name
ap.add_argument("--app", choices=["ui", "script"], default="ui", help="Application mode to start")
ap.add_argument("--services", nargs="*", default=[], help="Services to start/enable (logged)")
args = ap.parse_args()
# Validate wg-easy env
get_env_auth()
# Normalize steps
steps = args.steps
if "all" in steps:
steps = ["wg", "hostname", "services", "app"]
# Gather device facts once (may change after hostname step, but we at least log the initial state)
facts = get_device_facts(args.iot_host)
# Execute selected steps in order with logging
if "wg" in steps:
wg_info = step_wireguard_provision(args.iot_host, args.name)
write_provision_log({
"action": "wg",
**facts,
**wg_info,
})
if "hostname" in steps:
host_info = step_set_hostname(args.iot_host, args.name)
# refresh hostname after step (if a real implementation later changes it)
facts_post = get_device_facts(args.iot_host)
write_provision_log({
"action": "hostname",
**facts_post,
**host_info,
})
if "services" in steps:
svc_info = step_update_app(args.iot_host, args.services)
write_provision_log({
"action": "services",
**get_device_facts(args.iot_host),
**svc_info,
})
if "app" in steps:
app_info = step_start_app(args.iot_host, args.app)
write_provision_log({
"action": "app",
**get_device_facts(args.iot_host),
**app_info,
})
if __name__ == "__main__":
main()
+1
View File
@@ -0,0 +1 @@
# Utilities package for shared helpers
Binary file not shown.
Binary file not shown.
+125
View File
@@ -0,0 +1,125 @@
#!/usr/bin/env python3
"""
Shared utilities for interacting with wg-easy's HTTP API using HTTP Basic Auth.
Environment variables (loaded via python-dotenv if present):
- WG_EASY_BASE_URL
- WG_EASY_USERNAME
- WG_EASY_PASSWORD
Primary helpers:
- get_env_auth(): returns (base, HTTPBasicAuth)
- list_clients(base, auth)
- get_client_id_by_name(base, auth, name)
- create_client_if_needed(base, auth, name): returns (client_id, interface_id)
- get_client_config_any(base, auth, client_id, interface_id="wg0")
- ensure_client_and_config(base, auth, name): returns (client_id, interface_id, config_text)
"""
from __future__ import annotations
import os
from typing import Optional, Tuple, List, Dict, Any
import requests
from requests.auth import HTTPBasicAuth
from dotenv import load_dotenv
# load .env once on import; callers can still override via explicit args
load_dotenv()
def get_env_auth() -> tuple[str, HTTPBasicAuth]:
base = os.getenv("WG_EASY_BASE_URL", "").rstrip("/")
user = os.getenv("WG_EASY_USERNAME", "")
pwd = os.getenv("WG_EASY_PASSWORD", "")
if not base or not user or not pwd:
raise SystemExit("WG_EASY_BASE_URL, WG_EASY_USERNAME and WG_EASY_PASSWORD must be set in .env")
return base, HTTPBasicAuth(user, pwd)
def list_clients(base: str, auth: HTTPBasicAuth, timeout: int = 20) -> List[Dict[str, Any]]:
r = requests.get(f"{base}/api/client", auth=auth, timeout=timeout)
r.raise_for_status()
return r.json()
def get_client_id_by_name(base: str, auth: HTTPBasicAuth, name: str, timeout: int = 20) -> tuple[Optional[str], Optional[str]]:
for c in list_clients(base, auth, timeout=timeout):
if (c.get("name") or c.get("client_name") or c.get("label")) == name:
return str(c.get("id") or c.get("clientId") or c.get("uuid")), c.get("interfaceId") or "wg0"
return None, None
def create_client_if_needed(base: str, auth: HTTPBasicAuth, name: str, timeout: int = 20) -> tuple[str, str]:
cid, iface = get_client_id_by_name(base, auth, name, timeout=timeout)
if cid:
return cid, iface or "wg0"
# Some instances require an expiry; try a couple of payloads
payloads = [
{"name": name, "expiresAt": None},
{"name": name, "expiresAt": "2099-12-31T23:59:59.000Z"},
{"name": name}, # final fallback
]
last = None
for p in payloads:
r = requests.post(f"{base}/api/client", json=p, auth=auth, timeout=timeout)
if r.status_code in (200, 201, 204, 409):
# 409 means already exists
cid, iface = get_client_id_by_name(base, auth, name, timeout=timeout)
if cid:
return cid, iface or "wg0"
last = r
if last is not None:
try:
last.raise_for_status()
except requests.HTTPError as e:
detail = getattr(e.response, "text", "") if hasattr(e, "response") and e.response is not None else ""
raise SystemExit(f"Failed to create client '{name}': {e} {detail}")
raise SystemExit("Failed to create client for unknown reasons.")
def fetch_text(url: str, auth: HTTPBasicAuth, timeout: int = 20) -> str:
headers = {"Accept": "text/plain, */*"}
r = requests.get(url, auth=auth, headers=headers, timeout=timeout)
if r.status_code == 200 and (r.text or "").strip():
return r.text
r.raise_for_status()
return "" # never reached
def get_client_config_any(base: str, auth: HTTPBasicAuth, client_id: str, interface_id: str = "wg0", timeout: int = 20) -> str:
candidates = [
f"{base}/api/client/{client_id}/config",
f"{base}/api/client/{client_id}/configuration",
f"{base}/api/client/{client_id}/download",
f"{base}/api/interface/{interface_id}/client/{client_id}/config",
f"{base}/api/interface/{interface_id}/client/{client_id}/configuration",
f"{base}/api/interface/{interface_id}/client/{client_id}/download",
f"{base}/api/wireguard/client/{client_id}/config",
]
last_err: Optional[Exception] = None
for url in candidates:
try:
return fetch_text(url, auth, timeout=timeout)
except requests.HTTPError as e:
last_err = e
except requests.RequestException as e:
last_err = e
# Try client details for a relative download URL
r = requests.get(f"{base}/api/client/{client_id}", auth=auth, timeout=timeout)
if r.ok:
data = r.json()
for k in ("downloadUrl", "configUrl", "configurationUrl"):
if k in data:
url = f"{base}{data[k]}" if str(data[k]).startswith("/") else str(data[k])
return fetch_text(url, auth, timeout=timeout)
if last_err:
raise last_err
raise RuntimeError("Could not locate a config endpoint for this server build.")
def ensure_client_and_config(base: str, auth: HTTPBasicAuth, name: str, timeout: int = 20) -> tuple[str, str, str]:
cid, iface = create_client_if_needed(base, auth, name, timeout=timeout)
cfg = get_client_config_any(base, auth, cid, iface, timeout=timeout)
return cid, (iface or "wg0"), cfg
+21
View File
@@ -0,0 +1,21 @@
#!/usr/bin/env python3
import sys
from dotenv import load_dotenv
from utils.wg_easy import get_env_auth, ensure_client_and_config
load_dotenv()
if len(sys.argv) < 2:
sys.exit("Usage: wg-easy-create-client.py <name>")
name = sys.argv[1]
base, auth = get_env_auth()
# Ensure client exists and fetch its configuration
client_id, iface, cfg_text = ensure_client_and_config(base, auth, name)
out = f"wg-{name}.conf"
with open(out, "w") as f:
f.write(cfg_text)
print(f"✅ Saved config to {out} (id={client_id}, iface={iface}).")
+14
View File
@@ -0,0 +1,14 @@
import sys
import json
from dotenv import load_dotenv
from utils.wg_easy import get_env_auth, list_clients
load_dotenv()
try:
base, auth = get_env_auth()
except SystemExit as e:
sys.exit(str(e))
clients = list_clients(base, auth, timeout=20)
print(json.dumps(clients, indent=2))