From e1d717ed5c67dee43a1ba520d87624c9254dc56a Mon Sep 17 00:00:00 2001 From: pober Date: Tue, 3 Mar 2026 15:50:19 +0100 Subject: [PATCH] Adds DHCP/static IP toggle for both ports in the UI. --- src/auracast/server/multicast_frontend.py | 113 +++++++++++- src/auracast/server/multicast_server.py | 206 ++++++++++++++++++++++ 2 files changed, 314 insertions(+), 5 deletions(-) diff --git a/src/auracast/server/multicast_frontend.py b/src/auracast/server/multicast_frontend.py index 7b4971d..5e9b1dc 100644 --- a/src/auracast/server/multicast_frontend.py +++ b/src/auracast/server/multicast_frontend.py @@ -1774,13 +1774,116 @@ with st.expander("System control", expanded=False): try: import subprocess, socket device_hostname = socket.gethostname() - result = subprocess.run(["hostname", "-I"], capture_output=True, text=True, timeout=2) - ips = [ip for ip in result.stdout.strip().split() if not ip.startswith('127.') and ':' not in ip] st.write(f"Hostname: **{device_hostname}**") - if ips: - st.write(f"IP Address: **{ips[0]}**") + + network_info_resp = requests.get(f"{BACKEND_URL}/network_info", timeout=5) + if network_info_resp.status_code == 200: + network_data = network_info_resp.json() + interfaces = network_data.get("interfaces", {}) + port_mapping = network_data.get("port_mapping", {}) + + for port_name in ["port1", "port2"]: + if port_name not in port_mapping: + continue + + interface_name = port_mapping[port_name] + interface_data = interfaces.get(interface_name, {}) + + port_label = "Port 1" if port_name == "port1" else "Port 2" + st.markdown(f"### {port_label}") + + ip_address = interface_data.get("ip_address", "N/A") + is_dhcp = interface_data.get("is_dhcp", True) + + st.write(f"Interface: **{interface_name}**") + st.write(f"IP Address: **{ip_address}**") + + col1, col2 = st.columns([1, 3]) + with col1: + toggle_key = f"{port_name}_dhcp_toggle" + current_mode = "DHCP" if is_dhcp else "Static IP" + new_mode = st.radio( + "Mode", + options=["DHCP", "Static IP"], + index=0 if is_dhcp else 1, + key=toggle_key, + horizontal=True + ) + + with col2: + if new_mode == "Static IP": + ip_input_key = f"{port_name}_ip_input" + default_ip = ip_address if ip_address != "N/A" and not is_dhcp else "" + new_ip = st.text_input( + "Static IP Address", + value=default_ip, + key=ip_input_key, + placeholder="192.168.1.100" + ) + + if st.button(f"Apply", key=f"{port_name}_apply_btn"): + if not new_ip: + st.error("Please enter an IP address") + else: + import re + ip_pattern = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$') + if not ip_pattern.match(new_ip): + st.error("Invalid IP address format") + else: + octets = new_ip.split('.') + if not all(0 <= int(octet) <= 255 for octet in octets): + st.error("IP address octets must be between 0 and 255") + else: + try: + config_payload = { + "interface": interface_name, + "is_dhcp": False, + "ip_address": new_ip, + "netmask": "24" + } + config_resp = requests.post( + f"{BACKEND_URL}/set_network_config", + json=config_payload, + timeout=10 + ) + if config_resp.status_code == 200: + st.success(f"Static IP {new_ip} applied to {interface_name}") + time.sleep(2) + st.rerun() + else: + st.error(f"Failed to apply configuration: {config_resp.text}") + except Exception as e: + st.error(f"Error applying configuration: {e}") + else: + if new_mode != current_mode: + if st.button(f"Apply DHCP", key=f"{port_name}_dhcp_apply_btn"): + try: + config_payload = { + "interface": interface_name, + "is_dhcp": True + } + config_resp = requests.post( + f"{BACKEND_URL}/set_network_config", + json=config_payload, + timeout=10 + ) + if config_resp.status_code == 200: + st.success(f"DHCP enabled for {interface_name}") + time.sleep(2) + st.rerun() + else: + st.error(f"Failed to apply configuration: {config_resp.text}") + except Exception as e: + st.error(f"Error applying configuration: {e}") + + st.markdown("---") else: - st.warning("No valid IP address found.") + result = subprocess.run(["hostname", "-I"], capture_output=True, text=True, timeout=2) + ips = [ip for ip in result.stdout.strip().split() if not ip.startswith('127.') and ':' not in ip] + if ips: + st.write(f"IP Address: **{ips[0]}**") + else: + st.warning("No valid IP address found.") except Exception as e: st.warning(f"Could not determine network info: {e}") diff --git a/src/auracast/server/multicast_server.py b/src/auracast/server/multicast_server.py index a5d4277..7a05a1b 100644 --- a/src/auracast/server/multicast_server.py +++ b/src/auracast/server/multicast_server.py @@ -1463,6 +1463,212 @@ async def delete_recordings(): raise HTTPException(status_code=500, detail=str(e)) +@app.get("/network_info") +async def get_network_info(): + """Get network information for all ethernet interfaces.""" + try: + interfaces = {} + + hardcoded_devices = ["eth0", "eth1"] + + proc = await asyncio.create_subprocess_exec( + "nmcli", "-t", "-f", "NAME,DEVICE", "connection", "show", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await proc.communicate() + + device_to_connection = {} + if proc.returncode == 0: + for line in stdout.decode().strip().split('\n'): + if not line: + continue + parts = line.split(':') + if len(parts) >= 2: + connection_name = parts[0] + device_name = parts[1] + if device_name in hardcoded_devices: + device_to_connection[device_name] = connection_name + + for device in hardcoded_devices: + ip_address = None + + proc = await asyncio.create_subprocess_exec( + "nmcli", "-t", "-f", "IP4.ADDRESS", "device", "show", device, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await proc.communicate() + + if proc.returncode == 0: + for line in stdout.decode().strip().split('\n'): + if line.startswith('IP4.ADDRESS'): + ip_parts = line.split(':') + if len(ip_parts) >= 2: + full_ip = ip_parts[1] + ip_address = full_ip.split('/')[0] + if not ip_address.startswith('169.254.'): + break + + method = "auto" + connection_name = device_to_connection.get(device) + if connection_name: + proc = await asyncio.create_subprocess_exec( + "nmcli", "-t", "-f", "ipv4.method", "connection", "show", connection_name, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await proc.communicate() + + if proc.returncode == 0: + for line in stdout.decode().strip().split('\n'): + if line.startswith('ipv4.method:'): + method = line.split(':')[1] + break + + is_dhcp = method == "auto" + + interfaces[device] = { + "ip_address": ip_address or "N/A", + "is_dhcp": is_dhcp, + "method": method, + "connection_name": connection_name + } + + port_mapping = { + "port1": "eth0", + "port2": "eth1" + } + + return { + "interfaces": interfaces, + "port_mapping": port_mapping + } + + except HTTPException: + raise + except Exception as e: + log.error("Exception in /network_info: %s", e, exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +@app.post("/set_network_config") +async def set_network_config(config: dict): + """Set network configuration (DHCP or Static IP) for a specific interface. + + Expected payload: + { + "interface": "eth0", + "is_dhcp": true/false, + "ip_address": "192.168.1.100" (required if is_dhcp is false), + "netmask": "24" (optional, defaults to 24) + } + """ + try: + interface = config.get("interface") + is_dhcp = config.get("is_dhcp", True) + ip_address = config.get("ip_address") + netmask = config.get("netmask", "24") + + if not interface: + raise HTTPException(status_code=400, detail="Interface name is required") + + proc = await asyncio.create_subprocess_exec( + "nmcli", "-t", "-f", "NAME,DEVICE", "connection", "show", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await proc.communicate() + + if proc.returncode != 0: + raise HTTPException(status_code=500, detail="Failed to get network connections") + + connection_name = None + for line in stdout.decode().strip().split('\n'): + if not line: + continue + parts = line.split(':') + if len(parts) >= 2 and parts[1] == interface: + connection_name = parts[0] + break + + if not connection_name: + log.info(f"No connection found for {interface}, creating new connection") + proc = await asyncio.create_subprocess_exec( + "sudo", "nmcli", "con", "add", "type", "ethernet", + "ifname", interface, "con-name", f"Wired connection {interface}", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await proc.communicate() + + if proc.returncode != 0: + raise HTTPException(status_code=500, detail=f"Failed to create connection for {interface}: {stderr.decode()}") + + connection_name = f"Wired connection {interface}" + + if is_dhcp: + proc = await asyncio.create_subprocess_exec( + "sudo", "nmcli", "con", "modify", connection_name, "ipv4.method", "auto", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + await proc.communicate() + + if proc.returncode != 0: + raise HTTPException(status_code=500, detail="Failed to set DHCP mode") + + proc = await asyncio.create_subprocess_exec( + "sudo", "nmcli", "con", "modify", connection_name, "ipv4.addresses", "", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + await proc.communicate() + + else: + if not ip_address: + raise HTTPException(status_code=400, detail="IP address is required for static configuration") + + import re + ip_pattern = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$') + if not ip_pattern.match(ip_address): + raise HTTPException(status_code=400, detail="Invalid IP address format") + + octets = ip_address.split('.') + if not all(0 <= int(octet) <= 255 for octet in octets): + raise HTTPException(status_code=400, detail="IP address octets must be between 0 and 255") + + proc = await asyncio.create_subprocess_exec( + "sudo", "nmcli", "con", "modify", connection_name, + "ipv4.method", "manual", + "ipv4.addresses", f"{ip_address}/{netmask}", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + await proc.communicate() + + if proc.returncode != 0: + raise HTTPException(status_code=500, detail="Failed to set static IP") + + proc = await asyncio.create_subprocess_exec( + "sudo", "nmcli", "con", "up", connection_name, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await proc.communicate() + + if proc.returncode != 0: + log.info("Connection activation returned non-zero (may be expected if no cable): %s", stderr.decode()) + + return {"status": "success", "message": f"Network configuration updated for {interface}"} + + except HTTPException: + raise + except Exception as e: + log.error("Exception in /set_network_config: %s", e, exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + if __name__ == '__main__': import os os.chdir(os.path.dirname(__file__))