2 Commits

Author SHA1 Message Date
41d5f8c90b remove obsolete test implementations 2025-02-25 13:26:55 +01:00
5501f76f85 Adaptions 2025-02-25 13:26:44 +01:00
6 changed files with 15 additions and 692 deletions

View File

@@ -1,279 +0,0 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import contextlib
import logging
import wave
import itertools
from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple
from typing import List
try:
import lc3 # type: ignore # pylint: disable=E0401
except ImportError as e:
raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e
from bumble.colors import color
from bumble import company_ids
from bumble import core
from bumble import gatt
from bumble import hci
from bumble.profiles import bap
from bumble.profiles import le_audio
from bumble.profiles import pbp
from bumble.profiles import bass
import bumble.device
import bumble.transport
import bumble.utils
from bumble.device import Host
import auracast_config
def modified_on_hci_number_of_completed_packets_event(self, event):
for connection_handle, num_completed_packets in zip(
event.connection_handles, event.num_completed_packets
):
if connection := self.connections.get(connection_handle):
connection.acl_packet_queue.on_packets_completed(num_completed_packets)
elif connection_handle not in itertools.chain(
self.cis_links.keys(),
self.sco_links.keys(),
itertools.chain.from_iterable(self.bigs.values()),
):
logger.warning(
'received packet completion event for unknown handle '
f'0x{connection_handle:04X}'
)
self.emit('hci_number_of_completed_packets_event', event)
Host.on_hci_number_of_completed_packets_event = modified_on_hci_number_of_completed_packets_event
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
@contextlib.asynccontextmanager
async def create_device(config: auracast_config.AuracastGlobalConfig) -> AsyncGenerator[bumble.device.Device, Any]:
async with await bumble.transport.open_transport(config.transport) as (
hci_source,
hci_sink,
):
device_config = bumble.device.DeviceConfiguration(
name=config.device_name,
address=config.auracast_device_address,
keystore='JsonKeyStore',
)
device = bumble.device.Device.from_config_with_hci(
device_config,
hci_source,
hci_sink,
)
await device.power_on()
yield device
def run_async(async_command: Coroutine) -> None:
try:
asyncio.run(async_command)
except core.ProtocolError as error:
if error.error_namespace == 'att' and error.error_code in list(
bass.ApplicationError
):
message = bass.ApplicationError(error.error_code).name
else:
message = str(error)
print(
color('!!! An error occurred while executing the command:', 'red'), message
)
async def run_broadcast(
global_config : auracast_config.AuracastGlobalConfig,
big_config: List[auracast_config.AuracastBigConfig]
) -> None:
async with create_device(global_config) as device:
if not device.supports_le_periodic_advertising:
logger.error(color('Periodic advertising not supported', 'red'))
return
with wave.open(big_config[0].audio_source, 'rb') as wav:
logger.info('Encoding wav file into lc3...')
logger.info('Frame rate of .wav file is: %s', wav.getframerate())
encoder = lc3.Encoder(
frame_duration_us=global_config.frame_duration_us,
sample_rate_hz=global_config.auracast_sampling_rate_hz,
num_channels=1,
input_sample_rate_hz=wav.getframerate(),
)
frames = list[bytes]()
while pcm := wav.readframes(encoder.get_frame_samples()):
frames.append(
encoder.encode(pcm, num_bytes=global_config.octets_per_frame, bit_depth=wav.getsampwidth() * 8)
)
del encoder
print('Encoding complete.')
# Config advertising set
bap_sampling_freq = getattr(bap.SamplingFrequency, f"FREQ_{global_config.auracast_sampling_rate_hz}")
basic_audio_announcement = bap.BasicAudioAnnouncement(
presentation_delay=global_config.presentation_delay_us,
subgroups=[
bap.BasicAudioAnnouncement.Subgroup(
codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3),
codec_specific_configuration=bap.CodecSpecificConfiguration(
sampling_frequency=bap_sampling_freq,
frame_duration=bap.FrameDuration.DURATION_10000_US,
octets_per_codec_frame=global_config.octets_per_frame,
),
metadata=le_audio.Metadata(
[
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.LANGUAGE, data=big_config[0].language.encode()
),
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=big_config[0].program_info.encode()
),
]
),
bis=[
bap.BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=bap.CodecSpecificConfiguration(
audio_channel_allocation=bap.AudioLocation.FRONT_LEFT
),
),
],
)
],
)
logging.info('Setup Advertising')
broadcast_audio_announcement = bap.BroadcastAudioAnnouncement(big_config[0].id)
advertising_set0 = await device.create_advertising_set(
advertising_parameters=bumble.device.AdvertisingParameters(
advertising_event_properties=bumble.device.AdvertisingEventProperties(
is_connectable=False
),
primary_advertising_interval_min=100,
primary_advertising_interval_max=200,
advertising_sid=0
# TODO: use 2mbit phy
),
advertising_data=(
broadcast_audio_announcement.get_advertising_data()
+ bytes(
core.AdvertisingData(
[(core.AdvertisingData.BROADCAST_NAME, big_config[0].name.encode())]
)
)
),
periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters(
periodic_advertising_interval_min=80,
periodic_advertising_interval_max=160,
),
periodic_advertising_data=basic_audio_announcement.get_advertising_data(),
auto_restart=True,
auto_start=True,
)
logging.info('Start Periodic Advertising')
await advertising_set0.start_periodic()
logging.info('Setup BIG')
big0 = await device.create_big(
advertising_set0,
parameters=bumble.device.BigParameters(
num_bis=1,
sdu_interval=global_config.frame_duration_us,
max_sdu=global_config.octets_per_frame, # is this octets per frame ?
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(big_config[0].code) if big_config[0].code else None
),
),
)
logging.info('Setup ISO Data Path')
for bis_link in big0.bis_links:
await bis_link.setup_data_path(
direction=bis_link.Direction.HOST_TO_CONTROLLER
)
frames_iterator = itertools.cycle(frames)
logging.info("Broadcasting...")
def on_packet_complete(event):
frame = next(frames_iterator)
big0.bis_links[0].write(frame)
device.host.on('hci_number_of_completed_packets_event', on_packet_complete)
on_packet_complete('') # Send the first packet, to get the event loop running
while True:
await asyncio.sleep(1)
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
def broadcast(global_conf: auracast_config.AuracastGlobalConfig, big_conf: List[auracast_config.AuracastBigConfig]):
"""Start a broadcast as a source."""
run_async(
run_broadcast(
global_conf,
big_conf
)
)
# -----------------------------------------------------------------------------
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
global_conf = auracast_config.global_base_config
#global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,1000000,rtscts' # transport for nrf52 dongle
#global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001050076061-if02,1000000,rtscts' # transport for nrf53dk
global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk
bigs = [
auracast_config.broadcast_de
]
global_conf.octets_per_frame=60# 48kbps@24kHz
global_conf.auracast_sampling_rate_hz = 32000
global_conf.octets_per_frame=80# 64kbps@24kHz
broadcast(
global_conf,
bigs
)

View File

@@ -53,8 +53,9 @@ class AuracastBigConfig:
name: str = 'Broadcast0'
program_info: str = 'Some Announcements'
audio_source: str = 'file:./auracast/announcement_48_10_96000_en.wav'
iso_que_len: int = 64
input_format: str = 'auto'
loop_wav: bool = True
iso_que_len: int = 64
# Instanciate some example configurations

View File

@@ -1,399 +0,0 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import contextlib
import logging
import wave
import itertools
from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple
from typing import List
try:
import lc3 # type: ignore # pylint: disable=E0401
except ImportError as e:
raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e
from bumble.colors import color
from bumble import company_ids
from bumble import core
from bumble import gatt
from bumble import hci
from bumble.profiles import bap
from bumble.profiles import le_audio
from bumble.profiles import pbp
from bumble.profiles import bass
import bumble.device
import bumble.transport
import bumble.utils
from bumble.device import Host
import auracast_config
def modified_on_hci_number_of_completed_packets_event(self, event):
for connection_handle, num_completed_packets in zip(
event.connection_handles, event.num_completed_packets
):
if connection := self.connections.get(connection_handle):
connection.acl_packet_queue.on_packets_completed(num_completed_packets)
elif connection_handle not in itertools.chain(
self.cis_links.keys(),
self.sco_links.keys(),
itertools.chain.from_iterable(self.bigs.values()),
):
logger.warning(
'received packet completion event for unknown handle '
f'0x{connection_handle:04X}'
)
self.emit('hci_number_of_completed_packets_event', event)
Host.on_hci_number_of_completed_packets_event = modified_on_hci_number_of_completed_packets_event
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
@contextlib.asynccontextmanager
async def create_device(config: auracast_config.AuracastGlobalConfig) -> AsyncGenerator[bumble.device.Device, Any]:
async with await bumble.transport.open_transport(config.transport) as (
hci_source,
hci_sink,
):
device_config = bumble.device.DeviceConfiguration(
name=config.device_name,
address=config.auracast_device_address,
keystore='JsonKeyStore',
#le_simultaneous_enabled=True
)
device = bumble.device.Device.from_config_with_hci(
device_config,
hci_source,
hci_sink,
)
await device.power_on()
yield device
def run_async(async_command: Coroutine) -> None:
try:
asyncio.run(async_command)
except core.ProtocolError as error:
if error.error_namespace == 'att' and error.error_code in list(
bass.ApplicationError
):
message = bass.ApplicationError(error.error_code).name
else:
message = str(error)
print(
color('!!! An error occurred while executing the command:', 'red'), message
)
handle0 = None
handle1 = None
async def run_broadcast(
global_config : auracast_config.AuracastGlobalConfig,
big_config: List[auracast_config.AuracastBigConfig]
) -> None:
async with create_device(global_config) as device:
if not device.supports_le_periodic_advertising:
logger.error(color('Periodic advertising not supported', 'red'))
return
with wave.open(big_config[0].audio_source, 'rb') as wav:
logger.info('Encoding wav file into lc3...')
logger.info('Frame rate of .wav file is: %s', wav.getframerate())
encoder = lc3.Encoder(
frame_duration_us=global_config.frame_duration_us,
sample_rate_hz=global_config.auracast_sampling_rate_hz,
num_channels=1,
input_sample_rate_hz=wav.getframerate(),
)
frames = list[bytes]()
while pcm := wav.readframes(encoder.get_frame_samples()):
frames.append(
encoder.encode(pcm, num_bytes=global_config.octets_per_frame, bit_depth=wav.getsampwidth() * 8)
)
del encoder
print('Encoding complete.')
# Config advertising set
bap_sampling_freq = getattr(bap.SamplingFrequency, f"FREQ_{global_config.auracast_sampling_rate_hz}")
basic_audio_announcement0 = bap.BasicAudioAnnouncement(
presentation_delay=global_config.presentation_delay_us,
subgroups=[
bap.BasicAudioAnnouncement.Subgroup(
codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3),
codec_specific_configuration=bap.CodecSpecificConfiguration(
sampling_frequency=bap_sampling_freq,
frame_duration=bap.FrameDuration.DURATION_10000_US,
octets_per_codec_frame=global_config.octets_per_frame,
),
metadata=le_audio.Metadata(
[
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.LANGUAGE, data=big_config[0].language.encode()
),
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=big_config[0].program_info.encode()
),
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.BROADCAST_NAME, data=b'Broadcast0'
),
]
),
bis=[
bap.BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=bap.CodecSpecificConfiguration(
audio_channel_allocation=bap.AudioLocation.FRONT_LEFT
),
),
],
)
],
)
basic_audio_announcement1 = bap.BasicAudioAnnouncement(
presentation_delay=global_config.presentation_delay_us,
subgroups=[
bap.BasicAudioAnnouncement.Subgroup(
codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3),
codec_specific_configuration=bap.CodecSpecificConfiguration(
sampling_frequency=bap_sampling_freq,
frame_duration=bap.FrameDuration.DURATION_10000_US,
octets_per_codec_frame=global_config.octets_per_frame,
),
metadata=le_audio.Metadata(
[
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.LANGUAGE, data=big_config[0].language.encode()
),
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=big_config[0].program_info.encode()
),
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.BROADCAST_NAME, data=b'Broadcast1'
),
]
),
bis=[
bap.BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=bap.CodecSpecificConfiguration(
audio_channel_allocation=bap.AudioLocation.FRONT_LEFT
),
),
],
)
],
)
logging.info('Setup Advertising')
broadcast_audio_announcement0 = bap.BroadcastAudioAnnouncement(12)
advertising_set0 = await device.create_advertising_set(
random_address=hci.Address('F1:F1:F2:F3:F4:F5'),
advertising_parameters=bumble.device.AdvertisingParameters(
advertising_event_properties=bumble.device.AdvertisingEventProperties(
is_connectable=False
),
primary_advertising_interval_min=100,
primary_advertising_interval_max=200,
advertising_sid=0,
#primary_advertising_phy=hci.HCI_LE_2M_PHY,
#secondary_advertising_phy=hci.HCI_LE_2M_PHY,
# TODO: use 2mbit phy
),
advertising_data=(
broadcast_audio_announcement0.get_advertising_data()
+ bytes(
core.AdvertisingData(
[(core.AdvertisingData.BROADCAST_NAME, b"Broadcast0")]
)
)
),
periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters(
periodic_advertising_interval_min=80,
periodic_advertising_interval_max=160,
),
periodic_advertising_data=basic_audio_announcement0.get_advertising_data(),
auto_restart=True,
auto_start=True,
)
broadcast_audio_announcement1 = bap.BroadcastAudioAnnouncement(13)
advertising_set1 = await device.create_advertising_set(
random_address=hci.Address('F2:F1:F2:F3:F4:F5'),
advertising_parameters=bumble.device.AdvertisingParameters(
advertising_event_properties=bumble.device.AdvertisingEventProperties(
is_connectable=False
),
primary_advertising_interval_min=100,
primary_advertising_interval_max=200,
advertising_sid=1,
#primary_advertising_phy=hci.HCI_LE_2M_PHY,
#secondary_advertising_phy=hci.HCI_LE_2M_PHY,
# TODO: use 2mbit phy
),
advertising_data=(
broadcast_audio_announcement1.get_advertising_data()
+ bytes(
core.AdvertisingData(
[(core.AdvertisingData.BROADCAST_NAME, b"Broadcast1")]
)
)
),
periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters(
periodic_advertising_interval_min=80,
periodic_advertising_interval_max=160,
),
periodic_advertising_data=basic_audio_announcement1.get_advertising_data(),
auto_restart=True,
auto_start=True,
)
logging.info('Start Periodic Advertising')
await advertising_set0.start_periodic()
await advertising_set1.start_periodic()
logging.info('Setup BIG')
big0 = await device.create_big( # is this actually extended advertising ?
advertising_set0,
parameters=bumble.device.BigParameters(
num_bis=1,
sdu_interval=global_config.frame_duration_us,
max_sdu=global_config.octets_per_frame, # is this octets per frame ?
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(big_config[0].code) if big_config[0].code else None
),
),
)
big1 = await device.create_big( # is this actually extended advertising ?
advertising_set1,
parameters=bumble.device.BigParameters(
num_bis=1,
sdu_interval=global_config.frame_duration_us,
max_sdu=global_config.octets_per_frame, # is this octets per frame ?
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(big_config[0].code) if big_config[0].code else None
),
),
)
logging.info('Setup ISO Data Path')
for bis_link in big0.bis_links:
await bis_link.setup_data_path(
direction=bis_link.Direction.HOST_TO_CONTROLLER
)
for bis_link in big1.bis_links:
await bis_link.setup_data_path(
direction=bis_link.Direction.HOST_TO_CONTROLLER
)
frames_iterator0 = itertools.cycle(frames)
frames_iterator1 = itertools.cycle(frames)
frame0 = next(frames_iterator0)
frame1 = next(frames_iterator1)
# Get the corresponding handles first
logging.info("Determine broadcast handles")
# Define on packet complete function to get the handle for each broadcast
handle0 = big0.bis_links[0].handle
handle1 = big1.bis_links[0].handle
logging.info("Broadcasting...")
def on_packet_complete(event):
event_handle = event.connection_handles[0]
if event_handle == handle0:
frame0 = next(frames_iterator0)
big0.bis_links[0].write(frame0)
elif event_handle == handle1:
frame1 = next(frames_iterator1)
big1.bis_links[0].write(frame1)
else:
raise NotImplementedError('Unkown connection handle')
device.host.on('hci_number_of_completed_packets_event', on_packet_complete)
# Send the first packets, to get the event loop running
big0.bis_links[0].write(frame0)
big1.bis_links[0].write(frame1)
while True:
await asyncio.sleep(1)
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
def broadcast(global_conf: auracast_config.AuracastGlobalConfig, big_conf: List[auracast_config.AuracastBigConfig]):
"""Start a broadcast as a source."""
run_async(
run_broadcast(
global_conf,
big_conf
)
)
# -----------------------------------------------------------------------------
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
global_conf = auracast_config.global_base_config
#global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,1000000,rtscts' # transport for nrf52 dongle
#global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001050076061-if02,1000000,rtscts' # transport for nrf53dk
global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk
bigs = [
auracast_config.broadcast_de
]
#global_conf.auracast_sampling_rate_khz=16000
#global_conf.octets_per_frame=40 # 16kbps@8kHz
global_conf.octets_per_frame=60# 48kbps@24kHz
broadcast(
global_conf,
bigs
)

View File

@@ -47,7 +47,7 @@ import bumble.utils
from bumble.device import Host, BIGInfoAdvertisement, AdvertisingChannelMap
from bumble.audio import io as audio_io
import auracast_config
from auracast import auracast_config
# modified from bumble
@@ -205,8 +205,8 @@ async def init_broadcast(
advertising_event_properties=bumble.device.AdvertisingEventProperties(
is_connectable=False
),
primary_advertising_interval_min=round(100),
primary_advertising_interval_max=round(200),
primary_advertising_interval_min=100,
primary_advertising_interval_max=200,
advertising_sid=i,
primary_advertising_phy=hci.Phy.LE_1M, # 2m phy config throws error - because for primary advertising channels, 1mbit is only supported
secondary_advertising_phy=hci.Phy.LE_2M, # this is the secondary advertising beeing send on non advertising channels (extendend advertising)
@@ -223,8 +223,8 @@ async def init_broadcast(
+ advertising_manufacturer_data
),
periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters(
periodic_advertising_interval_min=round(80),
periodic_advertising_interval_max=round(160),
periodic_advertising_interval_min=80,
periodic_advertising_interval_max=160,
),
periodic_advertising_data=bigs[f'big{i}']['basic_audio_announcement'].get_advertising_data(),
auto_restart=True,
@@ -295,7 +295,7 @@ async def init_audio(
):
for i, big in enumerate(bigs.values()):
audio_source = big_config[i].audio_source
input_format = 'auto'
input_format = big_config[i].input_format
audio_input = await audio_io.create_audio_input(audio_source, input_format)
audio_input.rewind = big_config[i].loop_wav
pcm_format = await audio_input.open()
@@ -367,7 +367,7 @@ class Streamer():
if all(stream_finished): # Take into account that multiple files have different lengths
logging.info('All streams finished, stopping streamer')
self.is_streaming = True
self.is_streaming = False
break

View File

@@ -7,9 +7,8 @@ import bumble.utils
import asyncio
import aioconsole
import multicast
import auracast_config
from auracast import multicast
from auracast import auracast_config
class Multicaster:
"""
@@ -26,7 +25,6 @@ class Multicaster:
self.is_auracast_init = False
self.is_audio_init = False
self.streaming = False
self.task = None # Holds the background print task
self.global_conf = global_conf
self.big_conf = big_conf
self.device = None
@@ -144,5 +142,6 @@ async def main():
await command_line_ui(caster)
# Run the application
asyncio.run(main())
if __name__ == '__main__':
# Run the application
asyncio.run(main())

View File

@@ -7,6 +7,7 @@ dependencies = [
"bumble @ git+ssh://git@ssh.pstruebi.xyz:222/auracaster/bumble_mirror.git@e027bcb57a0f29c82e3c02c8bb8691dcb91eac62",
"lc3 @ git+ssh://git@ssh.pstruebi.xyz:222/auracaster/liblc3.git@7558637303106c7ea971e7bb8cedf379d3e08bcc",
"sounddevice",
"aioconsole"
]
[project.optional-dependencies]