Files
bumble-auracast/src/auracast/multicast_control.py
T
pstruebi 8ea7aeb412 restrucuture_for_cloud (#4)
- update multicast_server
- modify config models
- add dockerfile and docker compose

Reviewed-on: https://gitea.pstruebi.xyz/auracaster/bumble-auracast/pulls/4
2025-03-19 13:00:13 +01:00

151 lines
4.9 KiB
Python

import logging
from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple, List
import bumble
import bumble.device
import bumble.transport
import bumble.utils
import asyncio
import aioconsole
from auracast import multicast
from auracast import auracast_config
class Multicaster:
    """
    Manage the lifecycle of a multicast broadcast and its audio streamer.

    The instance owns three resources, all created lazily:

    * ``device_acm`` -- the async context manager returned by
      ``multicast.create_device``; entering it yields the device.
    * ``bigs`` -- whatever ``multicast.init_broadcast`` returns. This class
      only relies on it being a mapping whose values are mappings with an
      ``'advertising_set'`` entry.
    * ``streamer`` -- a ``multicast.Streamer`` created by ``start_streaming``.

    ``shutdown`` releases all of them and is safe to call at any time,
    including before ``init_broadcast`` and more than once.
    """

    def __init__(
        self,
        global_conf: "auracast_config.AuracastGlobalConfig",
        big_conf: List["auracast_config.AuracastBigConfig"],
    ):
        """
        Store the configuration; no device is opened here.

        Args:
            global_conf: Configuration shared by all broadcasts.
            big_conf: One configuration object per broadcast.
        """
        self.is_auracast_init = False
        self.is_audio_init = False
        self.streaming = False
        self.global_conf = global_conf
        self.big_conf = big_conf
        self.device = None
        # Always defined, so teardown never depends on init_broadcast()
        # having been called first.
        self.device_acm = None
        self.bigs = None
        self.streamer = None

    def get_status(self):
        """Return a dict with the ``is_initialized`` and ``is_streaming`` flags."""
        streaming = self.streamer.is_streaming if self.streamer is not None else False
        return {
            'is_initialized': self.is_auracast_init,
            'is_streaming': streaming,
        }

    async def init_broadcast(self):
        """
        Open the device and set up the broadcasts described by the config.

        The device context manager is entered manually (rather than with
        ``async with``) because it has to outlive this call; ``shutdown``
        performs the matching exit. ``device_acm`` is stored before anything
        else can fail, so ``shutdown`` can still release the device if the
        broadcast setup raises.
        """
        self.device_acm = multicast.create_device(self.global_conf)
        device = await self.device_acm.__aenter__()  # Manually triggering setup
        self.bigs = await multicast.init_broadcast(
            device,
            self.global_conf,
            self.big_conf
        )
        self.device = device
        self.is_auracast_init = True

    def start_streaming(self):
        """Create a new streamer for the current broadcasts and start it."""
        self.streamer = multicast.Streamer(self.bigs, self.global_conf, self.big_conf)
        self.streamer.start_streaming()

    def stop_streaming(self):
        """Stop and discard the active streamer; do nothing if there is none."""
        if self.streamer is not None:
            self.streamer.stop_streaming()
            self.streamer = None

    async def reset(self):
        """Shut everything down and return to the freshly-constructed state."""
        await self.shutdown()  # Manually triggering teardown
        self.__init__(self.global_conf, self.big_conf)

    async def shutdown(self):
        """
        Stop streaming and advertising, then release the device.

        The device context manager is exited in a ``finally`` clause so the
        transport is released even when stopping the streamer or the
        advertising raises. All handles are cleared afterwards, which makes a
        repeated call a no-op instead of a second exit of the same manager.
        """
        self.is_auracast_init = False
        self.is_audio_init = False
        try:
            # A running streamer must not outlive the device it feeds.
            self.stop_streaming()
            if self.device:
                await self.device.stop_advertising()
                if self.bigs:
                    for big in self.bigs.values():
                        if big['advertising_set']:
                            await big['advertising_set'].stop()
        finally:
            # Detach the manager before awaiting so a re-entrant or repeated
            # shutdown cannot exit it twice.
            device_acm, self.device_acm = self.device_acm, None
            self.device = None
            self.bigs = None
            if device_acm is not None:
                await device_acm.__aexit__(None, None, None)  # Manually triggering teardown
# example commandline ui
async def _init_audio_if_available(caster):
    """Await ``caster.init_audio()`` when the caster provides it, else report it."""
    init_audio = getattr(caster, "init_audio", None)
    if init_audio is None:
        # Without this check the missing method raises AttributeError and
        # terminates the whole command loop.
        print("init_audio is not supported by this caster.")
        return
    await init_audio()


async def command_line_ui(caster: Multicaster):
    """
    Run an interactive command loop that drives ``caster``.

    Commands (case-insensitive, surrounding whitespace ignored):

    * ``start_audio`` / ``stop_audio`` -- start or stop the streamer.
    * ``stop`` -- stop streaming and reset the caster.
    * ``init`` -- reset the caster, set up the broadcast again and, when the
      caster offers ``init_audio``, call it.
    * ``init_audio`` -- call the caster's ``init_audio`` when it offers one.
    * ``quit`` -- stop streaming and shut down if a device is open, then
      leave the loop.

    Any other input prints ``Invalid command.`` and prompts again.
    """
    while True:
        raw = await aioconsole.ainput("\nCommands: [start_audio|stop_audio|stop|init|init_audio|quit] > ")
        command = raw.strip().lower()  # normalise once instead of per branch

        if command == "start_audio":
            caster.start_streaming()
            print("Audio started!")
        elif command == "stop_audio":
            caster.stop_streaming()
            print("Audio stopped!")
        elif command == "stop":
            print("👋 Stopping...")
            caster.stop_streaming()
            await caster.reset()
        elif command == "init":
            await caster.reset()
            await caster.init_broadcast()
            await _init_audio_if_available(caster)
        elif command == "init_audio":
            await _init_audio_if_available(caster)
        elif command == "quit":
            print("👋 Exiting...")
            if caster.device:
                caster.stop_streaming()
                await caster.shutdown()
            break  # Exit loop
        else:
            print("Invalid command.")
async def main():
    """
    Example entry point: configure logging, build the configuration, open the
    broadcast and hand control to the command-line UI.

    The log level is read from the ``LOG_LEVEL`` environment variable and
    falls back to ``logging.DEBUG``. The working directory is changed to the
    directory containing this file before the configuration is built.
    """
    import os

    log_level = os.environ.get('LOG_LEVEL', logging.DEBUG)
    logging.basicConfig(
        level=log_level,
        format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s',
    )
    os.chdir(os.path.dirname(__file__))

    settings = auracast_config.AuracastGlobalConfig(
        qos_config=auracast_config.AuracastQosHigh()
    )
    # Alternative transport for the nrf54l15dk:
    # settings.transport = 'serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts'
    # nrf52dongle hci_uart usb cdc
    settings.transport = 'serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,115200,rtscts'

    # One entry per broadcast; further entries that are currently disabled:
    #   auracast_config.broadcast_es, auracast_config.broadcast_it
    broadcasts = [
        auracast_config.AuracastBigConfigDeu(),
        auracast_config.AuracastBigConfigEng(),
        auracast_config.AuracastBigConfigFra(),
    ]
    for broadcast in broadcasts:
        broadcast.loop = False

    # TODO: look into driving the caster through `async with ... as api:`.
    caster = Multicaster(settings, broadcasts)
    await caster.init_broadcast()
    await command_line_ui(caster)
if __name__ == "__main__":
    # Script entry point: run the async main() on a fresh event loop.
    asyncio.run(main())