Adds DHCP/static IP toggle for both ports in the UI.
This commit is contained in:
@@ -1774,13 +1774,116 @@ with st.expander("System control", expanded=False):
|
||||
try:
|
||||
import subprocess, socket
|
||||
device_hostname = socket.gethostname()
|
||||
result = subprocess.run(["hostname", "-I"], capture_output=True, text=True, timeout=2)
|
||||
ips = [ip for ip in result.stdout.strip().split() if not ip.startswith('127.') and ':' not in ip]
|
||||
st.write(f"Hostname: **{device_hostname}**")
|
||||
if ips:
|
||||
st.write(f"IP Address: **{ips[0]}**")
|
||||
|
||||
network_info_resp = requests.get(f"{BACKEND_URL}/network_info", timeout=5)
|
||||
if network_info_resp.status_code == 200:
|
||||
network_data = network_info_resp.json()
|
||||
interfaces = network_data.get("interfaces", {})
|
||||
port_mapping = network_data.get("port_mapping", {})
|
||||
|
||||
for port_name in ["port1", "port2"]:
|
||||
if port_name not in port_mapping:
|
||||
continue
|
||||
|
||||
interface_name = port_mapping[port_name]
|
||||
interface_data = interfaces.get(interface_name, {})
|
||||
|
||||
port_label = "Port 1" if port_name == "port1" else "Port 2"
|
||||
st.markdown(f"### {port_label}")
|
||||
|
||||
ip_address = interface_data.get("ip_address", "N/A")
|
||||
is_dhcp = interface_data.get("is_dhcp", True)
|
||||
|
||||
st.write(f"Interface: **{interface_name}**")
|
||||
st.write(f"IP Address: **{ip_address}**")
|
||||
|
||||
col1, col2 = st.columns([1, 3])
|
||||
with col1:
|
||||
toggle_key = f"{port_name}_dhcp_toggle"
|
||||
current_mode = "DHCP" if is_dhcp else "Static IP"
|
||||
new_mode = st.radio(
|
||||
"Mode",
|
||||
options=["DHCP", "Static IP"],
|
||||
index=0 if is_dhcp else 1,
|
||||
key=toggle_key,
|
||||
horizontal=True
|
||||
)
|
||||
|
||||
with col2:
|
||||
if new_mode == "Static IP":
|
||||
ip_input_key = f"{port_name}_ip_input"
|
||||
default_ip = ip_address if ip_address != "N/A" and not is_dhcp else ""
|
||||
new_ip = st.text_input(
|
||||
"Static IP Address",
|
||||
value=default_ip,
|
||||
key=ip_input_key,
|
||||
placeholder="192.168.1.100"
|
||||
)
|
||||
|
||||
if st.button(f"Apply", key=f"{port_name}_apply_btn"):
|
||||
if not new_ip:
|
||||
st.error("Please enter an IP address")
|
||||
else:
|
||||
import re
|
||||
ip_pattern = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$')
|
||||
if not ip_pattern.match(new_ip):
|
||||
st.error("Invalid IP address format")
|
||||
else:
|
||||
octets = new_ip.split('.')
|
||||
if not all(0 <= int(octet) <= 255 for octet in octets):
|
||||
st.error("IP address octets must be between 0 and 255")
|
||||
else:
|
||||
try:
|
||||
config_payload = {
|
||||
"interface": interface_name,
|
||||
"is_dhcp": False,
|
||||
"ip_address": new_ip,
|
||||
"netmask": "24"
|
||||
}
|
||||
config_resp = requests.post(
|
||||
f"{BACKEND_URL}/set_network_config",
|
||||
json=config_payload,
|
||||
timeout=10
|
||||
)
|
||||
if config_resp.status_code == 200:
|
||||
st.success(f"Static IP {new_ip} applied to {interface_name}")
|
||||
time.sleep(2)
|
||||
st.rerun()
|
||||
else:
|
||||
st.error(f"Failed to apply configuration: {config_resp.text}")
|
||||
except Exception as e:
|
||||
st.error(f"Error applying configuration: {e}")
|
||||
else:
|
||||
if new_mode != current_mode:
|
||||
if st.button(f"Apply DHCP", key=f"{port_name}_dhcp_apply_btn"):
|
||||
try:
|
||||
config_payload = {
|
||||
"interface": interface_name,
|
||||
"is_dhcp": True
|
||||
}
|
||||
config_resp = requests.post(
|
||||
f"{BACKEND_URL}/set_network_config",
|
||||
json=config_payload,
|
||||
timeout=10
|
||||
)
|
||||
if config_resp.status_code == 200:
|
||||
st.success(f"DHCP enabled for {interface_name}")
|
||||
time.sleep(2)
|
||||
st.rerun()
|
||||
else:
|
||||
st.error(f"Failed to apply configuration: {config_resp.text}")
|
||||
except Exception as e:
|
||||
st.error(f"Error applying configuration: {e}")
|
||||
|
||||
st.markdown("---")
|
||||
else:
|
||||
st.warning("No valid IP address found.")
|
||||
result = subprocess.run(["hostname", "-I"], capture_output=True, text=True, timeout=2)
|
||||
ips = [ip for ip in result.stdout.strip().split() if not ip.startswith('127.') and ':' not in ip]
|
||||
if ips:
|
||||
st.write(f"IP Address: **{ips[0]}**")
|
||||
else:
|
||||
st.warning("No valid IP address found.")
|
||||
except Exception as e:
|
||||
st.warning(f"Could not determine network info: {e}")
|
||||
|
||||
|
||||
@@ -1463,6 +1463,212 @@ async def delete_recordings():
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.get("/network_info")
|
||||
async def get_network_info():
|
||||
"""Get network information for all ethernet interfaces."""
|
||||
try:
|
||||
interfaces = {}
|
||||
|
||||
hardcoded_devices = ["eth0", "eth1"]
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"nmcli", "-t", "-f", "NAME,DEVICE", "connection", "show",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await proc.communicate()
|
||||
|
||||
device_to_connection = {}
|
||||
if proc.returncode == 0:
|
||||
for line in stdout.decode().strip().split('\n'):
|
||||
if not line:
|
||||
continue
|
||||
parts = line.split(':')
|
||||
if len(parts) >= 2:
|
||||
connection_name = parts[0]
|
||||
device_name = parts[1]
|
||||
if device_name in hardcoded_devices:
|
||||
device_to_connection[device_name] = connection_name
|
||||
|
||||
for device in hardcoded_devices:
|
||||
ip_address = None
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"nmcli", "-t", "-f", "IP4.ADDRESS", "device", "show", device,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await proc.communicate()
|
||||
|
||||
if proc.returncode == 0:
|
||||
for line in stdout.decode().strip().split('\n'):
|
||||
if line.startswith('IP4.ADDRESS'):
|
||||
ip_parts = line.split(':')
|
||||
if len(ip_parts) >= 2:
|
||||
full_ip = ip_parts[1]
|
||||
ip_address = full_ip.split('/')[0]
|
||||
if not ip_address.startswith('169.254.'):
|
||||
break
|
||||
|
||||
method = "auto"
|
||||
connection_name = device_to_connection.get(device)
|
||||
if connection_name:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"nmcli", "-t", "-f", "ipv4.method", "connection", "show", connection_name,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await proc.communicate()
|
||||
|
||||
if proc.returncode == 0:
|
||||
for line in stdout.decode().strip().split('\n'):
|
||||
if line.startswith('ipv4.method:'):
|
||||
method = line.split(':')[1]
|
||||
break
|
||||
|
||||
is_dhcp = method == "auto"
|
||||
|
||||
interfaces[device] = {
|
||||
"ip_address": ip_address or "N/A",
|
||||
"is_dhcp": is_dhcp,
|
||||
"method": method,
|
||||
"connection_name": connection_name
|
||||
}
|
||||
|
||||
port_mapping = {
|
||||
"port1": "eth0",
|
||||
"port2": "eth1"
|
||||
}
|
||||
|
||||
return {
|
||||
"interfaces": interfaces,
|
||||
"port_mapping": port_mapping
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
log.error("Exception in /network_info: %s", e, exc_info=True)
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.post("/set_network_config")
|
||||
async def set_network_config(config: dict):
|
||||
"""Set network configuration (DHCP or Static IP) for a specific interface.
|
||||
|
||||
Expected payload:
|
||||
{
|
||||
"interface": "eth0",
|
||||
"is_dhcp": true/false,
|
||||
"ip_address": "192.168.1.100" (required if is_dhcp is false),
|
||||
"netmask": "24" (optional, defaults to 24)
|
||||
}
|
||||
"""
|
||||
try:
|
||||
interface = config.get("interface")
|
||||
is_dhcp = config.get("is_dhcp", True)
|
||||
ip_address = config.get("ip_address")
|
||||
netmask = config.get("netmask", "24")
|
||||
|
||||
if not interface:
|
||||
raise HTTPException(status_code=400, detail="Interface name is required")
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"nmcli", "-t", "-f", "NAME,DEVICE", "connection", "show",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await proc.communicate()
|
||||
|
||||
if proc.returncode != 0:
|
||||
raise HTTPException(status_code=500, detail="Failed to get network connections")
|
||||
|
||||
connection_name = None
|
||||
for line in stdout.decode().strip().split('\n'):
|
||||
if not line:
|
||||
continue
|
||||
parts = line.split(':')
|
||||
if len(parts) >= 2 and parts[1] == interface:
|
||||
connection_name = parts[0]
|
||||
break
|
||||
|
||||
if not connection_name:
|
||||
log.info(f"No connection found for {interface}, creating new connection")
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"sudo", "nmcli", "con", "add", "type", "ethernet",
|
||||
"ifname", interface, "con-name", f"Wired connection {interface}",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await proc.communicate()
|
||||
|
||||
if proc.returncode != 0:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to create connection for {interface}: {stderr.decode()}")
|
||||
|
||||
connection_name = f"Wired connection {interface}"
|
||||
|
||||
if is_dhcp:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"sudo", "nmcli", "con", "modify", connection_name, "ipv4.method", "auto",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
await proc.communicate()
|
||||
|
||||
if proc.returncode != 0:
|
||||
raise HTTPException(status_code=500, detail="Failed to set DHCP mode")
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"sudo", "nmcli", "con", "modify", connection_name, "ipv4.addresses", "",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
await proc.communicate()
|
||||
|
||||
else:
|
||||
if not ip_address:
|
||||
raise HTTPException(status_code=400, detail="IP address is required for static configuration")
|
||||
|
||||
import re
|
||||
ip_pattern = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$')
|
||||
if not ip_pattern.match(ip_address):
|
||||
raise HTTPException(status_code=400, detail="Invalid IP address format")
|
||||
|
||||
octets = ip_address.split('.')
|
||||
if not all(0 <= int(octet) <= 255 for octet in octets):
|
||||
raise HTTPException(status_code=400, detail="IP address octets must be between 0 and 255")
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"sudo", "nmcli", "con", "modify", connection_name,
|
||||
"ipv4.method", "manual",
|
||||
"ipv4.addresses", f"{ip_address}/{netmask}",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
await proc.communicate()
|
||||
|
||||
if proc.returncode != 0:
|
||||
raise HTTPException(status_code=500, detail="Failed to set static IP")
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"sudo", "nmcli", "con", "up", connection_name,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await proc.communicate()
|
||||
|
||||
if proc.returncode != 0:
|
||||
log.info("Connection activation returned non-zero (may be expected if no cable): %s", stderr.decode())
|
||||
|
||||
return {"status": "success", "message": f"Network configuration updated for {interface}"}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
log.error("Exception in /set_network_config: %s", e, exc_info=True)
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import os
|
||||
os.chdir(os.path.dirname(__file__))
|
||||
|
||||
Reference in New Issue
Block a user