Introduce a configuration file

This commit is contained in:
2025-01-25 12:51:35 +01:00
parent 008df44cd4
commit 41d757ce6b
9 changed files with 201 additions and 198 deletions

0
auracast/__init__.py Normal file
View File

View File

@@ -49,6 +49,8 @@ import bumble.transport
import bumble.utils
from bumble.device import Host
import auracast_config
def modified_on_hci_number_of_completed_packets_event(self, event):
for connection_handle, num_completed_packets in zip(
@@ -76,26 +78,15 @@ Host.on_hci_number_of_completed_packets_event = modified_on_hci_number_of_comple
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
AURACAST_DEFAULT_DEVICE_NAME = 'Bumble Auracast'
AURACAST_DEFAULT_DEVICE_ADDRESS = hci.Address('F0:F1:F2:F3:F4:F5')
AURACAST_DEFAULT_SYNC_TIMEOUT = 5.0
AURACAST_DEFAULT_ATT_MTU = 256
AURACAST_SAMPLING_FREQUENCY=24000
OCTETS_PER_FRAME= 100 # bitrate = octets_per_frame * 8 / frame len
@contextlib.asynccontextmanager
async def create_device(transport: str) -> AsyncGenerator[bumble.device.Device, Any]:
async with await bumble.transport.open_transport(transport) as (
async def create_device(config: auracast_config.AuracastGlobalConfig) -> AsyncGenerator[bumble.device.Device, Any]:
async with await bumble.transport.open_transport(config.transport) as (
hci_source,
hci_sink,
):
device_config = bumble.device.DeviceConfiguration(
name=AURACAST_DEFAULT_DEVICE_NAME,
address=AURACAST_DEFAULT_DEVICE_ADDRESS,
name=config.device_name,
address=config.auracast_device_address,
keystore='JsonKeyStore',
)
@@ -109,146 +100,6 @@ async def create_device(transport: str) -> AsyncGenerator[bumble.device.Device,
yield device
async def run_broadcast(
transport: str, broadcast_id: int, broadcast_code: str | None, wav_file_path: str
) -> None:
async with create_device(transport) as device:
if not device.supports_le_periodic_advertising:
print(color('Periodic advertising not supported', 'red'))
return
with wave.open(wav_file_path, 'rb') as wav:
print('Encoding wav file into lc3...')
print('Frame rate of .wav file is:', wav.getframerate())
encoder = lc3.Encoder(
frame_duration_us=10000,
sample_rate_hz=AURACAST_SAMPLING_FREQUENCY,
num_channels=1,
input_sample_rate_hz=wav.getframerate(),
)
frames = list[bytes]()
while pcm := wav.readframes(encoder.get_frame_samples()):
frames.append(
encoder.encode(pcm, num_bytes=OCTETS_PER_FRAME, bit_depth=wav.getsampwidth() * 8)
)
del encoder
print('Encoding complete.')
bap_sampling_freq = getattr(bap.SamplingFrequency, f"FREQ_{AURACAST_SAMPLING_FREQUENCY}")
basic_audio_announcement = bap.BasicAudioAnnouncement(
presentation_delay=40000,
subgroups=[
bap.BasicAudioAnnouncement.Subgroup(
codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3),
codec_specific_configuration=bap.CodecSpecificConfiguration(
sampling_frequency=bap_sampling_freq,
frame_duration=bap.FrameDuration.DURATION_10000_US,
octets_per_codec_frame=OCTETS_PER_FRAME,
),
metadata=le_audio.Metadata(
[
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.LANGUAGE, data=b'eng'
),
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=b'Disco'
),
]
),
bis=[
bap.BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=bap.CodecSpecificConfiguration(
audio_channel_allocation=bap.AudioLocation.FRONT_LEFT
),
),
# bap.BasicAudioAnnouncement.BIS(
# index=2,
# codec_specific_configuration=bap.CodecSpecificConfiguration(
# audio_channel_allocation=bap.AudioLocation.FRONT_RIGHT
# ),
# ),
],
)
],
)
broadcast_audio_announcement = bap.BroadcastAudioAnnouncement(broadcast_id)
print('Start Advertising')
advertising_set = await device.create_advertising_set(
advertising_parameters=bumble.device.AdvertisingParameters(
advertising_event_properties=bumble.device.AdvertisingEventProperties(
is_connectable=False
),
primary_advertising_interval_min=100,
primary_advertising_interval_max=200,
),
advertising_data=(
broadcast_audio_announcement.get_advertising_data()
+ bytes(
core.AdvertisingData(
[(core.AdvertisingData.BROADCAST_NAME, b'Bumble Auracast')]
)
)
),
periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters(
periodic_advertising_interval_min=80,
periodic_advertising_interval_max=160,
),
periodic_advertising_data=basic_audio_announcement.get_advertising_data(),
auto_restart=True,
auto_start=True,
)
print('Start Periodic Advertising')
await advertising_set.start_periodic()
print('Setup BIG')
big = await device.create_big(
advertising_set,
parameters=bumble.device.BigParameters(
num_bis=1,
sdu_interval=10000,
max_sdu=100, # is this octets per frame ?
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
),
),
)
print('Setup ISO Data Path')
for bis_link in big.bis_links:
await bis_link.setup_data_path(
direction=bis_link.Direction.HOST_TO_CONTROLLER
)
frames_iterator = itertools.cycle(frames)
print("Broadcasting...")
def on_packet_complete(event):
frame = next(frames_iterator)
big.bis_links[0].write(frame)
#mid = len(frame) // 2
#big.bis_links[0].write(frame[:mid])
#big.bis_links[1].write(frame[mid:])
device.host.on('hci_number_of_completed_packets_event', on_packet_complete)
on_packet_complete('') # Send the first packet, to get the event loop running
#on_packet_complete(None) # trigger once to get the loop running
# for frame in itertools.cycle(frames):
# mid = len(frame) // 2
# big.bis_links[0].write(frame[:mid])
# big.bis_links[1].write(frame[mid:])
# await asyncio.sleep(0.009)
while True:
await asyncio.sleep(1)
def run_async(async_command: Coroutine) -> None:
try:
asyncio.run(async_command)
@@ -265,38 +116,163 @@ def run_async(async_command: Coroutine) -> None:
)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
from typing import List
async def run_broadcast(
global_config : auracast_config.AuracastGlobalConfig,
big_config: List[auracast_config.AuracastBigConfig]
) -> None:
async with create_device(global_config) as device:
if not device.supports_le_periodic_advertising:
logger.error(color('Periodic advertising not supported', 'red'))
return
with wave.open(big_config[0].broacast_wav_file_path, 'rb') as wav:
logger.info('Encoding wav file into lc3...')
logger.info('Frame rate of .wav file is: %s', wav.getframerate())
encoder = lc3.Encoder(
frame_duration_us=global_config.frame_duration_us,
sample_rate_hz=global_config.auracast_sampling_rate_khz,
num_channels=1,
input_sample_rate_hz=wav.getframerate(),
)
frames = list[bytes]()
while pcm := wav.readframes(encoder.get_frame_samples()):
frames.append(
encoder.encode(pcm, num_bytes=global_config.octets_per_frame, bit_depth=wav.getsampwidth() * 8)
)
del encoder
print('Encoding complete.')
# Config advertising set
bap_sampling_freq = getattr(bap.SamplingFrequency, f"FREQ_{global_config.auracast_sampling_rate_khz}")
basic_audio_announcement = bap.BasicAudioAnnouncement(
presentation_delay=global_config.presentation_delay_us,
subgroups=[
bap.BasicAudioAnnouncement.Subgroup(
codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3),
codec_specific_configuration=bap.CodecSpecificConfiguration(
sampling_frequency=bap_sampling_freq,
frame_duration=bap.FrameDuration.DURATION_10000_US,
octets_per_codec_frame=global_config.octets_per_frame,
),
metadata=le_audio.Metadata(
[
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.LANGUAGE, data=big_config[0].broadcast_language.encode()
),
le_audio.Metadata.Entry(
tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=big_config[0].broadcast_program_info.encode()
),
]
),
bis=[
bap.BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=bap.CodecSpecificConfiguration(
audio_channel_allocation=bap.AudioLocation.FRONT_LEFT
),
),
],
)
],
)
logging.info('Setup Advertising')
broadcast_audio_announcement = bap.BroadcastAudioAnnouncement(big_config[0].broadcast_id)
advertising_set0 = await device.create_advertising_set(
advertising_parameters=bumble.device.AdvertisingParameters(
advertising_event_properties=bumble.device.AdvertisingEventProperties(
is_connectable=False
),
primary_advertising_interval_min=100,
primary_advertising_interval_max=200,
advertising_sid=0
# TODO: use 2mbit phy
),
advertising_data=(
broadcast_audio_announcement.get_advertising_data()
+ bytes(
core.AdvertisingData(
[(core.AdvertisingData.BROADCAST_NAME, big_config[0].broadcast_name.encode())]
)
)
),
periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters(
periodic_advertising_interval_min=80,
periodic_advertising_interval_max=160,
),
periodic_advertising_data=basic_audio_announcement.get_advertising_data(),
auto_restart=True,
auto_start=True,
)
logging.info('Start Periodic Advertising')
await advertising_set0.start_periodic()
logging.info('Setup BIG')
big0 = await device.create_big(
advertising_set0,
parameters=bumble.device.BigParameters(
num_bis=1,
sdu_interval=global_config.frame_duration_us,
max_sdu=global_config.octets_per_frame, # is this octets per frame ?
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(big_config[0].broadcast_code) if big_config[0].broadcast_code else None
),
),
)
logging.info('Setup ISO Data Path')
for bis_link in big0.bis_links:
await bis_link.setup_data_path(
direction=bis_link.Direction.HOST_TO_CONTROLLER
)
frames_iterator = itertools.cycle(frames)
logging.info("Broadcasting...")
def on_packet_complete(event):
frame = next(frames_iterator)
big0.bis_links[0].write(frame)
device.host.on('hci_number_of_completed_packets_event', on_packet_complete)
on_packet_complete('') # Send the first packet, to get the event loop running
while True:
await asyncio.sleep(1)
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
def broadcast(transport, broadcast_id, broadcast_code, wav_file_path):
def broadcast(global_conf: auracast_config.AuracastGlobalConfig, big_conf: List[auracast_config.AuracastBigConfig]):
"""Start a broadcast as a source."""
run_async(
run_broadcast(
transport=transport,
broadcast_id=broadcast_id,
broadcast_code=broadcast_code,
wav_file_path=wav_file_path,
global_conf,
big_conf
)
)
def main():
logging.basicConfig(level=logging.INFO)
transport = "serial:/dev/ttyACM1,1000000,rtscts"
broadcast_id =123456
broadcast_code = None # Hex encryption code
wav_file_path="./auracast/announcement_48_10_96000_en.wav"
broadcast(
transport=transport,
broadcast_id=broadcast_id,
broadcast_code=broadcast_code,
wav_file_path=wav_file_path,
)
# -----------------------------------------------------------------------------
if __name__ == "__main__":
main() # pylint: disable=no-value-for-parameter
logging.basicConfig(level=logging.DEBUG)
global_conf = auracast_config.global_base_config
bigs = [
auracast_config.broadcast_de
]
broadcast(
global_conf,
bigs
)

View File

@@ -0,0 +1,48 @@
from bumble import hci
from bumble.profiles import bap
from dataclasses import dataclass
# Define some base dataclasses to hold the relevant parameters
@dataclass
class AuracastGlobalConfig:
device_name: str = 'Auracaster'
transport: str = 'serial:/dev/ttyACM1,1000000,rtscts'
auracast_device_address: hci.Address = hci.Address('F0:F1:F2:F3:F4:F5')
auracast_sampling_rate_khz: int =24000
octets_per_frame: int = 100 # bitrate = octets_per_frame * 8 / frame len
frame_duration_us: int = 10000
presentation_delay_us: int = 40000
@dataclass
class AuracastBigConfig:
broadcast_id: int =123456
broadcast_code: str = None # a hexstr
broadcast_language: str = 'en'
broadcast_name: str = 'Broadcast0'
broadcast_program_info: str = 'Some Announcements'
broacast_wav_file_path: str = './auracast/announcement_48_10_96000_en.wav'
global_base_config = AuracastGlobalConfig()
# Instanciate some example configurations
broadcast_de = AuracastBigConfig(
broadcast_id=12,
broadcast_language='de',
broadcast_name = 'Broadcast0',
broacast_wav_file_path = './auracast/announcement_48_10_96000_de.wav',
)
broadcast_en = AuracastBigConfig(
broadcast_id=123,
broadcast_language='eng',
broadcast_name = 'Broadcast1',
broacast_wav_file_path = './auracast/announcement_48_10_96000_en.wav',
)
broadcast_fr = AuracastBigConfig(
broadcast_id=1234,
broadcast_language='fr',
broadcast_name = 'Broadcast2',
broacast_wav_file_path = './auracast/announcement_48_10_96000_fr.wav',
)