diff --git a/auracast/multicast_commander.py b/auracast/multicast_commander.py new file mode 100644 index 0000000..2eb7c16 --- /dev/null +++ b/auracast/multicast_commander.py @@ -0,0 +1,124 @@ +import logging +from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple, List +import bumble +import bumble.device +import bumble.transport +import bumble.utils + +import asyncio +import aioconsole +import multicast + +import auracast_config + +class Multicaster: + def __init__(self, global_conf: auracast_config.AuracastGlobalConfig, big_conf: List[auracast_config.AuracastBigConfig]): + self.running = False + self.task = None # Holds the background print task + self.global_conf = global_conf + self.big_conf = big_conf + self.device = None + self.bigs = None + + + async def setup_broadcast(self): + self.device_acm = multicast.create_device(self.global_conf) + + agen = self.device_acm.__aenter__() # Manually triggering setup + device = await agen + + self.bigs = await multicast.setup_broadcast( # the bigs dictionary contains all the global configurations + device, + self.global_conf, + self.big_conf + ) + self.device = device + + async def setup_audio(self): + await multicast.setup_audio( + self.bigs, + self.global_conf, + self.big_conf + ) + + async def stream_audio(self): + await multicast.streamer( + self.bigs + ) + + def start(self): + """Starts the background task if it's not already running.""" + if not self.running: + self.running = True + self.task = asyncio.create_task(self._printer_coroutine()) + + def stop(self): + """Stops the background task if running.""" + if self.running: + self.running = False + if self.task: + self.task.cancel() # Cancel the task safely + self.task = None + + def set_message(self, new_message): + """Updates the message being printed.""" + self.message = new_message + + async def __del__(self): + await self.device_acm.__aexit__(None, None, None) # Manually triggering teardown + + +async def command_line_ui(printer): + while True: + command = await aioconsole.ainput("\nCommands: [start|stop|set|quit] > ") + + if command.strip().lower() == "start": + printer.start() + print("💡 Printer started!") + + elif command.strip().lower() == "stop": + printer.stop() + print("🛑 Printer stopped!") + + elif command.strip().lower().startswith("set "): + _, new_message = command.split(" ", 1) + printer.set_message(new_message) + print(f"✏️ Message updated to: {new_message}") + + elif command.strip().lower() == "quit": + printer.stop() + print("👋 Exiting...") + break # Exit loop + + else: + print("❌ Invalid command. Use [start|stop|set |quit]") + +async def main(): + logging.basicConfig( + level=logging.DEBUG, + format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s' + ) + + global_conf = auracast_config.global_base_config + #global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk + global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,115200,rtscts' #nrf52dongle hci_uart usb cdc + + big_conf = [ + auracast_config.broadcast_de, + auracast_config.broadcast_en, + auracast_config.broadcast_fr, + #auracast_config.broadcast_es, + #auracast_config.broadcast_it, + ] + + # look into: + #async with MyAPI() as api: + #pass + + caster = Multicaster(global_conf, big_conf) + await caster.setup_broadcast() + + #await command_line_ui(caster) + +# Run the application +asyncio.run(main()) \ No newline at end of file