Files
bumble-auracast/auracast/auracast.py

280 lines
10 KiB
Python

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import contextlib
import dataclasses
import functools
import logging
import os
import wave
import itertools
from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple
import click
import pyee
try:
import lc3 # type: ignore # pylint: disable=E0401
except ImportError as e:
raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e
from bumble.colors import color
from bumble import company_ids
from bumble import core
from bumble import gatt
from bumble import hci
from bumble.profiles import bap
from bumble.profiles import le_audio
from bumble.profiles import pbp
from bumble.profiles import bass
import bumble.device
import bumble.transport
import bumble.utils
from bumble.device import Host
import auracast_config
def modified_on_hci_number_of_completed_packets_event(self, event):
for connection_handle, num_completed_packets in zip(
event.connection_handles, event.num_completed_packets
):
if connection := self.connections.get(connection_handle):
connection.acl_packet_queue.on_packets_completed(num_completed_packets)
elif connection_handle not in itertools.chain(
self.cis_links.keys(),
self.sco_links.keys(),
itertools.chain.from_iterable(self.bigs.values()),
):
logger.warning(
'received packet completion event for unknown handle '
f'0x{connection_handle:04X}'
)
self.emit('hci_number_of_completed_packets_event', event)
Host.on_hci_number_of_completed_packets_event = modified_on_hci_number_of_completed_packets_event
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
@contextlib.asynccontextmanager
async def create_device(config: auracast_config.AuracastGlobalConfig) -> AsyncGenerator[bumble.device.Device, Any]:
async with await bumble.transport.open_transport(config.transport) as (
hci_source,
hci_sink,
):
device_config = bumble.device.DeviceConfiguration(
name=config.device_name,
address=config.auracast_device_address,
keystore='JsonKeyStore',
)
device = bumble.device.Device.from_config_with_hci(
device_config,
hci_source,
hci_sink,
)
await device.power_on()
yield device
def run_async(async_command: Coroutine) -> None:
try:
asyncio.run(async_command)
except core.ProtocolError as error:
if error.error_namespace == 'att' and error.error_code in list(
bass.ApplicationError
):
message = bass.ApplicationError(error.error_code).name
else:
message = str(error)
print(
color('!!! An error occurred while executing the command:', 'red'), message
)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
from typing import List
async def run_broadcast(
global_config : auracast_config.AuracastGlobalConfig,
big_config: List[auracast_config.AuracastBigConfig]
) -> None:
async with create_device(global_config) as device:
if not device.supports_le_periodic_advertising:
logger.error(color('Periodic advertising not supported', 'red'))
return
with wave.open(big_config[0].broacast_wav_file_path, 'rb') as wav:
logger.info('Encoding wav file into lc3...')
logger.info('Frame rate of .wav file is: %s', wav.getframerate())
encoder = lc3.Encoder(
frame_duration_us=global_config.frame_duration_us,
sample_rate_hz=global_config.auracast_sampling_rate_khz,
num_channels=1,
input_sample_rate_hz=wav.getframerate(),
)
frames = list[bytes]()
while pcm := wav.readframes(encoder.get_frame_samples()):
frames.append(
encoder.encode(pcm, num_bytes=global_config.octets_per_frame, bit_depth=wav.getsampwidth() * 8)
)
del encoder
print('Encoding complete.')
# Config advertising set
bap_sampling_freq = getattr(bap.SamplingFrequency, f"FREQ_{global_config.auracast_sampling_rate_khz}")
basic_audio_announcement = bap.BasicAudioAnnouncement(
presentation_delay=global_config.presentation_delay_us,
subgroups=[
bap.BasicAudioAnnouncement.Subgroup(
codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3),
codec_specific_configuration=bap.CodecSpecificConfiguration(
sampling_frequency=bap_sampling_freq,
frame_duration=bap.FrameDuration.DURATION_10000_US,
octets_per_codec_frame=global_config.octets_per_frame,
),
metadata=le_audio.Metadata(
[
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.LANGUAGE, data=big_config[0].broadcast_language.encode()
),
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=big_config[0].broadcast_program_info.encode()
),
]
),
bis=[
bap.BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=bap.CodecSpecificConfiguration(
audio_channel_allocation=bap.AudioLocation.FRONT_LEFT
),
),
],
)
],
)
logging.info('Setup Advertising')
broadcast_audio_announcement = bap.BroadcastAudioAnnouncement(big_config[0].broadcast_id)
advertising_set0 = await device.create_advertising_set(
advertising_parameters=bumble.device.AdvertisingParameters(
advertising_event_properties=bumble.device.AdvertisingEventProperties(
is_connectable=False
),
primary_advertising_interval_min=100,
primary_advertising_interval_max=200,
advertising_sid=0
# TODO: use 2mbit phy
),
advertising_data=(
broadcast_audio_announcement.get_advertising_data()
+ bytes(
core.AdvertisingData(
[(core.AdvertisingData.BROADCAST_NAME, big_config[0].broadcast_name.encode())]
)
)
),
periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters(
periodic_advertising_interval_min=80,
periodic_advertising_interval_max=160,
),
periodic_advertising_data=basic_audio_announcement.get_advertising_data(),
auto_restart=True,
auto_start=True,
)
logging.info('Start Periodic Advertising')
await advertising_set0.start_periodic()
logging.info('Setup BIG')
big0 = await device.create_big(
advertising_set0,
parameters=bumble.device.BigParameters(
num_bis=1,
sdu_interval=global_config.frame_duration_us,
max_sdu=global_config.octets_per_frame, # is this octets per frame ?
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(big_config[0].broadcast_code) if big_config[0].broadcast_code else None
),
),
)
logging.info('Setup ISO Data Path')
for bis_link in big0.bis_links:
await bis_link.setup_data_path(
direction=bis_link.Direction.HOST_TO_CONTROLLER
)
frames_iterator = itertools.cycle(frames)
logging.info("Broadcasting...")
def on_packet_complete(event):
frame = next(frames_iterator)
big0.bis_links[0].write(frame)
device.host.on('hci_number_of_completed_packets_event', on_packet_complete)
on_packet_complete('') # Send the first packet, to get the event loop running
while True:
await asyncio.sleep(1)
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
def broadcast(global_conf: auracast_config.AuracastGlobalConfig, big_conf: List[auracast_config.AuracastBigConfig]):
"""Start a broadcast as a source."""
run_async(
run_broadcast(
global_conf,
big_conf
)
)
# -----------------------------------------------------------------------------
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
global_conf = auracast_config.global_base_config
bigs = [
auracast_config.broadcast_de
]
global_conf.octets_per_frame=60# 48kbps@24kHz
broadcast(
global_conf,
bigs
)