import serial
import time
import logging as log

# Line settings shared by every connection this module opens.
_BAUDRATE = 115200
# Upper bound on the number of response lines collected per command.
_MAX_RESPONSE_LINES = 20


def write_to_serial_read_respone(port, cmd, timeout=2):
    """Send one command over a serial port and collect the reply lines.

    The port is opened at 115200 baud, 8N1, the command is written with a
    trailing CR/LF, and after a one second pause up to 20 lines are read.
    Reading stops at the first empty line (which is also what a read
    timeout produces).

    Args:
        port (str): Device path of the serial port (e.g. '/dev/ttyACM0').
        cmd (str): Command to send; surrounding whitespace is stripped.
        timeout (float): Read timeout in seconds passed to ``serial.Serial``.

    Returns:
        list: The stripped, non-empty response lines. Empty when nothing
        was received or when a ``serial.SerialException`` occurred (the
        error is printed, not raised).
    """
    # Bound before the try block so the final return is always valid, even
    # when opening or writing to the port fails.
    readlines = []

    # Configure without a port first so that nothing is opened until open().
    ser = serial.Serial(timeout=timeout)
    ser.port = port
    ser.baudrate = _BAUDRATE
    ser.bytesize = serial.EIGHTBITS
    ser.parity = serial.PARITY_NONE
    ser.stopbits = serial.STOPBITS_ONE

    try:
        ser.open()

        command = f"{cmd.strip()}\r\n"
        ser.write(command.encode())
        time.sleep(1)  # wait a bit for response

        for _ in range(_MAX_RESPONSE_LINES):
            # errors="replace" keeps undecodable bytes from raising
            # UnicodeDecodeError and discarding the lines read so far.
            line = ser.readline().decode('utf-8', errors='replace').strip()
            if not line:
                break
            readlines.append(line)
    except serial.SerialException as e:
        print(f"Error communicating with serial port: {e}")
    finally:
        # Close serial connection before returning
        if ser.is_open:
            ser.close()

    return readlines


def gen_broadcast_config_cmd(serial_port, preset, broadcast_config: dict):
    """Reset the broadcaster and build the command list that configures it.

    Sends ``nac stop`` and ``nac clear`` to the serial port directly, then
    returns the commands for the new configuration without sending them.
    Each entry of ``broadcast_config`` is assigned a channel index in
    iteration order, starting at 0.

    Args:
        serial_port (str): Device path of the serial port
            (e.g., '/dev/ttyACM0')
        preset (str): Preset string used in nac preset line
        broadcast_config (dict): Maps each broadcast name to the file name
            used in its ``nac file select_play_once`` line

    Returns:
        list: Command strings (no line terminator), ending with
        ``nac start``.
    """
    with serial.Serial(serial_port, _BAUDRATE) as ser:
        ser.write(b"nac stop\r\n")
        ser.write(b"nac clear\r\n")
        time.sleep(.5)

    cmds = []
    for ch, (broadcast_name, file_name) in enumerate(broadcast_config.items()):
        cmds.append(f"nac preset {preset} {ch}")
        cmds.append(f"nac broadcast_name {broadcast_name} {ch}")
        cmds.append(f"nac file select_play_once {file_name} {ch} 0 0")
        cmds.append(f"nac num_bises 1 {ch} 0")
        # Disabled: cmds.append(f"nac immediate 1 {ch} 0")
    cmds.append("nac start")
    return cmds


def broadcaster_config():
    """Configure two broadcasts on /dev/ttyACM0 and return the device state.

    Builds the command list for preset "24_2_1", sends each command and
    logs its response, then queries ``nac show``.

    Returns:
        str: The response lines of ``nac show`` joined with newlines.
    """
    serial_port = "/dev/ttyACM0"
    broadcast_config = {
        "broadcast1": "left24kHz_48kbps.lc3",
        "broadcast2": "right-channel_24kHz_left_48kbps_10ms.lc3",
    }

    cmds = gen_broadcast_config_cmd(serial_port, "24_2_1", broadcast_config)
    for cmd in cmds:
        ret = write_to_serial_read_respone(serial_port, cmd, timeout=0.1)
        log.info("\n".join(ret))

    ret = write_to_serial_read_respone(serial_port, "nac show", timeout=0.1)
    ret_str = "\n".join(ret)
    log.info(ret_str)
    return ret_str