Files
bumble-auracast/src/auracast/multicast_control.py
T
Paul Obernesser ca461c47da add PBP qualification tests and rename QoS profiles
- Rename AuracastQosHigh/Mid/Low to AuracastQosDefault/Fast for clarity
- Add Public Broadcast Profile (PBP) service data to advertising with dynamic feature calculation based on encryption and sample rate
- Add PBP/PBS/STR test cases (BV-01-C through BV-06-C) for standard and high quality streaming with various configurations
- Update all existing test cases and main scripts to use new QoS profile names
2025-12-08 16:18:40 +01:00

169 lines
5.7 KiB
Python

import logging
from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple, List
import bumble
import bumble.device
import bumble.transport
import bumble.utils
import asyncio
import aioconsole
from auracast import multicast
from auracast import auracast_config
class Multicaster:
    """
    Manage the lifecycle of an Auracast multicast: device setup, broadcast
    initialisation, audio streaming and teardown.

    The Bluetooth device is obtained from ``multicast.create_device`` as an
    async context manager that is entered and exited manually, so that the
    device can outlive a single ``async with`` block. ``shutdown`` is the
    counterpart that exits it again.
    """

    def __init__(
        self,
        global_conf: "auracast_config.AuracastGlobalConfig",
        big_conf: "List[auracast_config.AuracastBigConfig]",
    ):
        """Store the configuration and start from a clean, uninitialised state.

        Args:
            global_conf: Global Auracast configuration shared by all BIGs.
            big_conf: One configuration object per broadcast (BIG).
        """
        self.global_conf = global_conf
        self.big_conf = big_conf
        self._reset_state()

    def _reset_state(self):
        """Reset all runtime state; the stored configuration is left untouched."""
        self.is_auracast_init = False
        self.is_audio_init = False
        self.streaming = False
        self.device = None
        # Manually entered async context manager that owns the device/transport.
        self.device_acm = None
        self.bigs = None
        self.streamer = None

    def get_status(self):
        """Return a dict with the ``is_initialized`` and ``is_streaming`` flags."""
        streaming = self.streamer.is_streaming if self.streamer is not None else False
        return {
            'is_initialized': self.is_auracast_init,
            'is_streaming': streaming,
        }

    async def init_broadcast(self):
        """Open the device and initialise the broadcast for every configured BIG.

        If broadcast initialisation fails, the freshly opened device context is
        exited again before the exception propagates, so a failed init does not
        leak the transport.
        """
        device_acm = multicast.create_device(self.global_conf)
        device = await device_acm.__aenter__()  # Manually triggering setup
        try:
            # the bigs dictionary contains all the global configurations
            bigs = await multicast.init_broadcast(
                device,
                self.global_conf,
                self.big_conf
            )
        except BaseException:
            # Also covers task cancellation: release the device, then re-raise.
            await device_acm.__aexit__(None, None, None)
            raise
        self.device_acm = device_acm
        self.bigs = bigs
        self.device = device
        self.is_auracast_init = True

    async def start_streaming(self):
        """Start streaming; if an old stream is running, stop it first to release audio devices."""
        if self.streamer is not None:
            await self.stop_streaming()
            # Brief pause to ensure ALSA/PortAudio fully releases the input device
            await asyncio.sleep(0.5)
        self.streamer = multicast.Streamer(self.bigs, self.global_conf, self.big_conf)
        # NOTE(review): called without await, as before — presumably the
        # streamer schedules its own background work; confirm in multicast.py.
        self.streamer.start_streaming()

    async def stop_streaming(self):
        """Stop the current streamer, if any, and drop the reference to it."""
        if self.streamer is not None:
            await self.streamer.stop_streaming()
            self.streamer = None

    async def reset(self):
        """Shut everything down and return to the freshly constructed state."""
        await self.shutdown()  # Manually triggering teardown
        self._reset_state()

    async def shutdown(self):
        """Stop streaming and advertising, power off the device, close the transport.

        Safe to call on an instance that was never initialised, and safe to call
        repeatedly: the device context is exited at most once, and it is exited
        even when one of the earlier teardown steps raises.
        """
        self.is_auracast_init = False
        self.is_audio_init = False
        bigs = self.bigs or {}
        device = self.device
        device_acm = getattr(self, 'device_acm', None)
        try:
            # Ensure streaming is fully stopped before tearing down Bluetooth resources
            if self.streamer is not None:
                await self.stop_streaming()

            for big in bigs.values():
                audio_input = big.get('audio_input')
                if audio_input and hasattr(audio_input, 'aclose'):
                    await audio_input.aclose()

            if device:
                await device.stop_advertising()
                for big in bigs.values():
                    advertising_set = big.get('advertising_set')
                    if advertising_set:
                        await advertising_set.stop()
                # Explicitly power off the device to ensure a clean state before closing the transport
                await device.power_off()
        finally:
            # Forget the handles first so a repeated shutdown cannot reuse them.
            self.device = None
            self.bigs = None
            self.device_acm = None
            if device_acm is not None:
                await device_acm.__aexit__(None, None, None)  # Manually triggering teardown
async def _init_audio_if_available(caster) -> bool:
    """Await ``caster.init_audio()`` when the caster provides it; report whether it did."""
    init_audio = getattr(caster, 'init_audio', None)
    if init_audio is None:
        return False
    await init_audio()
    return True


# example commandline ui
async def command_line_ui(caster: Multicaster):
    """Interactive command loop driving a ``Multicaster`` from the console.

    Reads one command per line until ``quit`` is entered. Commands are
    matched case-insensitively after stripping surrounding whitespace.

    Args:
        caster: The multicaster instance to control.
    """
    while True:
        raw = await aioconsole.ainput("\nCommands: [start_audio|stop_audio|stop|init|init_audio|quit] > ")
        command = raw.strip().lower()

        if command == "start_audio":
            # start/stop_streaming are coroutines: without await they never run.
            await caster.start_streaming()
            print("Audio started!")

        elif command == "stop_audio":
            await caster.stop_streaming()
            print("Audio stopped!")

        elif command == "stop":
            print("👋 Stopping...")
            await caster.stop_streaming()
            await caster.reset()

        elif command == "init":
            await caster.reset()
            await caster.init_broadcast()
            # Optional step: only casters that expose init_audio() run it.
            await _init_audio_if_available(caster)

        elif command == "init_audio":
            if not await _init_audio_if_available(caster):
                print("init_audio is not supported by this caster.")

        elif command == "quit":
            print("👋 Exiting...")
            if caster.device:
                await caster.stop_streaming()
                await caster.shutdown()
            break  # Exit loop

        else:
            print("Invalid command.")
async def main():
    """Configure logging, build a three-language demo setup and run the console UI.

    The log level is taken from the ``LOG_LEVEL`` environment variable and
    falls back to ``logging.DEBUG``. The working directory is switched to the
    directory containing this file before any configuration is created.
    """
    import os

    log_level = os.environ.get('LOG_LEVEL', logging.DEBUG)
    logging.basicConfig(
        level=log_level,
        format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s',
    )

    script_dir = os.path.dirname(__file__)
    os.chdir(script_dir)

    qos = auracast_config.AuracastQosDefault()
    global_conf = auracast_config.AuracastGlobalConfig(qos_config=qos)
    # Transport for the nrf52 dongle (hci_uart over USB CDC).
    # Alternative for the nrf54l15dk:
    #   'serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts'
    global_conf.transport = (
        'serial:/dev/serial/by-id/'
        'usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,115200,rtscts'
    )

    # One BIG per language; Spanish and Italian are currently not enabled.
    big_factories = (
        auracast_config.AuracastBigConfigDeu,
        auracast_config.AuracastBigConfigEng,
        auracast_config.AuracastBigConfigFra,
    )
    big_conf = [make_conf() for make_conf in big_factories]
    for conf in big_conf:
        conf.loop = False

    # TODO: look into exposing Multicaster as an async context manager
    # (``async with Multicaster(...) as caster:``).
    caster = Multicaster(global_conf, big_conf)
    await caster.init_broadcast()
    await command_line_ui(caster)
if __name__ == '__main__':
    # Script entry point: run the application by driving main() on a new event loop.
    asyncio.run(main())