diff --git a/apps/bap_broadcast_source.py b/apps/bap_broadcast_source.py index b9b9208..2e4f395 100644 --- a/apps/bap_broadcast_source.py +++ b/apps/bap_broadcast_source.py @@ -23,8 +23,6 @@ import dataclasses import functools import logging import os -import wave -import itertools from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple from scipy import signal import scipy.io.wavfile as wav @@ -64,328 +62,16 @@ AURACAST_DEFAULT_DEVICE_NAME = 'Bumble Auracast' AURACAST_DEFAULT_DEVICE_ADDRESS = hci.Address('F0:F1:F2:F3:F4:F5') AURACAST_DEFAULT_SYNC_TIMEOUT = 5.0 AURACAST_DEFAULT_ATT_MTU = 256 -iso_index:int = 0 +AURACAST_DEFAULT_SAMPLING_FREQUENCY = 48000 +AURACAST_DEFAULT_FRAME_DURATION = 10000 +iso_index: int = 0 -# ----------------------------------------------------------------------------- -# Scan For Broadcasts -# ----------------------------------------------------------------------------- -class BroadcastScanner(pyee.EventEmitter): - @dataclasses.dataclass - class Broadcast(pyee.EventEmitter): - name: str | None - sync: bumble.device.PeriodicAdvertisingSync - broadcast_id: int - rssi: int = 0 - public_broadcast_announcement: Optional[pbp.PublicBroadcastAnnouncement] = None - broadcast_audio_announcement: Optional[bap.BroadcastAudioAnnouncement] = None - basic_audio_announcement: Optional[bap.BasicAudioAnnouncement] = None - appearance: Optional[core.Appearance] = None - biginfo: Optional[bumble.device.BIGInfoAdvertisement] = None - manufacturer_data: Optional[Tuple[str, bytes]] = None - - def __post_init__(self) -> None: - super().__init__() - self.sync.on('establishment', self.on_sync_establishment) - self.sync.on('loss', self.on_sync_loss) - self.sync.on('periodic_advertisement', self.on_periodic_advertisement) - self.sync.on('biginfo_advertisement', self.on_biginfo_advertisement) - - def update(self, advertisement: bumble.device.Advertisement) -> None: - self.rssi = advertisement.rssi - for service_data in advertisement.data.get_all( - core.AdvertisingData.SERVICE_DATA - ): - assert isinstance(service_data, tuple) - service_uuid, data = service_data - assert isinstance(data, bytes) - - if service_uuid == gatt.GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE: - self.public_broadcast_announcement = ( - pbp.PublicBroadcastAnnouncement.from_bytes(data) - ) - continue - - if service_uuid == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE: - self.broadcast_audio_announcement = ( - bap.BroadcastAudioAnnouncement.from_bytes(data) - ) - continue - - self.appearance = advertisement.data.get( # type: ignore[assignment] - core.AdvertisingData.APPEARANCE - ) - - if manufacturer_data := advertisement.data.get( - core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA - ): - assert isinstance(manufacturer_data, tuple) - company_id = cast(int, manufacturer_data[0]) - data = cast(bytes, manufacturer_data[1]) - self.manufacturer_data = ( - company_ids.COMPANY_IDENTIFIERS.get( - company_id, f'0x{company_id:04X}' - ), - data, - ) - - self.emit('update') - - def print(self) -> None: - print( - color('Broadcast:', 'yellow'), - self.sync.advertiser_address, - color(self.sync.state.name, 'green'), - ) - if self.name is not None: - print(f' {color("Name", "cyan")}: {self.name}') - if self.appearance: - print(f' {color("Appearance", "cyan")}: {str(self.appearance)}') - print(f' {color("RSSI", "cyan")}: {self.rssi}') - print(f' {color("SID", "cyan")}: {self.sync.sid}') - - if self.manufacturer_data: - print( - f' {color("Manufacturer Data", "cyan")}: ' - f'{self.manufacturer_data[0]} -> {self.manufacturer_data[1].hex()}' - ) - - if self.broadcast_audio_announcement: - print( - f' {color("Broadcast ID", "cyan")}: ' - f'{self.broadcast_audio_announcement.broadcast_id}' - ) - - if self.public_broadcast_announcement: - print( - f' {color("Features", "cyan")}: ' - f'{self.public_broadcast_announcement.features}' - ) - print( - f' {color("Metadata", "cyan")}: ' - f'{self.public_broadcast_announcement.metadata}' - ) - - if self.basic_audio_announcement: - print(color(' Audio:', 'cyan')) - print( - color(' Presentation Delay:', 'magenta'), - self.basic_audio_announcement.presentation_delay, - ) - for subgroup in self.basic_audio_announcement.subgroups: - print(color(' Subgroup:', 'magenta')) - print(color(' Codec ID:', 'yellow')) - print( - color(' Coding Format: ', 'green'), - subgroup.codec_id.codec_id.name, - ) - print( - color(' Company ID: ', 'green'), - subgroup.codec_id.company_id, - ) - print( - color(' Vendor Specific Codec ID:', 'green'), - subgroup.codec_id.vendor_specific_codec_id, - ) - print( - color(' Codec Config:', 'yellow'), - subgroup.codec_specific_configuration, - ) - print(color(' Metadata: ', 'yellow'), subgroup.metadata) - - for bis in subgroup.bis: - print(color(f' BIS [{bis.index}]:', 'yellow')) - print( - color(' Codec Config:', 'green'), - bis.codec_specific_configuration, - ) - - if self.biginfo: - print(color(' BIG:', 'cyan')) - print( - color(' Number of BIS:', 'magenta'), - self.biginfo.num_bis, - ) - print( - color(' PHY: ', 'magenta'), - self.biginfo.phy.name, - ) - print( - color(' Framed: ', 'magenta'), - self.biginfo.framed, - ) - print( - color(' Encrypted: ', 'magenta'), - self.biginfo.encrypted, - ) - - def on_sync_establishment(self) -> None: - self.emit('sync_establishment') - - def on_sync_loss(self) -> None: - self.basic_audio_announcement = None - self.biginfo = None - self.emit('sync_loss') - - def on_periodic_advertisement( - self, advertisement: bumble.device.PeriodicAdvertisement - ) -> None: - if advertisement.data is None: - return - - for service_data in advertisement.data.get_all( - core.AdvertisingData.SERVICE_DATA - ): - assert isinstance(service_data, tuple) - service_uuid, data = service_data - assert isinstance(data, bytes) - - if service_uuid == gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE: - self.basic_audio_announcement = ( - bap.BasicAudioAnnouncement.from_bytes(data) - ) - break - - self.emit('change') - - def on_biginfo_advertisement( - self, advertisement: bumble.device.BIGInfoAdvertisement - ) -> None: - self.biginfo = advertisement - self.emit('change') - - def __init__( - self, - device: bumble.device.Device, - filter_duplicates: bool, - sync_timeout: float, - ): - super().__init__() - self.device = device - self.filter_duplicates = filter_duplicates - self.sync_timeout = sync_timeout - self.broadcasts = dict[hci.Address, BroadcastScanner.Broadcast]() - device.on('advertisement', self.on_advertisement) - - async def start(self) -> None: - await self.device.start_scanning( - active=False, - filter_duplicates=False, - ) - - async def stop(self) -> None: - await self.device.stop_scanning() - - def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None: - if not ( - ads := advertisement.data.get_all( - core.AdvertisingData.SERVICE_DATA_16_BIT_UUID - ) - ) or not ( - broadcast_audio_announcement := next( - ( - ad - for ad in ads - if isinstance(ad, tuple) - and ad[0] == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE - ), - None, - ) - ): - return - - broadcast_name = advertisement.data.get(core.AdvertisingData.BROADCAST_NAME) - assert isinstance(broadcast_name, str) or broadcast_name is None - assert isinstance(broadcast_audio_announcement[1], bytes) - - if broadcast := self.broadcasts.get(advertisement.address): - broadcast.update(advertisement) - return - - bumble.utils.AsyncRunner.spawn( - self.on_new_broadcast( - broadcast_name, - advertisement, - bap.BroadcastAudioAnnouncement.from_bytes( - broadcast_audio_announcement[1] - ).broadcast_id, - ) - ) - - async def on_new_broadcast( - self, - name: str | None, - advertisement: bumble.device.Advertisement, - broadcast_id: int, - ) -> None: - periodic_advertising_sync = await self.device.create_periodic_advertising_sync( - advertiser_address=advertisement.address, - sid=advertisement.sid, - sync_timeout=self.sync_timeout, - filter_duplicates=self.filter_duplicates, - ) - broadcast = self.Broadcast(name, periodic_advertising_sync, broadcast_id) - broadcast.update(advertisement) - self.broadcasts[advertisement.address] = broadcast - periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast)) - self.emit('new_broadcast', broadcast) - - def on_broadcast_loss(self, broadcast: Broadcast) -> None: - del self.broadcasts[broadcast.sync.advertiser_address] - bumble.utils.AsyncRunner.spawn(broadcast.sync.terminate()) - self.emit('broadcast_loss', broadcast) - - -class PrintingBroadcastScanner(pyee.EventEmitter): - def __init__( - self, device: bumble.device.Device, filter_duplicates: bool, sync_timeout: float - ) -> None: - super().__init__() - self.scanner = BroadcastScanner(device, filter_duplicates, sync_timeout) - self.scanner.on('new_broadcast', self.on_new_broadcast) - self.scanner.on('broadcast_loss', self.on_broadcast_loss) - self.scanner.on('update', self.refresh) - self.status_message = '' - - async def start(self) -> None: - self.status_message = color('Scanning...', 'green') - await self.scanner.start() - - def on_new_broadcast(self, broadcast: BroadcastScanner.Broadcast) -> None: - self.status_message = color( - f'+Found {len(self.scanner.broadcasts)} broadcasts', 'green' - ) - broadcast.on('change', self.refresh) - broadcast.on('update', self.refresh) - self.refresh() - - def on_broadcast_loss(self, broadcast: BroadcastScanner.Broadcast) -> None: - self.status_message = color( - f'-Found {len(self.scanner.broadcasts)} broadcasts', 'green' - ) - self.refresh() - - def refresh(self) -> None: - # Clear the screen from the top - print('\033[H') - print('\033[0J') - print('\033[H') - - # Print the status message - print(self.status_message) - print("==========================================") - - # Print all broadcasts - for broadcast in self.scanner.broadcasts.values(): - broadcast.print() - print('------------------------------------------') - - # Clear the screen to the bottom - print('\033[0J') @contextlib.asynccontextmanager async def create_device(transport: str) -> AsyncGenerator[bumble.device.Device, Any]: + transport = "serial:" +transport async with await bumble.transport.open_transport(transport) as ( hci_source, hci_sink, @@ -395,7 +81,8 @@ async def create_device(transport: str) -> AsyncGenerator[bumble.device.Device, address=AURACAST_DEFAULT_DEVICE_ADDRESS, keystore='JsonKeyStore', ) - + + device = bumble.device.Device.from_config_with_hci( device_config, hci_source, @@ -406,329 +93,12 @@ async def create_device(transport: str) -> AsyncGenerator[bumble.device.Device, yield device -async def find_broadcast_by_name( - device: bumble.device.Device, name: Optional[str] -) -> BroadcastScanner.Broadcast: - result = asyncio.get_running_loop().create_future() - - def on_broadcast_change(broadcast: BroadcastScanner.Broadcast) -> None: - if broadcast.basic_audio_announcement and not result.done(): - print(color('Broadcast basic audio announcement received', 'green')) - result.set_result(broadcast) - - def on_new_broadcast(broadcast: BroadcastScanner.Broadcast) -> None: - if name is None or broadcast.name == name: - print(color('Broadcast found:', 'green'), broadcast.name) - broadcast.on('change', lambda: on_broadcast_change(broadcast)) - return - - print(color(f'Skipping broadcast {broadcast.name}')) - - scanner = BroadcastScanner(device, False, AURACAST_DEFAULT_SYNC_TIMEOUT) - scanner.on('new_broadcast', on_new_broadcast) - await scanner.start() - - broadcast = await result - await scanner.stop() - - return broadcast - - -async def run_scan( - filter_duplicates: bool, sync_timeout: float, transport: str -) -> None: - async with create_device(transport) as device: - if not device.supports_le_periodic_advertising: - print(color('Periodic advertising not supported', 'red')) - return - - scanner = PrintingBroadcastScanner(device, filter_duplicates, sync_timeout) - await scanner.start() - await asyncio.get_running_loop().create_future() - - -async def run_assist( - broadcast_name: Optional[str], - source_id: Optional[int], - command: str, - transport: str, - address: str, -) -> None: - async with create_device(transport) as device: - if not device.supports_le_periodic_advertising: - print(color('Periodic advertising not supported', 'red')) - return - - # Connect to the server - print(f'=== Connecting to {address}...') - connection = await device.connect(address) - peer = bumble.device.Peer(connection) - print(f'=== Connected to {peer}') - - print("+++ Encrypting connection...") - await peer.connection.encrypt() - print("+++ Connection encrypted") - - # Request a larger MTU - mtu = AURACAST_DEFAULT_ATT_MTU - print(color(f'$$$ Requesting MTU={mtu}', 'yellow')) - await peer.request_mtu(mtu) - - # Get the BASS service - bass_client = await peer.discover_service_and_create_proxy( - bass.BroadcastAudioScanServiceProxy - ) - - # Check that the service was found - if not bass_client: - print(color('!!! Broadcast Audio Scan Service not found', 'red')) - return - - # Subscribe to and read the broadcast receive state characteristics - for i, broadcast_receive_state in enumerate( - bass_client.broadcast_receive_states - ): - try: - await broadcast_receive_state.subscribe( - lambda value, i=i: print( - f"{color(f'Broadcast Receive State Update [{i}]:', 'green')} {value}" - ) - ) - except core.ProtocolError as error: - print( - color( - f'!!! Failed to subscribe to Broadcast Receive State characteristic:', - 'red', - ), - error, - ) - value = await broadcast_receive_state.read_value() - print( - f'{color(f"Initial Broadcast Receive State [{i}]:", "green")} {value}' - ) - - if command == 'monitor-state': - await peer.sustain() - return - - if command == 'add-source': - # Find the requested broadcast - await bass_client.remote_scan_started() - if broadcast_name: - print(color('Scanning for broadcast:', 'cyan'), broadcast_name) - else: - print(color('Scanning for any broadcast', 'cyan')) - broadcast = await find_broadcast_by_name(device, broadcast_name) - - if broadcast.broadcast_audio_announcement is None: - print(color('No broadcast audio announcement found', 'red')) - return - - if ( - broadcast.basic_audio_announcement is None - or not broadcast.basic_audio_announcement.subgroups - ): - print(color('No subgroups found', 'red')) - return - - # Add the source - print(color('Adding source:', 'blue'), broadcast.sync.advertiser_address) - await bass_client.add_source( - broadcast.sync.advertiser_address, - broadcast.sync.sid, - broadcast.broadcast_audio_announcement.broadcast_id, - bass.PeriodicAdvertisingSyncParams.SYNCHRONIZE_TO_PA_PAST_AVAILABLE, - 0xFFFF, - [ - bass.SubgroupInfo( - bass.SubgroupInfo.ANY_BIS, - bytes(broadcast.basic_audio_announcement.subgroups[0].metadata), - ) - ], - ) - - # Initiate a PA Sync Transfer - await broadcast.sync.transfer(peer.connection) - - # Notify the sink that we're done scanning. - await bass_client.remote_scan_stopped() - - await peer.sustain() - return - - if command == 'modify-source': - if source_id is None: - print(color('!!! modify-source requires --source-id')) - return - - # Find the requested broadcast - await bass_client.remote_scan_started() - if broadcast_name: - print(color('Scanning for broadcast:', 'cyan'), broadcast_name) - else: - print(color('Scanning for any broadcast', 'cyan')) - broadcast = await find_broadcast_by_name(device, broadcast_name) - - if broadcast.broadcast_audio_announcement is None: - print(color('No broadcast audio announcement found', 'red')) - return - - if ( - broadcast.basic_audio_announcement is None - or not broadcast.basic_audio_announcement.subgroups - ): - print(color('No subgroups found', 'red')) - return - - # Modify the source - print( - color('Modifying source:', 'blue'), - source_id, - ) - await bass_client.modify_source( - source_id, - bass.PeriodicAdvertisingSyncParams.SYNCHRONIZE_TO_PA_PAST_NOT_AVAILABLE, - 0xFFFF, - [ - bass.SubgroupInfo( - bass.SubgroupInfo.ANY_BIS, - bytes(broadcast.basic_audio_announcement.subgroups[0].metadata), - ) - ], - ) - await peer.sustain() - return - - if command == 'remove-source': - if source_id is None: - print(color('!!! remove-source requires --source-id')) - return - - # Remove the source - print(color('Removing source:', 'blue'), source_id) - await bass_client.remove_source(source_id) - await peer.sustain() - return - - print(color(f'!!! invalid command {command}')) - - -async def run_pair(transport: str, address: str) -> None: - async with create_device(transport) as device: - - # Connect to the server - print(f'=== Connecting to {address}...') - async with device.connect_as_gatt(address) as peer: - print(f'=== Connected to {peer}') - - print("+++ Initiating pairing...") - await peer.connection.pair() - print("+++ Paired") - - -async def run_receive( - transport: str, - broadcast_id: int, - broadcast_code: str | None, - sync_timeout: float, - subgroup_index: int, -) -> None: - async with create_device(transport) as device: - if not device.supports_le_periodic_advertising: - print(color('Periodic advertising not supported', 'red')) - return - - scanner = BroadcastScanner(device, False, sync_timeout) - scan_result: asyncio.Future[BroadcastScanner.Broadcast] = ( - asyncio.get_running_loop().create_future() - ) - - def on_new_broadcast(broadcast: BroadcastScanner.Broadcast) -> None: - if scan_result.done(): - return - if broadcast.broadcast_id == broadcast_id: - scan_result.set_result(broadcast) - - scanner.on('new_broadcast', on_new_broadcast) - await scanner.start() - print('Start scanning...') - broadcast = await scan_result - print('Advertisement found:') - broadcast.print() - basic_audio_announcement_scanned = asyncio.Event() - - def on_change() -> None: - if ( - broadcast.basic_audio_announcement - and not basic_audio_announcement_scanned.is_set() - ): - basic_audio_announcement_scanned.set() - - broadcast.on('change', on_change) - if not broadcast.basic_audio_announcement: - print('Wait for Basic Audio Announcement...') - await basic_audio_announcement_scanned.wait() - print('Basic Audio Announcement found') - broadcast.print() - print('Stop scanning') - await scanner.stop() - print('Start sync to BIG') - - assert broadcast.basic_audio_announcement - subgroup = broadcast.basic_audio_announcement.subgroups[subgroup_index] - configuration = subgroup.codec_specific_configuration - assert configuration - assert (sampling_frequency := configuration.sampling_frequency) - assert (frame_duration := configuration.frame_duration) - - big_sync = await device.create_big_sync( - broadcast.sync, - bumble.device.BigSyncParameters( - big_sync_timeout=0x4000, - bis=[bis.index for bis in subgroup.bis], - broadcast_code=( - bytes.fromhex(broadcast_code) if broadcast_code else None - ), - ), - ) - num_bis = len(big_sync.bis_links) - decoder = lc3.Decoder( - frame_duration_us=frame_duration.us, - sample_rate_hz=sampling_frequency.hz, - num_channels=num_bis, - ) - sdus = [b''] * num_bis - subprocess = await asyncio.create_subprocess_shell( - f'stdbuf -i0 ffplay -ar {sampling_frequency.hz} -ac {num_bis} -f f32le pipe:0', - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - for i, bis_link in enumerate(big_sync.bis_links): - print(f'Setup ISO for BIS {bis_link.handle}') - - def sink(index: int, packet: hci.HCI_IsoDataPacket): - nonlocal sdus - sdus[index] = packet.iso_sdu_fragment - if all(sdus) and subprocess.stdin: - subprocess.stdin.write(decoder.decode(b''.join(sdus)).tobytes()) - sdus = [b''] * num_bis - - bis_link.sink = functools.partial(sink, i) - await bis_link.setup_data_path( - direction=bis_link.Direction.CONTROLLER_TO_HOST - ) - - terminated = asyncio.Event() - big_sync.on(big_sync.Event.TERMINATION, lambda _: terminated.set()) - await terminated.wait() - - async def run_broadcast( - transport: str, broadcast_id: int, broadcast_code: str | None, wav_file_path: str + transport: str, broadcast_id: int, broadcast_code: str | None, wav_file_path: str | None ) -> None: - - TEST_SINE = 0 + + TEST_SINE = 1 if wav_file_path is None else 0 + print(f'Test Sine: {TEST_SINE}') encoder = LeAudioEncoder() async with create_device(transport) as device: if not device.supports_le_periodic_advertising: @@ -739,27 +109,29 @@ async def run_broadcast( f = open("log.btsnoop", "wb") Snooper = BtSnooper(f) device.host.snooper = Snooper - encoder.setup_encoders(48000,10000,1) - frames = list[bytes]() - sine = generate_sine_data(1000,48000,0.01) - print(len(sine)) - - sample_size = 480 - print(wav_file_path) - upsampled_left_channel = read_wav_file(wav_file_path,48000) - num_runs = len(upsampled_left_channel) // sample_size - TEST_SINE = 0 - for i in range(num_runs): - - if TEST_SINE == 0: - pcm = upsampled_left_channel[i * - sample_size:i*sample_size+sample_size] - iso = encoder.encode(100,1,1,bytes(pcm)) - else: - iso = encoder.encode(100,1,1,sine) - - frames.append(iso) + # setup Lc3 encoder + encoder.setup_encoders(48000, 10000, 1) + + frames = list[bytes]() + + if TEST_SINE == 1: + data_to_encode = generate_sine_data(1000, 48000, 0.01) + num_runs = 2000 + else: + sample_size = 480 + data_to_encode = read_wav_file(wav_file_path, 48000) + num_runs = len(data_to_encode) // sample_size + + for i in range(num_runs): + if TEST_SINE == 0: + pcm = data_to_encode[i * sample_size:i*sample_size+sample_size] + iso = encoder.encode(100, 1, 1, bytes(pcm)) + else: + iso = encoder.encode(100, 1, 1, data_to_encode) + + frames.append(iso) + del encoder print('Encoding complete.') @@ -794,7 +166,8 @@ async def run_broadcast( ) ], ) - broadcast_audio_announcement = bap.BroadcastAudioAnnouncement(broadcast_id) + broadcast_audio_announcement = bap.BroadcastAudioAnnouncement( + broadcast_id) print('Start Advertising') advertising_set = await device.create_advertising_set( advertising_parameters=bumble.device.AdvertisingParameters( @@ -837,7 +210,6 @@ async def run_broadcast( ), ) print('Setup ISO Data Path') - for bis_link in big.bis_links: await bis_link.setup_data_path( direction=bis_link.Direction.HOST_TO_CONTROLLER @@ -845,20 +217,18 @@ async def run_broadcast( dummy_frame = bytearray(100) big.bis_links[0].write(dummy_frame) - #big.bis_links[1].write(dummy_frame) + # big.bis_links[1].write(dummy_frame) def on_iso_pdu_sent(event): global iso_index - print('ISO PDU sent') big.bis_links[0].write(frames[iso_index]) - iso_index +=1 - if iso_index == len(frames): + iso_index += 1 + if iso_index == len(frames): iso_index = 0 - - device.host.on('packet_complete',on_iso_pdu_sent) - print('Start sending frames') + device.host.on('packet_complete', on_iso_pdu_sent) + print('Start sending frames ...') while True: await asyncio.sleep(1) @@ -884,8 +254,16 @@ def run_async(async_command: Coroutine) -> None: # ----------------------------------------------------------------------------- @click.command() -@click.option('transport','-p', type=str) -@click.option('wav_file_path','-w' , type=str) +@click.argument( + 'port', + metavar='com port', + type=str, +) +@click.option( + '--wav', + metavar='WAV_FILE_PATH', + type=str, + help='Wav file path',) @click.option( '--broadcast_id', metavar='BROADCAST_ID', @@ -899,23 +277,23 @@ def run_async(async_command: Coroutine) -> None: type=str, help='Broadcast encryption code in hex format', ) - -def broadcast(transport, broadcast_id, broadcast_code, wav_file_path): +def broadcast(port, broadcast_id, broadcast_code, wav): """Start a broadcast as a source.""" - #ctx.ensure_object(dict) + # ctx.ensure_object(dict) run_async( run_broadcast( - transport=transport, + transport=port, broadcast_id=broadcast_id, broadcast_code=broadcast_code, - wav_file_path=wav_file_path, + wav_file_path=wav, ) ) def main(): - - logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'ERROR').upper()) + + logging.basicConfig(level=os.environ.get( + 'BUMBLE_LOGLEVEL', 'ERROR').upper()) broadcast() @@ -929,4 +307,3 @@ if __name__ == "__main__": # SCAN 2 OFF 2 # open F0F1F2F3F4F5 BROAD 2 0 1e40 # MUSIC 91 PLAY - diff --git a/utils/le_audio_encoder.py b/utils/le_audio_encoder.py deleted file mode 100644 index 602ebfa..0000000 --- a/utils/le_audio_encoder.py +++ /dev/null @@ -1,121 +0,0 @@ - -import wasmtime -import ctypes -from typing import List, cast -import wasmtime.loader -import utils.liblc3 as liblc3 # type: ignore -import enum - -store = wasmtime.loader.store - -_memory = cast(wasmtime.Memory, liblc3.memory) - -STACK_POINTER = _memory.data_len(store) - -_memory.grow(store, 1) - -# Mapping wasmtime memory to linear address - -memory = (ctypes.c_ubyte * _memory.data_len(store)).from_address( - - ctypes.addressof(_memory.data_ptr(store).contents) # type: ignore - -) - - -class Liblc3PcmFormat(enum.IntEnum): - - S16 = 0 - - S24 = 1 - - S24_3LE = 2 - - FLOAT = 3 - - -DEFAULT_PCM_SAMPLE_RATE = 48000 -MAX_DECODER_SIZE = liblc3.lc3_decoder_size(10000, DEFAULT_PCM_SAMPLE_RATE) -MAX_ENCODER_SIZE = liblc3.lc3_encoder_size(10000, DEFAULT_PCM_SAMPLE_RATE) - -DECODER_STACK_POINTER = STACK_POINTER -ENCODER_STACK_POINTER = DECODER_STACK_POINTER + MAX_DECODER_SIZE * 2 -DECODE_BUFFER_STACK_POINTER = ENCODER_STACK_POINTER + MAX_ENCODER_SIZE * 2 -ENCODE_BUFFER_STACK_POINTER = DECODE_BUFFER_STACK_POINTER + 8192 - - -DEFAULT_PCM_FORMAT = Liblc3PcmFormat - -DEFAULT_PCM_BYTES_PER_SAMPLE = 2 - - -class LeAudioEncoder: - - def __init__(self): - self.encoders: List[int] = [] - pass - - def setup_encoders(self, sample_rate: int, frame_duration_us: int, num_channels: int) -> None: - """Setup LE audio encoders - - Args: - sample_rate (int): Sample rate in Hz - frame_duration_us (int): Frame duration in microseconds - num_channels (int): Number of channels - """ - self.encoders[:num_channels] = [ - liblc3.lc3_setup_encoder( - frame_duration_us, - sample_rate, - 0, # Input sample rate - ENCODER_STACK_POINTER + MAX_ENCODER_SIZE * i, - ) - for i in range(num_channels) - ] - - def encode( - self, - sdu_length: int, - num_channels: int, - input_stride: int, - input_data: bytes, - ) -> bytes: - """Encode a LE audio frame - - Args: - sdu_length (int): Length of the SDU - num_channels (int): Number of channels - input_stride (int): Stride of the input data - input_data (bytes): Input data to encode - - Returns: - bytes: Encoded data - """ - if not input_data: - return b"" - - input_buffer_offset = ENCODE_BUFFER_STACK_POINTER - input_buffer_size = len(input_data) - - # Copy into wasm memory - memory[input_buffer_offset : input_buffer_offset + input_buffer_size] = input_data - - output_buffer_offset = input_buffer_offset + input_buffer_size - output_buffer_size = sdu_length - output_frame_size = output_buffer_size // num_channels - - for i in range(num_channels): - result = liblc3.lc3_encode( - self.encoders[i], - 0, - input_buffer_offset + DEFAULT_PCM_BYTES_PER_SAMPLE * i, - input_stride, - output_frame_size, - output_buffer_offset + output_frame_size * i, - ) - - if result != 0: - raise RuntimeError(f"lc3_encode failed, result={result}") - - # Extract encoded data from the output buffer - return bytes(memory[output_buffer_offset : output_buffer_offset + output_buffer_size]) diff --git a/utils/liblc3.wasm b/utils/liblc3.wasm deleted file mode 100644 index e905105..0000000 Binary files a/utils/liblc3.wasm and /dev/null differ