"""Configure a broadcaster board over its serial shell using ``nac`` commands."""

import logging as log
import subprocess
import time

import serial

SAMPLING_RATE_KHZ = 16
FRAME_DUR_MS = 10
PRESET = f'{SAMPLING_RATE_KHZ}_2_1'

# Bitrate paired with each supported sampling rate; the value is embedded in
# the announcement file names in BROADCAST_CONFIG below.
if SAMPLING_RATE_KHZ == 8:
    BITRATE_KBPS = 16
elif SAMPLING_RATE_KHZ == 16:
    BITRATE_KBPS = 32
elif SAMPLING_RATE_KHZ == 24:
    BITRATE_KBPS = 48
else:
    # NotImplementedError is the exception class; the NotImplemented constant
    # is not callable and would only produce a confusing TypeError here.
    raise NotImplementedError(
        f"Unsupported SAMPLING_RATE_KHZ: {SAMPLING_RATE_KHZ} "
        "(expected 8, 16 or 24)"
    )

# Upper bound on the number of response lines read back per command.
_MAX_RESPONSE_LINES = 20


def write_to_serial_read_respone(port, cmd, timeout=2):
    """Send one command line to a serial port and return the response lines.

    The port is opened at 115200 baud, 8N1, for the duration of the call and
    is always closed again before returning.

    Args:
        port (str): Device path of the serial port (e.g. '/dev/ttyACM0').
        cmd (str): Command to send; surrounding whitespace is stripped and
            a CR/LF terminator is appended.
        timeout (float): Read timeout in seconds passed to ``serial.Serial``.

    Returns:
        list: Stripped response lines (at most 20). Reading stops at the
        first empty line or read timeout. The list is empty (or partial) if
        a ``serial.SerialException`` occurred; the error is printed rather
        than raised.
    """
    ser = serial.Serial(timeout=timeout)  # no port given yet -> not opened
    ser.port = port
    ser.baudrate = 115200
    ser.bytesize = serial.EIGHTBITS
    ser.parity = serial.PARITY_NONE
    ser.stopbits = serial.STOPBITS_ONE

    # Initialised before the try block so the return below is always valid,
    # even when opening or writing to the port fails.
    readlines = []
    try:
        ser.open()

        command = f"{cmd.strip()}\r\n"
        ser.write(command.encode())
        time.sleep(1)  # wait a bit for the response

        for _ in range(_MAX_RESPONSE_LINES):
            # errors='replace' keeps stray non-UTF-8 bytes on the line from
            # aborting the whole read with a UnicodeDecodeError.
            line = ser.readline().decode('utf-8', errors='replace').strip()
            if not line:
                break
            readlines.append(line)
    except serial.SerialException as e:
        print(f"Error communicating with serial port: {e}")
    finally:
        # Close the serial connection before returning.
        if ser.is_open:
            ser.close()
    return readlines


def gen_broadcast_config_cmd(preset, broadcast_config: dict):
    """Build the ``nac`` command list that configures each broadcast.

    This function only generates command strings; it does not touch any
    serial port.

    Args:
        preset (str): Preset string used in the ``nac preset`` line.
        broadcast_config (dict): Maps a broadcast index to the name of the
            file that broadcast should play.

    Returns:
        list: Four command strings per entry of ``broadcast_config``, in the
        dict's iteration order.
    """
    cmds = []
    for ch, file_name in broadcast_config.items():
        cmds.extend(
            (
                f"nac preset {preset} {ch}",
                f"nac broadcast_name broadcast{ch} {ch}",
                f"nac file select_play_once {file_name} {ch} 0 0",
                f"nac num_bises 1 {ch} 0",
            )
        )
    return cmds


# TODO: The advertising interval becomes irregular with more than 3
# broadcasts (10 ms -> over 1 s) at a 24 kHz sampling rate.
BROADCAST_CONFIG = {
    0: f"announcement_{SAMPLING_RATE_KHZ}_{FRAME_DUR_MS}_{BITRATE_KBPS}_de.lc3",
    1: f"announcement_{SAMPLING_RATE_KHZ}_{FRAME_DUR_MS}_{BITRATE_KBPS}_en.lc3",
    2: f"announcement_{SAMPLING_RATE_KHZ}_{FRAME_DUR_MS}_{BITRATE_KBPS}_fr.lc3",
    #3: f"announcement_{SAMPLING_RATE_KHZ}_{FRAME_DUR_MS}_{BITRATE_KBPS}_es.lc3",
    #4: f"announcement_{SAMPLING_RATE_KHZ}_{FRAME_DUR_MS}_{BITRATE_KBPS}_it.lc3"
}


def _send_and_log(port, cmd):
    """Send *cmd* to *port*, log the response and return it newline-joined."""
    response = "\n".join(write_to_serial_read_respone(port, cmd, timeout=0.1))
    log.info(response)
    return response


def broadcaster_config():
    """Reset the board, send the broadcast configuration and start broadcasts.

    Runs ``nrfjprog --reset`` for the hard-coded probe serial number, then
    sends ``nac en_usb_mass``, the commands from
    ``gen_broadcast_config_cmd(PRESET, BROADCAST_CONFIG)`` and finally one
    ``nac start_idx`` per configured broadcast over '/dev/ttyACM0'.

    Returns:
        str: All command responses concatenated. Lines within one response
        are newline-separated; consecutive responses are joined without a
        separator.

    Raises:
        subprocess.CalledProcessError: If ``nrfjprog`` exits non-zero.
        FileNotFoundError: If the ``nrfjprog`` executable cannot be found.
    """
    port = "/dev/ttyACM0"
    probe_serial = "1050109484"

    cmds = gen_broadcast_config_cmd(PRESET, BROADCAST_CONFIG)

    subprocess.run(["nrfjprog", "--reset", "-s", probe_serial], check=True)
    time.sleep(2)  # wait after the reset before using the serial port

    responses = [_send_and_log(port, "nac en_usb_mass")]
    time.sleep(1)

    for cmd in cmds:
        responses.append(_send_and_log(port, cmd))
        time.sleep(1)

    for idx in BROADCAST_CONFIG:
        responses.append(_send_and_log(port, f"nac start_idx {idx}"))
        time.sleep(0.2)

    # NOTE: responses are deliberately joined without a separator to keep the
    # returned text identical to what callers have received so far.
    return "".join(responses)