initial commit

This commit is contained in:
2025-01-23 13:25:27 +01:00
commit 008df44cd4
11 changed files with 380 additions and 0 deletions

302
auracast/auracast.py Normal file
View File

@@ -0,0 +1,302 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import contextlib
import dataclasses
import functools
import logging
import os
import wave
import itertools
from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple
import click
import pyee
try:
import lc3 # type: ignore # pylint: disable=E0401
except ImportError as e:
raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e
from bumble.colors import color
from bumble import company_ids
from bumble import core
from bumble import gatt
from bumble import hci
from bumble.profiles import bap
from bumble.profiles import le_audio
from bumble.profiles import pbp
from bumble.profiles import bass
import bumble.device
import bumble.transport
import bumble.utils
from bumble.device import Host
def modified_on_hci_number_of_completed_packets_event(self, event):
for connection_handle, num_completed_packets in zip(
event.connection_handles, event.num_completed_packets
):
if connection := self.connections.get(connection_handle):
connection.acl_packet_queue.on_packets_completed(num_completed_packets)
elif connection_handle not in itertools.chain(
self.cis_links.keys(),
self.sco_links.keys(),
itertools.chain.from_iterable(self.bigs.values()),
):
logger.warning(
'received packet completion event for unknown handle '
f'0x{connection_handle:04X}'
)
self.emit('hci_number_of_completed_packets_event', event)
Host.on_hci_number_of_completed_packets_event = modified_on_hci_number_of_completed_packets_event
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
AURACAST_DEFAULT_DEVICE_NAME = 'Bumble Auracast'
AURACAST_DEFAULT_DEVICE_ADDRESS = hci.Address('F0:F1:F2:F3:F4:F5')
AURACAST_DEFAULT_SYNC_TIMEOUT = 5.0
AURACAST_DEFAULT_ATT_MTU = 256
AURACAST_SAMPLING_FREQUENCY=24000
OCTETS_PER_FRAME= 100 # bitrate = octets_per_frame * 8 / frame len
@contextlib.asynccontextmanager
async def create_device(transport: str) -> AsyncGenerator[bumble.device.Device, Any]:
async with await bumble.transport.open_transport(transport) as (
hci_source,
hci_sink,
):
device_config = bumble.device.DeviceConfiguration(
name=AURACAST_DEFAULT_DEVICE_NAME,
address=AURACAST_DEFAULT_DEVICE_ADDRESS,
keystore='JsonKeyStore',
)
device = bumble.device.Device.from_config_with_hci(
device_config,
hci_source,
hci_sink,
)
await device.power_on()
yield device
async def run_broadcast(
transport: str, broadcast_id: int, broadcast_code: str | None, wav_file_path: str
) -> None:
async with create_device(transport) as device:
if not device.supports_le_periodic_advertising:
print(color('Periodic advertising not supported', 'red'))
return
with wave.open(wav_file_path, 'rb') as wav:
print('Encoding wav file into lc3...')
print('Frame rate of .wav file is:', wav.getframerate())
encoder = lc3.Encoder(
frame_duration_us=10000,
sample_rate_hz=AURACAST_SAMPLING_FREQUENCY,
num_channels=1,
input_sample_rate_hz=wav.getframerate(),
)
frames = list[bytes]()
while pcm := wav.readframes(encoder.get_frame_samples()):
frames.append(
encoder.encode(pcm, num_bytes=OCTETS_PER_FRAME, bit_depth=wav.getsampwidth() * 8)
)
del encoder
print('Encoding complete.')
bap_sampling_freq = getattr(bap.SamplingFrequency, f"FREQ_{AURACAST_SAMPLING_FREQUENCY}")
basic_audio_announcement = bap.BasicAudioAnnouncement(
presentation_delay=40000,
subgroups=[
bap.BasicAudioAnnouncement.Subgroup(
codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3),
codec_specific_configuration=bap.CodecSpecificConfiguration(
sampling_frequency=bap_sampling_freq,
frame_duration=bap.FrameDuration.DURATION_10000_US,
octets_per_codec_frame=OCTETS_PER_FRAME,
),
metadata=le_audio.Metadata(
[
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.LANGUAGE, data=b'eng'
),
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=b'Disco'
),
]
),
bis=[
bap.BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=bap.CodecSpecificConfiguration(
audio_channel_allocation=bap.AudioLocation.FRONT_LEFT
),
),
# bap.BasicAudioAnnouncement.BIS(
# index=2,
# codec_specific_configuration=bap.CodecSpecificConfiguration(
# audio_channel_allocation=bap.AudioLocation.FRONT_RIGHT
# ),
# ),
],
)
],
)
broadcast_audio_announcement = bap.BroadcastAudioAnnouncement(broadcast_id)
print('Start Advertising')
advertising_set = await device.create_advertising_set(
advertising_parameters=bumble.device.AdvertisingParameters(
advertising_event_properties=bumble.device.AdvertisingEventProperties(
is_connectable=False
),
primary_advertising_interval_min=100,
primary_advertising_interval_max=200,
),
advertising_data=(
broadcast_audio_announcement.get_advertising_data()
+ bytes(
core.AdvertisingData(
[(core.AdvertisingData.BROADCAST_NAME, b'Bumble Auracast')]
)
)
),
periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters(
periodic_advertising_interval_min=80,
periodic_advertising_interval_max=160,
),
periodic_advertising_data=basic_audio_announcement.get_advertising_data(),
auto_restart=True,
auto_start=True,
)
print('Start Periodic Advertising')
await advertising_set.start_periodic()
print('Setup BIG')
big = await device.create_big(
advertising_set,
parameters=bumble.device.BigParameters(
num_bis=1,
sdu_interval=10000,
max_sdu=100, # is this octets per frame ?
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
),
),
)
print('Setup ISO Data Path')
for bis_link in big.bis_links:
await bis_link.setup_data_path(
direction=bis_link.Direction.HOST_TO_CONTROLLER
)
frames_iterator = itertools.cycle(frames)
print("Broadcasting...")
def on_packet_complete(event):
frame = next(frames_iterator)
big.bis_links[0].write(frame)
#mid = len(frame) // 2
#big.bis_links[0].write(frame[:mid])
#big.bis_links[1].write(frame[mid:])
device.host.on('hci_number_of_completed_packets_event', on_packet_complete)
on_packet_complete('') # Send the first packet, to get the event loop running
#on_packet_complete(None) # trigger once to get the loop running
# for frame in itertools.cycle(frames):
# mid = len(frame) // 2
# big.bis_links[0].write(frame[:mid])
# big.bis_links[1].write(frame[mid:])
# await asyncio.sleep(0.009)
while True:
await asyncio.sleep(1)
def run_async(async_command: Coroutine) -> None:
try:
asyncio.run(async_command)
except core.ProtocolError as error:
if error.error_namespace == 'att' and error.error_code in list(
bass.ApplicationError
):
message = bass.ApplicationError(error.error_code).name
else:
message = str(error)
print(
color('!!! An error occurred while executing the command:', 'red'), message
)
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
def broadcast(transport, broadcast_id, broadcast_code, wav_file_path):
"""Start a broadcast as a source."""
run_async(
run_broadcast(
transport=transport,
broadcast_id=broadcast_id,
broadcast_code=broadcast_code,
wav_file_path=wav_file_path,
)
)
def main():
logging.basicConfig(level=logging.INFO)
transport = "serial:/dev/ttyACM1,1000000,rtscts"
broadcast_id =123456
broadcast_code = None # Hex encryption code
wav_file_path="./auracast/announcement_48_10_96000_en.wav"
broadcast(
transport=transport,
broadcast_id=broadcast_id,
broadcast_code=broadcast_code,
wav_file_path=wav_file_path,
)
# -----------------------------------------------------------------------------
if __name__ == "__main__":
main() # pylint: disable=no-value-for-parameter