# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- from __future__ import annotations import pprint import asyncio import contextlib import logging import wave import itertools from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple from typing import List try: import lc3 # type: ignore # pylint: disable=E0401 except ImportError as e: raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e from bumble.colors import color from bumble import company_ids from bumble import core from bumble import gatt from bumble import hci from bumble.profiles import bap from bumble.profiles import le_audio from bumble.profiles import pbp from bumble.profiles import bass import bumble.device import bumble.transport import bumble.utils from bumble.device import Host, BIGInfoAdvertisement, AdvertisingChannelMap import auracast_config ADVERTISING_CHANNELS = ( AdvertisingChannelMap.CHANNEL_37 ,AdvertisingChannelMap.CHANNEL_38 ,AdvertisingChannelMap.CHANNEL_39 ) # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logger = logging.getLogger(__name__) @contextlib.asynccontextmanager async def create_device(config: auracast_config.AuracastGlobalConfig) -> AsyncGenerator[bumble.device.Device, Any]: async with await bumble.transport.open_transport(config.transport) as ( hci_source, hci_sink, ): device_config = bumble.device.DeviceConfiguration( name=config.device_name, address=config.auracast_device_address, keystore='JsonKeyStore', #le_simultaneous_enabled=True #TODO: What is this doing ? ) device = bumble.device.Device.from_config_with_hci( device_config, hci_source, hci_sink, ) await device.power_on() yield device def run_async(async_command: Coroutine) -> None: try: asyncio.run(async_command) except core.ProtocolError as error: if error.error_namespace == 'att' and error.error_code in list( bass.ApplicationError ): message = bass.ApplicationError(error.error_code).name else: message = str(error) print( color('!!! An error occurred while executing the command:', 'red'), message ) ADVERTISING_SLOWDOWN_FACTOR=1 async def run_broadcast( global_config : auracast_config.AuracastGlobalConfig, big_config: List[auracast_config.AuracastBigConfig] ) -> None: async with create_device(global_config) as device: if not device.supports_le_periodic_advertising: logger.error(color('Periodic advertising not supported', 'red')) return bap_sampling_freq = getattr(bap.SamplingFrequency, f"FREQ_{global_config.auracast_sampling_rate_hz}") bigs = {} for i, conf in enumerate(big_config): bigs[f'big{i}'] = {} with wave.open(conf.broacast_wav_file_path, 'rb') as wav: logger.info('Encoding wav file into lc3...') logger.info('Frame rate of .wav file is: %s', wav.getframerate()) encoder = lc3.Encoder( frame_duration_us=global_config.frame_duration_us, sample_rate_hz=global_config.auracast_sampling_rate_hz, num_channels=1, input_sample_rate_hz=wav.getframerate(), ) frames = list[bytes]() while pcm := wav.readframes(encoder.get_frame_samples()): frames.append( encoder.encode( pcm, num_bytes=global_config.octets_per_frame, bit_depth=wav.getsampwidth() * 8) ) del encoder logger.info('Encoding complete.') bigs.update() bigs[f'big{i}']['frames'] = frames bigs[f'big{i}']['frames_iterator'] = itertools.cycle(frames) # Config advertising set bigs[f'big{i}']['basic_audio_announcement'] = bap.BasicAudioAnnouncement( presentation_delay=global_config.presentation_delay_us, subgroups=[ bap.BasicAudioAnnouncement.Subgroup( codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3), codec_specific_configuration=bap.CodecSpecificConfiguration( sampling_frequency=bap_sampling_freq, frame_duration=bap.FrameDuration.DURATION_10000_US, octets_per_codec_frame=global_config.octets_per_frame, ), metadata=le_audio.Metadata( [ le_audio.Metadata.Entry( tag=le_audio.Metadata.Tag.LANGUAGE, data=conf.broadcast_language.encode() ), le_audio.Metadata.Entry( tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=conf.broadcast_program_info.encode() ), le_audio.Metadata.Entry( tag=le_audio.Metadata.Tag.BROADCAST_NAME, data=conf.broadcast_name.encode() ), ] ), bis=[ bap.BasicAudioAnnouncement.BIS( index=1, codec_specific_configuration=bap.CodecSpecificConfiguration( audio_channel_allocation=bap.AudioLocation.FRONT_LEFT ), ), ], ) ], ) logger.info('Setup Advertising') bigs[f'big{i}']['broadcast_audio_announcement'] = bap.BroadcastAudioAnnouncement(conf.broadcast_id) advertising_set = await device.create_advertising_set( random_address=conf.broadcast_random_address, advertising_parameters=bumble.device.AdvertisingParameters( advertising_event_properties=bumble.device.AdvertisingEventProperties( is_connectable=False ), primary_advertising_interval_min=round(100*ADVERTISING_SLOWDOWN_FACTOR), primary_advertising_interval_max=round(200*ADVERTISING_SLOWDOWN_FACTOR), advertising_sid=i, primary_advertising_phy=hci.Phy.LE_1M, # 2m phy config throws error - because for primary advertising channels, 1mbit is only supported secondary_advertising_phy=hci.Phy.LE_2M, # this is the secondary advertising beeing send on non advertising channels (extendend advertising) #advertising_tx_power= # tx power in dbm (max 20) secondary_advertising_max_skip=10, ), advertising_data=( bigs[f'big{i}']['broadcast_audio_announcement'].get_advertising_data() + bytes( core.AdvertisingData( [(core.AdvertisingData.BROADCAST_NAME, conf.broadcast_name.encode())] ) ) ), periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters( periodic_advertising_interval_min=round(80*ADVERTISING_SLOWDOWN_FACTOR), periodic_advertising_interval_max=round(160*ADVERTISING_SLOWDOWN_FACTOR), ), periodic_advertising_data=bigs[f'big{i}']['basic_audio_announcement'].get_advertising_data(), auto_restart=True, auto_start=True, ) bigs[f'big{i}']['advertising_set'] = advertising_set logging.info('Start Periodic Advertising') await advertising_set.start_periodic() logging.info('Setup BIG') if global_config.qos_config.iso_int_multiple_10ms == 1: frame_enable = 0 else: frame_enable = 1 big = await device.create_big( bigs[f'big{i}']['advertising_set'], parameters=bumble.device.BigParameters( num_bis=1, sdu_interval=global_config.qos_config.iso_int_multiple_10ms*10000, # Is the same as iso interval max_sdu=global_config.octets_per_frame, max_transport_latency=global_config.qos_config.max_transport_latency_ms, rtn=global_config.qos_config.number_of_retransmissions, broadcast_code=( bytes.fromhex(conf.broadcast_code) if conf.broadcast_code else None ), framing=frame_enable # needed if iso interval is not frame interval of codedc ), ) bigs[f'big{i}']['big'] = big iso_queues = [ bumble.device.IsoPacketStream(big.bis_links[0], 64), #bumble.device.IsoPacketStream(big.bis_links[1], 64), # right channel ] logging.info('Setup ISO Data Path') for queue in iso_queues: await queue.iso_link.setup_data_path( direction=queue.iso_link.Direction.HOST_TO_CONTROLLER ) bigs[f'big{i}']['iso_queues'] = iso_queues logging.debug(f'big{i} parameters are:') logging.debug('%s', pprint.pformat(vars(big))) logging.debug(f'Finished setup of big{i}.') await asyncio.sleep(i+1) # Wait for advertising to set up logging.info("Broadcasting...") def on_flow(): data_packet_queue = iso_queues[0].data_packet_queue print( f'\rPACKETS: pending={data_packet_queue.pending}, ' f'queued={data_packet_queue.queued}, ' f'completed={data_packet_queue.completed}', end='', ) bigs[f'big{0}']['iso_queues'][0].data_packet_queue.on('flow', on_flow) async def streamer(queue, iterator): while True: frame = next(iterator) await queue[0].write(frame) streams = [] for big in bigs.values(): streams.append( asyncio.create_task( streamer(big['iso_queues'], big['frames_iterator']) ) ) await asyncio.wait(streams) print("Done.") # ----------------------------------------------------------------------------- # Main # ----------------------------------------------------------------------------- def broadcast(global_conf: auracast_config.AuracastGlobalConfig, big_conf: List[auracast_config.AuracastBigConfig]): """Start a broadcast as a source.""" run_async( run_broadcast( global_conf, big_conf ) ) # ----------------------------------------------------------------------------- if __name__ == "__main__": logging.basicConfig( level=logging.INFO, format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s' ) global_conf = auracast_config.global_base_config #global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,1000000,rtscts' # transport for nrf52 dongle #global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001050076061-if02,1000000,rtscts' # transport for nrf53dk #global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,115200,rtscts' #nrf52dongle hci_uart usb cdc # global_conf.transport='usb:2fe3:000b' #nrf52dongle hci_usb #TODO: iso packet over usb seems not to be supported #global_conf.qos_config = auracast_config.qos_config_mono_medium_rel global_conf.qos_config = auracast_config.qos_config_mono_high_rel bigs = [ auracast_config.broadcast_de, auracast_config.broadcast_en, auracast_config.broadcast_fr, #auracast_config.broadcast_es, #auracast_config.broadcast_it, ] # TODO: with more than three broadcasters no advertising (no primary channels is present anymore) - is this a number of advertising sets constraint? # TODO: solve the advertising problem and 5 broadcaster may be possible global_conf.auracast_sampling_rate_hz = 16000 global_conf.octets_per_frame = 40 # 32kbps@16kHz # Note: 24kHz is only working with 2 streams - so this may be a host->controller interface bottleneck - probably airtime constraint # 16kHz works reliably with 3 streams # use thread usage debugger on controller to check actual cpu load - not much load # TODO: How can we use other iso interval than 10ms ?(medium or low rel) ? - nrf53audio receiver repports I2S tx underrun # TODO; I dont think hci is really the bottleneck. probably limited airtime is the problem. Analyze this somehow. broadcast( global_conf, bigs )