refractoring/auracast_config (#3)

- use pydantic for configuration management

Reviewed-on: https://gitea.pstruebi.xyz/auracaster/bumble-auracast/pulls/3
This commit was merged in pull request #3.
This commit is contained in:
2025-03-05 18:00:26 +01:00
parent b8b4c82be0
commit 419a65ba09
6 changed files with 89 additions and 96 deletions

View File

@@ -9,6 +9,7 @@ dependencies = [
"sounddevice",
"aioconsole",
"quart == 0.20.0",
"pydantic"
]
[project.optional-dependencies]

View File

@@ -1,40 +1,39 @@
from bumble import hci
from bumble.profiles import bap
from dataclasses import dataclass
from pydantic import BaseModel
# Define some base dataclasses to hold the relevant parameters
@dataclass
class AuracastQoSConfig:
# Define some base to hold the relevant parameters
class AuracastQoSConfig(BaseModel):
iso_int_multiple_10ms: int
number_of_retransmissions: int
max_transport_latency_ms: int
class AuracastQosHigh(AuracastQoSConfig):
iso_int_multiple_10ms: int = 1
number_of_retransmissions:int = 4 #4
max_transport_latency_ms:int = 43 #varies from the default value in bumble (was 65)
qos_config_mono_high_rel = AuracastQoSConfig() #highest rel + lowest latency
qos_config_mono_medium_rel = AuracastQoSConfig(
iso_int_multiple_10ms = 2,
number_of_retransmissions = 3,
max_transport_latency_ms = 65
)
qos_config_mono_low_rel = AuracastQoSConfig( #highest latency
iso_int_multiple_10ms = 3,
number_of_retransmissions = 2,
max_transport_latency_ms = 65
)
class AuracastQosMid(AuracastQoSConfig):
iso_int_multiple_10ms: int = 2
number_of_retransmissions:int = 3
max_transport_latency_ms:int = 65
@dataclass
class AuracastGlobalConfig:
qos_config: AuracastQoSConfig
class AuracastQosLow(AuracastQoSConfig):
iso_int_multiple_10ms: int = 3
number_of_retransmissions:int = 2 #4
max_transport_latency_ms:int = 65 #varies from the default value in bumble (was 65)
class AuracastGlobalConfig(BaseModel):
qos_config: AuracastQoSConfig = AuracastQosHigh()
debug: bool = False
device_name: str = 'Auracaster'
transport: str = ''
auracast_device_address: hci.Address = hci.Address('F0:F1:F2:F3:F4:F5')
auracast_device_address: str = 'F0:F1:F2:F3:F4:F5'
auracast_sampling_rate_hz: int = 16000
octets_per_frame: int = 40 #48kbps@24kHz # bitrate = octets_per_frame * 8 / frame len
frame_duration_us: int = 10000
presentation_delay_us: int = 40000
manufacturer_data: tuple[int, bytes] = None
manufacturer_data: tuple[int, bytes] |None = None # TODO:pydantic does not support bytes serialization
global_base_config = AuracastGlobalConfig(qos_config=AuracastQoSConfig())
# "Audio input. "
# "'device' -> use the host's default sound input device, "
@@ -42,13 +41,10 @@ global_base_config = AuracastGlobalConfig(qos_config=AuracastQoSConfig())
# "(specify 'device:?' to get a list of available sound input devices), "
# "'stdin' -> receive audio from stdin as int16 PCM, "
# "'file:<filename> -> read audio from a .wav or raw int16 PCM file. "
@dataclass
class AuracastBigConfig:
id: int = 123456,
random_address: hci.Address = hci.Address('F1:F1:F2:F3:F4:F5')
code: str = None # Broadcast_Code a 16-octet parameter provided by the Host
class AuracastBigConfig(BaseModel):
id: int = 123456
random_address: str = 'F1:F1:F2:F3:F4:F5'
code: str | None = None # Broadcast_Code a 16-octet parameter provided by the Host
language: str = 'eng' # See: https://en.wikipedia.org/wiki/List_of_ISO_639_language_codes
name: str = 'Broadcast0'
program_info: str = 'Some Announcements'
@@ -58,49 +54,45 @@ class AuracastBigConfig:
precode_wav: bool = False
iso_que_len: int = 64
class AuracastBigConfigDe(AuracastBigConfig):
id: int = 12
random_address: str = 'F1:F1:F2:F3:F4:F5'
name: str = 'Broadcast0'
language: str ='deu'
program_info: str = 'Announcements German'
audio_source: str = 'file:./testdata/announcement_de.wav'
# Instanciate some example configurations
broadcast_de = AuracastBigConfig(
id=12,
random_address=hci.Address('F1:F1:F2:F3:F4:F5'),
name = 'Broadcast0',
language='deu',
program_info = 'Announcements German',
audio_source = 'file:./testdata/announcement_de.wav',
)
class AuracastBigConfigEn(AuracastBigConfig):
id: int = 123
random_address: str = 'F2:F1:F2:F3:F4:F5'
name: str = 'Broadcast1'
language: str ='eng'
program_info: str = 'Announcements English'
audio_source: str = 'file:./testdata/announcement_en.wav'
broadcast_en = AuracastBigConfig(
id=123,
random_address=hci.Address('F2:F1:F2:F3:F4:F5'),
name = 'Broadcast1',
language='eng',
program_info = 'Announcements English',
audio_source = 'file:./testdata/announcement_en.wav',
)
class AuracastBigConfigFr(AuracastBigConfig):
id: int = 1234
random_address: str = 'F3:F1:F2:F3:F4:F5'
name: str = 'Broadcast2'
language: str ='fra'
program_info: str = 'Announcements French'
audio_source: str = 'file:./testdata/announcement_fr.wav'
broadcast_fr = AuracastBigConfig(
id=1234,
random_address=hci.Address('F3:F1:F2:F3:F4:F5'),
name = 'Broadcast2',
language='fra',
program_info = 'Announcements French',
audio_source = 'file:./testdata/announcement_fr.wav',
)
class AuracastBigConfigEs(AuracastBigConfig):
id: int =12345
random_address: str = 'F4:F1:F2:F3:F4:F5'
name: str = 'Broadcast3'
language: str ='spa'
program_info: str = 'Announcements Spanish'
audio_source: str = 'file:./testdata/announcement_es.wav'
broadcast_es = AuracastBigConfig(
id=12345,
random_address=hci.Address('F4:F1:F2:F3:F4:F5'),
name = 'Broadcast3',
language='spa',
program_info = 'Announcements Spanish',
audio_source = 'file:./testdata/announcement_es.wav',
)
class AuracastBigConfigIt(AuracastBigConfig):
id: int =1234567
random_address: str = 'F5:F1:F2:F3:F4:F5'
name: str = 'Broadcast4'
language: str ='ita'
program_info: str = 'Announcements Italian'
audio_source: str = 'file:./testdata/announcement_it.wav'
broadcast_it = AuracastBigConfig(
id=123456,
random_address=hci.Address('F5:F1:F2:F3:F4:F5'),
name = 'Broadcast4',
language='ita',
program_info = 'Announcements Italian',
audio_source = 'file:./testdata/announcement_it.wav',
)
# TODO: could be best to merge all in just one CONFIG class and give every language an enable parameter

View File

@@ -108,7 +108,7 @@ async def create_device(config: auracast_config.AuracastGlobalConfig) -> AsyncGe
):
device_config = bumble.device.DeviceConfiguration(
name=config.device_name,
address=config.auracast_device_address,
address= hci.Address(config.auracast_device_address),
keystore='JsonKeyStore',
#le_simultaneous_enabled=True #TODO: What is this doing ?
)
@@ -201,7 +201,7 @@ async def init_broadcast(
)
bigs[f'big{i}']['broadcast_audio_announcement'] = bap.BroadcastAudioAnnouncement(conf.id)
advertising_set = await device.create_advertising_set(
random_address=conf.random_address,
random_address=hci.Address(conf.random_address),
advertising_parameters=bumble.device.AdvertisingParameters(
advertising_event_properties=bumble.device.AdvertisingEventProperties(
is_connectable=False
@@ -496,18 +496,15 @@ async def broadcast(global_conf: auracast_config.AuracastGlobalConfig, big_conf:
if __name__ == "__main__":
import os
if os.environ['LOG_LEVEL']:
log_level = getattr(logging, os.environ['LOG_LEVEL'])
else:
log_level = logging.DEBUG
log_level = os.environ['LOG_LEVEL']
logging.basicConfig(
level=log_level,
logging.basicConfig( #export LOG_LEVEL=INFO
level=os.environ.get('LOG_LEVEL', logging.DEBUG),
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)
os.chdir(os.path.dirname(__file__))
global_conf = auracast_config.global_base_config
global_conf = auracast_config.AuracastGlobalConfig(
qos_config=auracast_config.AuracastQosHigh()
)
#global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,1000000,rtscts' # transport for nrf52 dongle
@@ -521,15 +518,13 @@ if __name__ == "__main__":
# TODO: How can we use other iso interval than 10ms ?(medium or low rel) ? - nrf53audio receiver repports I2S tx underrun
#global_conf.qos_config = auracast_config.qos_config_mono_medium_rel
global_conf.qos_config = auracast_config.qos_config_mono_high_rel
bigs = [
auracast_config.broadcast_de,
auracast_config.broadcast_en,
auracast_config.broadcast_fr,
#auracast_config.broadcast_es,
#auracast_config.broadcast_it,
auracast_config.AuracastBigConfigDe(),
auracast_config.AuracastBigConfigEn(),
auracast_config.AuracastBigConfigFr(),
#auracast_config.AuracastBigConfigEs(),
#auracast_config.AuracastBigConfigIt(),
]
for big in bigs: # TODO: encrypted streams are not working
#big.code = 'ff'*16 # returns hci/HCI_ENCRYPTION_MODE_NOT_ACCEPTABLE_ERROR

View File

@@ -113,19 +113,24 @@ async def command_line_ui(caster: Multicaster):
print("Invalid command.")
async def main():
import os
logging.basicConfig(
level=logging.DEBUG,
level=os.environ.get('LOG_LEVEL', logging.DEBUG),
format='%(module)s.py:%(lineno)d %(levelname)s: %(message)s'
)
os.chdir(os.path.dirname(__file__))
global_conf = auracast_config.global_base_config
global_conf = auracast_config.AuracastGlobalConfig(
qos_config=auracast_config.AuracastQosHigh()
)
#global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk
global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,115200,rtscts' #nrf52dongle hci_uart usb cdc
big_conf = [
auracast_config.broadcast_de,
auracast_config.broadcast_en,
auracast_config.broadcast_fr,
auracast_config.AuracastBigConfigDe(),
auracast_config.AuracastBigConfigEn(),
auracast_config.AuracastBigConfigFr(),
#auracast_config.broadcast_es,
#auracast_config.broadcast_it,
]

View File

@@ -11,9 +11,9 @@ global_conf = auracast_config.global_base_config
global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,115200,rtscts' #nrf52dongle hci_uart usb cdc
big_conf = { # TODO: use another dataclass for this to be able to iterate over the names
'broadcast_de': auracast_config.broadcast_de,
'broadcast_en': auracast_config.broadcast_en,
'broadcast_fr': auracast_config.broadcast_fr,
'broadcast_de': auracast_config.AuracastBigConfigDe(),
'broadcast_en': auracast_config.AuracastBigConfigEn(),
'broadcast_fr': auracast_config.AuracastBigConfigFr(),
#auracast_config.broadcast_es,
#auracast_config.broadcast_it,
}