diff --git a/auracast/auracast.py b/auracast/auracast.py deleted file mode 100644 index 2358ccf..0000000 --- a/auracast/auracast.py +++ /dev/null @@ -1,279 +0,0 @@ -# Copyright 2024 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# ----------------------------------------------------------------------------- -# Imports -# ----------------------------------------------------------------------------- -from __future__ import annotations - -import asyncio -import contextlib -import logging -import wave -import itertools -from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple -from typing import List - - -try: - import lc3 # type: ignore # pylint: disable=E0401 -except ImportError as e: - raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e - -from bumble.colors import color -from bumble import company_ids -from bumble import core -from bumble import gatt -from bumble import hci -from bumble.profiles import bap -from bumble.profiles import le_audio -from bumble.profiles import pbp -from bumble.profiles import bass -import bumble.device -import bumble.transport -import bumble.utils -from bumble.device import Host - -import auracast_config - - -def modified_on_hci_number_of_completed_packets_event(self, event): - for connection_handle, num_completed_packets in zip( - event.connection_handles, event.num_completed_packets - ): - if connection := self.connections.get(connection_handle): - connection.acl_packet_queue.on_packets_completed(num_completed_packets) - elif connection_handle not in itertools.chain( - self.cis_links.keys(), - self.sco_links.keys(), - itertools.chain.from_iterable(self.bigs.values()), - ): - logger.warning( - 'received packet completion event for unknown handle ' - f'0x{connection_handle:04X}' - ) - self.emit('hci_number_of_completed_packets_event', event) - -Host.on_hci_number_of_completed_packets_event = modified_on_hci_number_of_completed_packets_event - - - -# ----------------------------------------------------------------------------- -# Logging -# ----------------------------------------------------------------------------- -logger = logging.getLogger(__name__) - -@contextlib.asynccontextmanager -async def create_device(config: auracast_config.AuracastGlobalConfig) -> AsyncGenerator[bumble.device.Device, Any]: - async with await bumble.transport.open_transport(config.transport) as ( - hci_source, - hci_sink, - ): - device_config = bumble.device.DeviceConfiguration( - name=config.device_name, - address=config.auracast_device_address, - keystore='JsonKeyStore', - ) - - device = bumble.device.Device.from_config_with_hci( - device_config, - hci_source, - hci_sink, - ) - await device.power_on() - - yield device - - -def run_async(async_command: Coroutine) -> None: - try: - asyncio.run(async_command) - except core.ProtocolError as error: - if error.error_namespace == 'att' and error.error_code in list( - bass.ApplicationError - ): - message = bass.ApplicationError(error.error_code).name - else: - message = str(error) - - print( - color('!!! An error occurred while executing the command:', 'red'), message - ) - -async def run_broadcast( - global_config : auracast_config.AuracastGlobalConfig, - big_config: List[auracast_config.AuracastBigConfig] - -) -> None: - async with create_device(global_config) as device: - if not device.supports_le_periodic_advertising: - logger.error(color('Periodic advertising not supported', 'red')) - return - - with wave.open(big_config[0].audio_source, 'rb') as wav: - logger.info('Encoding wav file into lc3...') - logger.info('Frame rate of .wav file is: %s', wav.getframerate()) - encoder = lc3.Encoder( - frame_duration_us=global_config.frame_duration_us, - sample_rate_hz=global_config.auracast_sampling_rate_hz, - num_channels=1, - input_sample_rate_hz=wav.getframerate(), - ) - frames = list[bytes]() - while pcm := wav.readframes(encoder.get_frame_samples()): - frames.append( - encoder.encode(pcm, num_bytes=global_config.octets_per_frame, bit_depth=wav.getsampwidth() * 8) - ) - del encoder - print('Encoding complete.') - - # Config advertising set - bap_sampling_freq = getattr(bap.SamplingFrequency, f"FREQ_{global_config.auracast_sampling_rate_hz}") - basic_audio_announcement = bap.BasicAudioAnnouncement( - presentation_delay=global_config.presentation_delay_us, - subgroups=[ - bap.BasicAudioAnnouncement.Subgroup( - codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3), - codec_specific_configuration=bap.CodecSpecificConfiguration( - sampling_frequency=bap_sampling_freq, - frame_duration=bap.FrameDuration.DURATION_10000_US, - octets_per_codec_frame=global_config.octets_per_frame, - ), - metadata=le_audio.Metadata( - [ - le_audio.Metadata.Entry( - tag=le_audio.Metadata.Tag.LANGUAGE, data=big_config[0].language.encode() - ), - le_audio.Metadata.Entry( - tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=big_config[0].program_info.encode() - ), - ] - ), - bis=[ - bap.BasicAudioAnnouncement.BIS( - index=1, - codec_specific_configuration=bap.CodecSpecificConfiguration( - audio_channel_allocation=bap.AudioLocation.FRONT_LEFT - ), - ), - ], - ) - ], - ) - logging.info('Setup Advertising') - broadcast_audio_announcement = bap.BroadcastAudioAnnouncement(big_config[0].id) - advertising_set0 = await device.create_advertising_set( - advertising_parameters=bumble.device.AdvertisingParameters( - advertising_event_properties=bumble.device.AdvertisingEventProperties( - is_connectable=False - ), - primary_advertising_interval_min=100, - primary_advertising_interval_max=200, - advertising_sid=0 - # TODO: use 2mbit phy - ), - advertising_data=( - broadcast_audio_announcement.get_advertising_data() - + bytes( - core.AdvertisingData( - [(core.AdvertisingData.BROADCAST_NAME, big_config[0].name.encode())] - ) - ) - ), - periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters( - periodic_advertising_interval_min=80, - periodic_advertising_interval_max=160, - ), - periodic_advertising_data=basic_audio_announcement.get_advertising_data(), - auto_restart=True, - auto_start=True, - ) - - logging.info('Start Periodic Advertising') - await advertising_set0.start_periodic() - - logging.info('Setup BIG') - big0 = await device.create_big( - advertising_set0, - parameters=bumble.device.BigParameters( - num_bis=1, - sdu_interval=global_config.frame_duration_us, - max_sdu=global_config.octets_per_frame, # is this octets per frame ? - max_transport_latency=65, - rtn=4, - broadcast_code=( - bytes.fromhex(big_config[0].code) if big_config[0].code else None - ), - ), - ) - - logging.info('Setup ISO Data Path') - for bis_link in big0.bis_links: - await bis_link.setup_data_path( - direction=bis_link.Direction.HOST_TO_CONTROLLER - ) - - frames_iterator = itertools.cycle(frames) - logging.info("Broadcasting...") - - def on_packet_complete(event): - frame = next(frames_iterator) - big0.bis_links[0].write(frame) - - device.host.on('hci_number_of_completed_packets_event', on_packet_complete) - - on_packet_complete('') # Send the first packet, to get the event loop running - - while True: - await asyncio.sleep(1) - -# ----------------------------------------------------------------------------- -# Main -# ----------------------------------------------------------------------------- - -def broadcast(global_conf: auracast_config.AuracastGlobalConfig, big_conf: List[auracast_config.AuracastBigConfig]): - """Start a broadcast as a source.""" - run_async( - run_broadcast( - global_conf, - big_conf - ) - ) - - - -# ----------------------------------------------------------------------------- -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - - global_conf = auracast_config.global_base_config - - #global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,1000000,rtscts' # transport for nrf52 dongle - - #global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001050076061-if02,1000000,rtscts' # transport for nrf53dk - - global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk - - bigs = [ - auracast_config.broadcast_de - ] - global_conf.octets_per_frame=60# 48kbps@24kHz - - global_conf.auracast_sampling_rate_hz = 32000 - global_conf.octets_per_frame=80# 64kbps@24kHz - - broadcast( - global_conf, - bigs - ) diff --git a/auracast/dualcast.py b/auracast/dualcast.py deleted file mode 100644 index 5cc0f96..0000000 --- a/auracast/dualcast.py +++ /dev/null @@ -1,399 +0,0 @@ -# Copyright 2024 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# ----------------------------------------------------------------------------- -# Imports -# ----------------------------------------------------------------------------- -from __future__ import annotations - -import asyncio -import contextlib -import logging -import wave -import itertools -from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple -from typing import List - - -try: - import lc3 # type: ignore # pylint: disable=E0401 -except ImportError as e: - raise ImportError("Try `python -m pip install \".[lc3]\"`.") from e - -from bumble.colors import color -from bumble import company_ids -from bumble import core -from bumble import gatt -from bumble import hci -from bumble.profiles import bap -from bumble.profiles import le_audio -from bumble.profiles import pbp -from bumble.profiles import bass -import bumble.device -import bumble.transport -import bumble.utils -from bumble.device import Host - -import auracast_config - - -def modified_on_hci_number_of_completed_packets_event(self, event): - for connection_handle, num_completed_packets in zip( - event.connection_handles, event.num_completed_packets - ): - if connection := self.connections.get(connection_handle): - connection.acl_packet_queue.on_packets_completed(num_completed_packets) - elif connection_handle not in itertools.chain( - self.cis_links.keys(), - self.sco_links.keys(), - itertools.chain.from_iterable(self.bigs.values()), - ): - logger.warning( - 'received packet completion event for unknown handle ' - f'0x{connection_handle:04X}' - ) - self.emit('hci_number_of_completed_packets_event', event) - -Host.on_hci_number_of_completed_packets_event = modified_on_hci_number_of_completed_packets_event - - - -# ----------------------------------------------------------------------------- -# Logging -# ----------------------------------------------------------------------------- -logger = logging.getLogger(__name__) - -@contextlib.asynccontextmanager -async def create_device(config: auracast_config.AuracastGlobalConfig) -> AsyncGenerator[bumble.device.Device, Any]: - async with await bumble.transport.open_transport(config.transport) as ( - hci_source, - hci_sink, - ): - device_config = bumble.device.DeviceConfiguration( - name=config.device_name, - address=config.auracast_device_address, - keystore='JsonKeyStore', - #le_simultaneous_enabled=True - ) - - device = bumble.device.Device.from_config_with_hci( - device_config, - hci_source, - hci_sink, - ) - await device.power_on() - - yield device - - -def run_async(async_command: Coroutine) -> None: - try: - asyncio.run(async_command) - except core.ProtocolError as error: - if error.error_namespace == 'att' and error.error_code in list( - bass.ApplicationError - ): - message = bass.ApplicationError(error.error_code).name - else: - message = str(error) - - print( - color('!!! An error occurred while executing the command:', 'red'), message - ) - -handle0 = None -handle1 = None - -async def run_broadcast( - global_config : auracast_config.AuracastGlobalConfig, - big_config: List[auracast_config.AuracastBigConfig] - -) -> None: - async with create_device(global_config) as device: - if not device.supports_le_periodic_advertising: - logger.error(color('Periodic advertising not supported', 'red')) - return - - - with wave.open(big_config[0].audio_source, 'rb') as wav: - logger.info('Encoding wav file into lc3...') - logger.info('Frame rate of .wav file is: %s', wav.getframerate()) - encoder = lc3.Encoder( - frame_duration_us=global_config.frame_duration_us, - sample_rate_hz=global_config.auracast_sampling_rate_hz, - num_channels=1, - input_sample_rate_hz=wav.getframerate(), - ) - frames = list[bytes]() - while pcm := wav.readframes(encoder.get_frame_samples()): - frames.append( - encoder.encode(pcm, num_bytes=global_config.octets_per_frame, bit_depth=wav.getsampwidth() * 8) - ) - del encoder - print('Encoding complete.') - - # Config advertising set - bap_sampling_freq = getattr(bap.SamplingFrequency, f"FREQ_{global_config.auracast_sampling_rate_hz}") - basic_audio_announcement0 = bap.BasicAudioAnnouncement( - presentation_delay=global_config.presentation_delay_us, - subgroups=[ - bap.BasicAudioAnnouncement.Subgroup( - codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3), - codec_specific_configuration=bap.CodecSpecificConfiguration( - sampling_frequency=bap_sampling_freq, - frame_duration=bap.FrameDuration.DURATION_10000_US, - octets_per_codec_frame=global_config.octets_per_frame, - ), - metadata=le_audio.Metadata( - [ - le_audio.Metadata.Entry( - tag=le_audio.Metadata.Tag.LANGUAGE, data=big_config[0].language.encode() - ), - le_audio.Metadata.Entry( - tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=big_config[0].program_info.encode() - ), - le_audio.Metadata.Entry( - tag=le_audio.Metadata.Tag.BROADCAST_NAME, data=b'Broadcast0' - ), - ] - ), - bis=[ - bap.BasicAudioAnnouncement.BIS( - index=1, - codec_specific_configuration=bap.CodecSpecificConfiguration( - audio_channel_allocation=bap.AudioLocation.FRONT_LEFT - ), - ), - ], - ) - ], - ) - basic_audio_announcement1 = bap.BasicAudioAnnouncement( - presentation_delay=global_config.presentation_delay_us, - subgroups=[ - bap.BasicAudioAnnouncement.Subgroup( - codec_id=hci.CodingFormat(codec_id=hci.CodecID.LC3), - codec_specific_configuration=bap.CodecSpecificConfiguration( - sampling_frequency=bap_sampling_freq, - frame_duration=bap.FrameDuration.DURATION_10000_US, - octets_per_codec_frame=global_config.octets_per_frame, - ), - metadata=le_audio.Metadata( - [ - le_audio.Metadata.Entry( - tag=le_audio.Metadata.Tag.LANGUAGE, data=big_config[0].language.encode() - ), - le_audio.Metadata.Entry( - tag=le_audio.Metadata.Tag.PROGRAM_INFO, data=big_config[0].program_info.encode() - ), - le_audio.Metadata.Entry( - tag=le_audio.Metadata.Tag.BROADCAST_NAME, data=b'Broadcast1' - ), - ] - ), - bis=[ - bap.BasicAudioAnnouncement.BIS( - index=1, - codec_specific_configuration=bap.CodecSpecificConfiguration( - audio_channel_allocation=bap.AudioLocation.FRONT_LEFT - ), - ), - ], - ) - ], - ) - - - logging.info('Setup Advertising') - broadcast_audio_announcement0 = bap.BroadcastAudioAnnouncement(12) - advertising_set0 = await device.create_advertising_set( - random_address=hci.Address('F1:F1:F2:F3:F4:F5'), - advertising_parameters=bumble.device.AdvertisingParameters( - advertising_event_properties=bumble.device.AdvertisingEventProperties( - is_connectable=False - ), - primary_advertising_interval_min=100, - primary_advertising_interval_max=200, - advertising_sid=0, - #primary_advertising_phy=hci.HCI_LE_2M_PHY, - #secondary_advertising_phy=hci.HCI_LE_2M_PHY, - # TODO: use 2mbit phy - ), - advertising_data=( - broadcast_audio_announcement0.get_advertising_data() - + bytes( - core.AdvertisingData( - [(core.AdvertisingData.BROADCAST_NAME, b"Broadcast0")] - ) - ) - ), - periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters( - periodic_advertising_interval_min=80, - periodic_advertising_interval_max=160, - ), - periodic_advertising_data=basic_audio_announcement0.get_advertising_data(), - auto_restart=True, - auto_start=True, - ) - - broadcast_audio_announcement1 = bap.BroadcastAudioAnnouncement(13) - advertising_set1 = await device.create_advertising_set( - random_address=hci.Address('F2:F1:F2:F3:F4:F5'), - advertising_parameters=bumble.device.AdvertisingParameters( - advertising_event_properties=bumble.device.AdvertisingEventProperties( - is_connectable=False - ), - primary_advertising_interval_min=100, - primary_advertising_interval_max=200, - advertising_sid=1, - #primary_advertising_phy=hci.HCI_LE_2M_PHY, - #secondary_advertising_phy=hci.HCI_LE_2M_PHY, - # TODO: use 2mbit phy - ), - advertising_data=( - broadcast_audio_announcement1.get_advertising_data() - + bytes( - core.AdvertisingData( - [(core.AdvertisingData.BROADCAST_NAME, b"Broadcast1")] - ) - ) - ), - periodic_advertising_parameters=bumble.device.PeriodicAdvertisingParameters( - periodic_advertising_interval_min=80, - periodic_advertising_interval_max=160, - ), - periodic_advertising_data=basic_audio_announcement1.get_advertising_data(), - auto_restart=True, - auto_start=True, - ) - - - logging.info('Start Periodic Advertising') - await advertising_set0.start_periodic() - await advertising_set1.start_periodic() - - - logging.info('Setup BIG') - big0 = await device.create_big( # is this actually extended advertising ? - advertising_set0, - parameters=bumble.device.BigParameters( - num_bis=1, - sdu_interval=global_config.frame_duration_us, - max_sdu=global_config.octets_per_frame, # is this octets per frame ? - max_transport_latency=65, - rtn=4, - broadcast_code=( - bytes.fromhex(big_config[0].code) if big_config[0].code else None - ), - ), - ) - - big1 = await device.create_big( # is this actually extended advertising ? - advertising_set1, - parameters=bumble.device.BigParameters( - num_bis=1, - sdu_interval=global_config.frame_duration_us, - max_sdu=global_config.octets_per_frame, # is this octets per frame ? - max_transport_latency=65, - rtn=4, - broadcast_code=( - bytes.fromhex(big_config[0].code) if big_config[0].code else None - ), - ), - ) - - logging.info('Setup ISO Data Path') - for bis_link in big0.bis_links: - await bis_link.setup_data_path( - direction=bis_link.Direction.HOST_TO_CONTROLLER - ) - - for bis_link in big1.bis_links: - await bis_link.setup_data_path( - direction=bis_link.Direction.HOST_TO_CONTROLLER - ) - - frames_iterator0 = itertools.cycle(frames) - frames_iterator1 = itertools.cycle(frames) - frame0 = next(frames_iterator0) - frame1 = next(frames_iterator1) - - # Get the corresponding handles first - logging.info("Determine broadcast handles") - - # Define on packet complete function to get the handle for each broadcast - handle0 = big0.bis_links[0].handle - handle1 = big1.bis_links[0].handle - - logging.info("Broadcasting...") - - def on_packet_complete(event): - - event_handle = event.connection_handles[0] - if event_handle == handle0: - frame0 = next(frames_iterator0) - big0.bis_links[0].write(frame0) - elif event_handle == handle1: - frame1 = next(frames_iterator1) - big1.bis_links[0].write(frame1) - else: - raise NotImplementedError('Unkown connection handle') - - device.host.on('hci_number_of_completed_packets_event', on_packet_complete) - # Send the first packets, to get the event loop running - big0.bis_links[0].write(frame0) - big1.bis_links[0].write(frame1) - - while True: - await asyncio.sleep(1) - -# ----------------------------------------------------------------------------- -# Main -# ----------------------------------------------------------------------------- - -def broadcast(global_conf: auracast_config.AuracastGlobalConfig, big_conf: List[auracast_config.AuracastBigConfig]): - """Start a broadcast as a source.""" - run_async( - run_broadcast( - global_conf, - big_conf - ) - ) - - - -# ----------------------------------------------------------------------------- -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - - global_conf = auracast_config.global_base_config - - #global_conf.transport='serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_81BD14B8D71B5662-if00,1000000,rtscts' # transport for nrf52 dongle - - #global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001050076061-if02,1000000,rtscts' # transport for nrf53dk - - global_conf.transport='serial:/dev/serial/by-id/usb-SEGGER_J-Link_001057705357-if02,1000000,rtscts' # transport for nrf54l15dk - - bigs = [ - auracast_config.broadcast_de - ] - #global_conf.auracast_sampling_rate_khz=16000 - #global_conf.octets_per_frame=40 # 16kbps@8kHz - - global_conf.octets_per_frame=60# 48kbps@24kHz - - broadcast( - global_conf, - bigs - )