Files
bumble-auracast/auracast/multicast_commander.py
T

124 lines
3.7 KiB
Python

import logging
from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple, List
import bumble
import bumble.device
import bumble.transport
import bumble.utils
import asyncio
import aioconsole
import multicast
import auracast_config
class Multicaster:
def __init__(self, global_conf: auracast_config.AuracastGlobalConfig, big_conf: List[auracast_config.AuracastBigConfig]):
self.running = False
self.task = None # Holds the background print task
self.global_conf = global_conf
self.big_conf = big_conf
self.device = None
self.bigs = None
async def setup_broadcast(self):
self.device_acm = multicast.create_device(self.global_conf)
agen = self.device_acm.__aenter__() # Manually triggering setup
device = await agen
self.bigs = await multicast.setup_broadcast( # the bigs dictionary contains all the global configurations
device,
self.global_conf,
self.big_conf
)
self.device = device
async def setup_audio(self):
await multicast.setup_audio(
self.bigs,
self.global_conf,
self.big_conf
)
async def stream_audio(self):
await multicast.streamer(
self.bigs
)
def start(self):
"""Starts the background task if it's not already running."""
if not self.running:
self.running = True
self.task = asyncio.create_task(self._printer_coroutine())
def stop(self):
"""Stops the background task if running."""
if self.running:
self.running = False
if self.task:
self.task.cancel() # Cancel the task safely
self.task = None
def set_message(self, new_message):
"""Updates the message being printed."""
self.message = new_message
async def __del__(self):
await self.device_acm.__aexit__(None, None, None) # Manually triggering teardown
async def command_line_ui(printer):
while True:
command = await aioconsole.ainput("\nCommands: [start|stop|set|quit] > ")
if command.strip().lower() == "start":
printer.start()
print("💡 Printer started!")
elif command.strip().lower() == "stop":
printer.stop()
print("🛑 Printer stopped!")
elif command.strip().lower().startswith("set "):
_, new_message = command.split(" ", 1)
printer.set_message(new_message)
print(f"✏️ Message updated to: {new_message}")
elif command.strip().lower() == "quit":
printer.stop()
print("👋 Exiting...")
break # Exit loop
else:
print("❌ Invalid command. Use [start|stop|set <message>|quit]")
async def main():
logging.basicConfig(
level=logging.DEBUG,
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)
global_conf = auracast_config.global_base_config
#global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk
global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,115200,rtscts' #nrf52dongle hci_uart usb cdc
big_conf = [
auracast_config.broadcast_de,
auracast_config.broadcast_en,
auracast_config.broadcast_fr,
#auracast_config.broadcast_es,
#auracast_config.broadcast_it,
]
# look into:
#async with MyAPI() as api:
#pass
caster = Multicaster(global_conf, big_conf)
await caster.setup_broadcast()
#await command_line_ui(caster)
# Run the application
asyncio.run(main())