# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- from __future__ import annotations import asyncio import dataclasses import logging import os from typing import cast, Dict, Optional, Tuple import click import pyee from bumble.colors import color import bumble.company_ids import bumble.core import bumble.device import bumble.gatt import bumble.hci import bumble.profiles.bap import bumble.profiles.pbp import bumble.transport import bumble.utils # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- # Constants # ----------------------------------------------------------------------------- AURACAST_DEFAULT_DEVICE_NAME = "Bumble Auracast" AURACAST_DEFAULT_DEVICE_ADDRESS = bumble.hci.Address("F0:F1:F2:F3:F4:F5") # ----------------------------------------------------------------------------- # Discover Broadcasts # ----------------------------------------------------------------------------- class BroadcastDiscoverer: @dataclasses.dataclass class Broadcast(pyee.EventEmitter): name: str sync: bumble.device.PeriodicAdvertisingSync rssi: int = 0 
public_broadcast_announcement: Optional[ bumble.profiles.pbp.PublicBroadcastAnnouncement ] = None broadcast_audio_announcement: Optional[ bumble.profiles.bap.BroadcastAudioAnnouncement ] = None basic_audio_announcement: Optional[ bumble.profiles.bap.BasicAudioAnnouncement ] = None appearance: Optional[bumble.core.Appearance] = None biginfo: Optional[bumble.device.BIGInfoAdvertisement] = None manufacturer_data: Optional[Tuple[str, bytes]] = None def __post_init__(self) -> None: super().__init__() self.sync.on('establishment', self.on_sync_establishment) self.sync.on('loss', self.on_sync_loss) self.sync.on('periodic_advertisement', self.on_periodic_advertisement) self.sync.on('biginfo_advertisement', self.on_biginfo_advertisement) self.establishment_timeout_task = asyncio.create_task( self.wait_for_establishment() ) async def wait_for_establishment(self) -> None: await asyncio.sleep(5.0) if self.sync.state == bumble.device.PeriodicAdvertisingSync.State.PENDING: print( color( '!!! Periodic advertisement sync not established in time, ' 'canceling', 'red', ) ) await self.sync.terminate() def update(self, advertisement: bumble.device.Advertisement) -> None: self.rssi = advertisement.rssi for service_data in advertisement.data.get_all( bumble.core.AdvertisingData.SERVICE_DATA ): assert isinstance(service_data, tuple) service_uuid, data = service_data assert isinstance(data, bytes) if ( service_uuid == bumble.gatt.GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE ): self.public_broadcast_announcement = ( bumble.profiles.pbp.PublicBroadcastAnnouncement.from_bytes(data) ) continue if ( service_uuid == bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE ): self.broadcast_audio_announcement = ( bumble.profiles.bap.BroadcastAudioAnnouncement.from_bytes(data) ) continue self.appearance = advertisement.data.get( # type: ignore[assignment] bumble.core.AdvertisingData.APPEARANCE ) if manufacturer_data := advertisement.data.get( bumble.core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA ): 
assert isinstance(manufacturer_data, tuple) company_id = cast(int, manufacturer_data[0]) data = cast(bytes, manufacturer_data[1]) self.manufacturer_data = ( bumble.company_ids.COMPANY_IDENTIFIERS.get( company_id, f'0x{company_id:04X}' ), data, ) def print(self) -> None: print( color('Broadcast:', 'yellow'), self.sync.advertiser_address, color(self.sync.state.name, 'green'), ) print(f' {color("Name", "cyan")}: {self.name}') if self.appearance: print(f' {color("Appearance", "cyan")}: {str(self.appearance)}') print(f' {color("RSSI", "cyan")}: {self.rssi}') print(f' {color("SID", "cyan")}: {self.sync.sid}') if self.manufacturer_data: print( f' {color("Manufacturer Data", "cyan")}: ' f'{self.manufacturer_data[0]} -> {self.manufacturer_data[1].hex()}' ) if self.broadcast_audio_announcement: print( f' {color("Broadcast ID", "cyan")}: ' f'{self.broadcast_audio_announcement.broadcast_id}' ) if self.public_broadcast_announcement: print( f' {color("Features", "cyan")}: ' f'{self.public_broadcast_announcement.features}' ) print( f' {color("Metadata", "cyan")}: ' f'{self.public_broadcast_announcement.metadata}' ) if self.basic_audio_announcement: print(color(' Audio:', 'cyan')) print( color(' Presentation Delay:', 'magenta'), self.basic_audio_announcement.presentation_delay, ) for subgroup in self.basic_audio_announcement.subgroups: print(color(' Subgroup:', 'magenta')) print(color(' Codec ID:', 'yellow')) print( color(' Coding Format: ', 'green'), subgroup.codec_id.coding_format.name, ) print( color(' Company ID: ', 'green'), subgroup.codec_id.company_id, ) print( color(' Vendor Specific Codec ID:', 'green'), subgroup.codec_id.vendor_specific_codec_id, ) print( color(' Codec Config:', 'yellow'), subgroup.codec_specific_configuration, ) print(color(' Metadata: ', 'yellow'), subgroup.metadata) for bis in subgroup.bis: print(color(f' BIS [{bis.index}]:', 'yellow')) print( color(' Codec Config:', 'green'), bis.codec_specific_configuration, ) if self.biginfo: print(color(' BIG:', 
'cyan')) print( color(' Number of BIS:', 'magenta'), self.biginfo.num_bis, ) print( color(' PHY: ', 'magenta'), self.biginfo.phy.name, ) print( color(' Framed: ', 'magenta'), self.biginfo.framed, ) print( color(' Encrypted: ', 'magenta'), self.biginfo.encrypted, ) def on_sync_establishment(self) -> None: self.establishment_timeout_task.cancel() self.emit('change') def on_sync_loss(self) -> None: self.basic_audio_announcement = None self.biginfo = None self.emit('change') def on_periodic_advertisement( self, advertisement: bumble.device.PeriodicAdvertisement ) -> None: if advertisement.data is None: return for service_data in advertisement.data.get_all( bumble.core.AdvertisingData.SERVICE_DATA ): assert isinstance(service_data, tuple) service_uuid, data = service_data assert isinstance(data, bytes) if service_uuid == bumble.gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE: self.basic_audio_announcement = ( bumble.profiles.bap.BasicAudioAnnouncement.from_bytes(data) ) break self.emit('change') def on_biginfo_advertisement( self, advertisement: bumble.device.BIGInfoAdvertisement ) -> None: self.biginfo = advertisement self.emit('change') def __init__( self, device: bumble.device.Device, filter_duplicates: bool, sync_timeout: float, ): self.device = device self.filter_duplicates = filter_duplicates self.sync_timeout = sync_timeout self.broadcasts: Dict[bumble.hci.Address, BroadcastDiscoverer.Broadcast] = {} self.status_message = '' device.on('advertisement', self.on_advertisement) async def run(self) -> None: self.status_message = color('Scanning...', 'green') await self.device.start_scanning( active=False, filter_duplicates=False, ) def refresh(self) -> None: # Clear the screen from the top print('\033[H') print('\033[0J') print('\033[H') # Print the status message print(self.status_message) print("==========================================") # Print all broadcasts for broadcast in self.broadcasts.values(): broadcast.print() 
print('------------------------------------------') # Clear the screen to the bottom print('\033[0J') def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None: if ( broadcast_name := advertisement.data.get( bumble.core.AdvertisingData.BROADCAST_NAME ) ) is None: return assert isinstance(broadcast_name, str) if broadcast := self.broadcasts.get(advertisement.address): broadcast.update(advertisement) self.refresh() return bumble.utils.AsyncRunner.spawn( self.on_new_broadcast(broadcast_name, advertisement) ) async def on_new_broadcast( self, name: str, advertisement: bumble.device.Advertisement ) -> None: periodic_advertising_sync = await self.device.create_periodic_advertising_sync( advertiser_address=advertisement.address, sid=advertisement.sid, sync_timeout=self.sync_timeout, filter_duplicates=self.filter_duplicates, ) broadcast = self.Broadcast( name, periodic_advertising_sync, ) broadcast.on('change', self.refresh) broadcast.update(advertisement) self.broadcasts[advertisement.address] = broadcast periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast)) self.status_message = color( f'+Found {len(self.broadcasts)} broadcasts', 'green' ) self.refresh() def on_broadcast_loss(self, broadcast: Broadcast) -> None: del self.broadcasts[broadcast.sync.advertiser_address] bumble.utils.AsyncRunner.spawn(broadcast.sync.terminate()) self.status_message = color( f'-Found {len(self.broadcasts)} broadcasts', 'green' ) self.refresh() async def run_discover_broadcasts( filter_duplicates: bool, sync_timeout: float, transport: str ) -> None: async with await bumble.transport.open_transport(transport) as ( hci_source, hci_sink, ): device = bumble.device.Device.with_hci( AURACAST_DEFAULT_DEVICE_NAME, AURACAST_DEFAULT_DEVICE_ADDRESS, hci_source, hci_sink, ) await device.power_on() discoverer = BroadcastDiscoverer(device, filter_duplicates, sync_timeout) await discoverer.run() await hci_source.terminated # 
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
@click.group()
@click.pass_context
def auracast(ctx):
    # Root command group. No docstring is added here on purpose: click may
    # surface it as help text, which would change the program's output.
    ctx.ensure_object(dict)


@auracast.command('discover-broadcasts')
@click.option(
    '--filter-duplicates', is_flag=True, default=False, help='Filter duplicates'
)
@click.option(
    '--sync-timeout',
    metavar='SYNC_TIMEOUT',
    type=float,
    default=5.0,
    help='Sync timeout (in seconds)',
)
@click.argument('transport')
@click.pass_context
def discover_broadcasts(ctx, filter_duplicates, sync_timeout, transport):
    """Discover public broadcasts"""
    # Build the coroutine first, then drive it to completion on a fresh
    # event loop. `ctx` is received from click but not used here.
    discovery = run_discover_broadcasts(filter_duplicates, sync_timeout, transport)
    asyncio.run(discovery)


def main():
    """Configure logging from BUMBLE_LOGLEVEL (default INFO) and run the CLI."""
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)
    auracast()


# -----------------------------------------------------------------------------
if __name__ == "__main__":
    main()  # pylint: disable=no-value-for-parameter