Files
bumble-auracast/auracast/multicast.py
2025-02-12 17:36:11 +01:00

339 lines
14 KiB
Python

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import pprint
import asyncio
import contextlib
import logging
import wave
import itertools
from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple
from typing import List
try:
import lc3 # type: ignore # pylint: disable=E0401
except ImportError as e:
raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e
from bumble.colors import color
from bumble import company_ids
from bumble import core
from bumble import gatt
from bumble import hci
from bumble.profiles import bap
from bumble.profiles import le_audio
from bumble.profiles import pbp
from bumble.profiles import bass
import bumble.device
import bumble.transport
import bumble.utils
from bumble.device import Host, BIGInfoAdvertisement
import auracast_config
def modified_on_hci_number_of_completed_packets_event(self, event):
for connection_handle, num_completed_packets in zip(
event.connection_handles, event.num_completed_packets
):
if connection := self.connections.get(connection_handle):
connection.acl_packet_queue.on_packets_completed(num_completed_packets)
elif connection_handle not in itertools.chain(
self.cis_links.keys(),
self.sco_links.keys(),
itertools.chain.from_iterable(self.bigs.values()),
):
logger.warning(
'received packet completion event for unknown handle '
f'0x{connection_handle:04X}'
)
self.emit('hci_number_of_completed_packets_event', event)
Host.on_hci_number_of_completed_packets_event = modified_on_hci_number_of_completed_packets_event
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
@contextlib.asynccontextmanager
async def create_device(config: auracast_config.AuracastGlobalConfig) -> AsyncGenerator[bumble.device.Device, Any]:
async with await bumble.transport.open_transport(config.transport) as (
hci_source,
hci_sink,
):
device_config = bumble.device.DeviceConfiguration(
name=config.device_name,
address=config.auracast_device_address,
keystore='JsonKeyStore',
#le_simultaneous_enabled=True #TODO: needed ?
)
device = bumble.device.Device.from_config_with_hci(
device_config,
hci_source,
hci_sink,
)
await device.power_on()
yield device
def run_async(async_command: Coroutine) -> None:
try:
asyncio.run(async_command)
except core.ProtocolError as error:
if error.error_namespace == 'att' and error.error_code in list(
bass.ApplicationError
):
message = bass.ApplicationError(error.error_code).name
else:
message = str(error)
print(
color('!!! An error occurred while executing the command:', 'red'), message
)
ADVERTISING_SLOWDOWN_FACTOR=1
PKGS_COUNT = 0
async def run_broadcast(
global_config : auracast_config.AuracastGlobalConfig,
big_config: List[auracast_config.AuracastBigConfig]
) -> None:
async with create_device(global_config) as device:
if not device.supports_le_periodic_advertising:
logger.error(color('Periodic advertising not supported', 'red'))
return
bap_sampling_freq = getattr(bap.SamplingFrequency, f"FREQ_{global_config.auracast_sampling_rate_hz}")
bigs = {}
for i, conf in enumerate(big_config):
bigs[f'big{i}'] = {}
with wave.open(conf.broacast_wav_file_path, 'rb') as wav:
logger.info('Encoding wav file into lc3...')
logger.info('Frame rate of .wav file is: %s', wav.getframerate())
encoder = lc3.Encoder(
frame_duration_us=global_config.frame_duration_us,
sample_rate_hz=global_config.auracast_sampling_rate_hz,
num_channels=1,
input_sample_rate_hz=wav.getframerate(),
)
frames = list[bytes]()
while pcm := wav.readframes(encoder.get_frame_samples()):
frames.append(
encoder.encode(pcm, num_bytes=global_config.octets_per_frame, bit_depth=wav.getsampwidth() * 8)
)
del encoder
logger.info('Encoding complete.')
bigs.update()
bigs[f'big{i}']['frames'] = frames
# Config advertising set
bigs[f'big{i}']['basic_audio_announcement'] = bap.BasicAudioAnnouncement(
presentation_delay=global_config.presentation_delay_us,
subgroups=[
bap.BasicAudioAnnouncement.Subgroup(
codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3),
codec_specific_configuration=bap.CodecSpecificConfiguration(
sampling_frequency=bap_sampling_freq,
frame_duration=bap.FrameDuration.DURATION_10000_US,
octets_per_codec_frame=global_config.octets_per_frame,
),
metadata=le_audio.Metadata(
[
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.LANGUAGE, data=conf.broadcast_language.encode()
),
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=conf.broadcast_program_info.encode()
),
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.BROADCAST_NAME, data=conf.broadcast_name.encode()
),
]
),
bis=[
bap.BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=bap.CodecSpecificConfiguration(
audio_channel_allocation=bap.AudioLocation.FRONT_LEFT
),
),
],
)
],
)
logger.info('Setup Advertising')
bigs[f'big{i}']['broadcast_audio_announcement'] = bap.BroadcastAudioAnnouncement(conf.broadcast_id)
advertising_set = await device.create_advertising_set(
random_address=conf.broadcast_random_address,
advertising_parameters=bumble.device.AdvertisingParameters(
advertising_event_properties=bumble.device.AdvertisingEventProperties(
is_connectable=False
),
primary_advertising_interval_min=round(100*ADVERTISING_SLOWDOWN_FACTOR),
primary_advertising_interval_max=round(200*ADVERTISING_SLOWDOWN_FACTOR),
advertising_sid=i,
#primary_advertising_phy=hci.Phy.LE_2M, TODO: 2m phy config throws error
#secondary_advertising_phy=hci.Phy.LE_2M,
),
advertising_data=(
bigs[f'big{i}']['broadcast_audio_announcement'].get_advertising_data()
+ bytes(
core.AdvertisingData(
[(core.AdvertisingData.BROADCAST_NAME, conf.broadcast_name.encode())]
)
)
),
periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters(
periodic_advertising_interval_min=round(80*ADVERTISING_SLOWDOWN_FACTOR),
periodic_advertising_interval_max=round(160*ADVERTISING_SLOWDOWN_FACTOR),
),
periodic_advertising_data=bigs[f'big{i}']['basic_audio_announcement'].get_advertising_data(),
auto_restart=True,
auto_start=True,
)
bigs[f'big{i}']['advertising_set'] = advertising_set
logging.info('Start Periodic Advertising')
await bigs[f'big{i}']['advertising_set'].start_periodic()
logging.info('Setup BIG')
big = await device.create_big(
bigs[f'big{i}']['advertising_set'] ,
parameters=bumble.device.BigParameters(
num_bis=1,
sdu_interval=global_config.qos_config.iso_interval_us,
max_sdu=global_config.octets_per_frame, # is this octets per frame ?
max_transport_latency=global_config.qos_config.max_transport_latency_ms,
rtn=global_config.qos_config.number_of_retransmissions,
broadcast_code=(
bytes.fromhex(conf.broadcast_code) if conf.broadcast_code else None
),
),
)
bigs[f'big{i}']['big'] = big
logging.info('Setup ISO Data Path')
for bis_link in big.bis_links:
await bis_link.setup_data_path(
direction=bis_link.Direction.HOST_TO_CONTROLLER
)
bigs[f'big{i}']['frames_iterator'] = itertools.cycle(frames)
logging.debug(f'big{i} parameters are:')
logging.debug('%s', pprint.pformat(vars(big)))
logging.debug(f'Finished setup of big{i}.')
await asyncio.sleep(3) # Wait for advertising to set up
LOOP_FINITE_PKGS = -1 # set -1 to loop infinitely
def on_packet_complete(event):
global PKGS_COUNT
PKGS_COUNT = PKGS_COUNT + 1
event_handle = event.connection_handles[0]
if PKGS_COUNT < LOOP_FINITE_PKGS or LOOP_FINITE_PKGS == -1:
event_found = False
for big in bigs.values():
if big['big'].bis_links[0].handle == event_handle:
frame = next(big['frames_iterator'])
big['big'].bis_links[0].write(frame)
event_found = True
if not event_found:
logging.warning('unknown event on packet complete with handle: %s', event_handle)
else:
logging.warning("Stopping iso loop")
logging.info("Broadcasting...")
device.host.on('hci_number_of_completed_packets_event', on_packet_complete)
# Send the first packet for each big, to get the event loop running
for big in bigs.values():
frame = next(big['frames_iterator'] )
big['big'].bis_links[0].write(frame)
while True:
await asyncio.sleep(1)
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
def broadcast(global_conf: auracast_config.AuracastGlobalConfig, big_conf: List[auracast_config.AuracastBigConfig]):
"""Start a broadcast as a source."""
run_async(
run_broadcast(
global_conf,
big_conf
)
)
# -----------------------------------------------------------------------------
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)
global_conf = auracast_config.global_base_config
#global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,1000000,rtscts' # transport for nrf52 dongle
#global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001050076061-if02,1000000,rtscts' # transport for nrf53dk
#global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk
global_conf.transport='serial:/dev/ttyACM1,115200,rtscts'
global_conf.qos_config = auracast_config.qos_config_mono_high_rel # TODO: low rel, actually advertising seems to work again- maybe then the problem is hci->controler
bigs = [
auracast_config.broadcast_de,
auracast_config.broadcast_en,
auracast_config.broadcast_fr,
auracast_config.broadcast_es, #TODO: spanish not appearing as auracast
# auracast_config.broadcast_it, #TODO: with more than three broadcasts, not advertising at all is present, regardless the adv. interval
]
global_conf.auracast_sampling_rate_hz = 16000
global_conf.octets_per_frame = 40 # 32kbps@16kHz
# Note: 24kHz is only working with 2 streams - so this may be a host->controller interface bottleneck
# use thread usage debugger on controller to check actual cpu load - not much load
# TODO; I dont think hci is really the bottleneck. probably limited airtime is the problem. Analyze this somehow.
# Advertising is still 1Mbit phy - use 2Mbit
# Check is the sdu interval (iso interval?) may really be varied - does not seem to work really
# # sdu per frame also needs to be adjusted somehow? is this even compatible with lc3?
broadcast(
global_conf,
bigs
)