Files
bumble-auracast/src/auracast/multicast_control.py

179 lines
6.1 KiB
Python

import logging
from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple, List
import bumble
import bumble.device
import bumble.transport
import bumble.utils
import asyncio
import aioconsole
from auracast import multicast
from auracast import auracast_config
class Multicaster:
"""
A class responsible for managing the multicasting and audio streaming process.
It provides methods to initialize and shutdown the broadcasting, as well as start and stop the streaming.
The class also manages the underlying device and advertising sets.
"""
def __init__(
self,
global_conf: auracast_config.AuracastGlobalConfig,
big_conf: List[auracast_config.AuracastBigConfig]
):
self.is_auracast_init = False
self.is_audio_init = False
self.streaming = False
self.global_conf = global_conf
self.big_conf = big_conf
self.device = None
self.bigs = None
self.streamer = None
def get_status(self):
streaming = self.streamer.is_streaming if self.streamer is not None else False
return {
'is_initialized': self.is_auracast_init,
'is_streaming': streaming,
}
def get_audio_levels(self) -> list[float]:
"""Return current RMS audio levels (0.0-1.0) for each BIG."""
if self.streamer is not None and self.streamer.is_streaming:
return self.streamer.get_audio_levels()
return []
async def init_broadcast(self):
self.device_acm = multicast.create_device(self.global_conf)
agen = self.device_acm.__aenter__() # Manually triggering setup
device = await agen
self.bigs = await multicast.init_broadcast( # the bigs dictionary contains all the global configurations
device,
self.global_conf,
self.big_conf
)
self.device = device
self.is_auracast_init = True
async def start_streaming(self):
"""Start streaming; if an old stream is running, stop it first to release audio devices."""
if self.streamer is not None:
await self.stop_streaming()
# Brief pause to ensure ALSA/PortAudio fully releases the input device
await asyncio.sleep(0.5)
self.streamer = multicast.Streamer(self.bigs, self.global_conf, self.big_conf)
self.streamer.start_streaming()
async def stop_streaming(self):
if self.streamer is not None:
await self.streamer.stop_streaming()
self.streamer = None
async def reset(self):
await self.shutdown() # Manually triggering teardown
self.__init__(self.global_conf, self.big_conf)
async def shutdown(self):
# Ensure streaming is fully stopped before tearing down Bluetooth resources
if self.streamer is not None:
await self.stop_streaming()
self.is_auracast_init = False
self.is_audio_init = False
if self.bigs:
for big in self.bigs.values():
if big.get('audio_input'):
if hasattr(big['audio_input'], 'aclose'):
await big['audio_input'].aclose()
if self.device:
await self.device.stop_advertising()
if self.bigs:
for big in self.bigs.values():
if big.get('advertising_set'):
await big['advertising_set'].stop()
# Explicitly power off the device to ensure a clean state before closing the transport
await self.device.power_off()
await self.device_acm.__aexit__(None, None, None) # Manually triggering teardown
# example commandline ui
async def command_line_ui(caster: Multicaster):
while True:
command = await aioconsole.ainput("\nCommands: [start_audio|stop_audio|stop|init|init_audio|quit] > ")
if command.strip().lower() == "start_audio":
caster.start_streaming()
print("Audio started!")
elif command.strip().lower() == "stop_audio":
caster.stop_streaming()
print("Audio stopped!")
elif command.strip().lower() == "stop":
print("👋 Stopping...")
caster.stop_streaming()
await caster.reset()
elif command.strip().lower() == "init":
await caster.reset()
await caster.init_broadcast()
await caster.init_audio()
elif command.strip().lower() == "init_audio":
await caster.init_audio()
elif command.strip().lower() == "quit":
print("👋 Exiting...")
if caster.device:
caster.stop_streaming()
await caster.shutdown()
break # Exit loop
else:
print("Invalid command.")
async def main():
import os
logging.basicConfig(
level=os.environ.get('LOG_LEVEL', logging.DEBUG),
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)
# Enable debug logging for bumble
# logging.getLogger('bumble').setLevel(logging.DEBUG)
os.chdir(os.path.dirname(__file__))
global_conf = auracast_config.AuracastGlobalConfig(
qos_config=auracast_config.AuracastQosRobust()
)
#global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk
global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,115200,rtscts' #nrf52dongle hci_uart usb cdc
big_conf = [
auracast_config.AuracastBigConfigDeu(),
auracast_config.AuracastBigConfigEng(),
auracast_config.AuracastBigConfigFra(),
#auracast_config.broadcast_es,
#auracast_config.broadcast_it,
]
for conf in big_conf:
conf.loop = False
# look into:
#async with MyAPI() as api:
#pass
caster = Multicaster(global_conf, big_conf)
await caster.init_broadcast()
await command_line_ui(caster)
if __name__ == '__main__':
# Run the application
asyncio.run(main())