diff --git a/apps/auracast.py b/apps/auracast.py new file mode 100644 index 0000000..463fc9b --- /dev/null +++ b/apps/auracast.py @@ -0,0 +1,407 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from __future__ import annotations +import asyncio +import dataclasses +import logging +import os +from typing import cast, Dict, Optional, Tuple + +import click +import pyee + +from bumble.colors import color +import bumble.company_ids +import bumble.core +import bumble.device +import bumble.gatt +import bumble.hci +import bumble.profiles.bap +import bumble.profiles.pbp +import bumble.transport +import bumble.utils + + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +AURACAST_DEFAULT_DEVICE_NAME = "Bumble Auracast" +AURACAST_DEFAULT_DEVICE_ADDRESS = bumble.hci.Address("F0:F1:F2:F3:F4:F5") + + +# ----------------------------------------------------------------------------- +# Discover Broadcasts +# ----------------------------------------------------------------------------- +class BroadcastDiscoverer: + @dataclasses.dataclass + class Broadcast(pyee.EventEmitter): + name: str + sync: bumble.device.PeriodicAdvertisingSync + rssi: int = 0 + public_broadcast_announcement: Optional[ + bumble.profiles.pbp.PublicBroadcastAnnouncement + ] = None + broadcast_audio_announcement: Optional[ + bumble.profiles.bap.BroadcastAudioAnnouncement + ] = None + basic_audio_announcement: Optional[ + bumble.profiles.bap.BasicAudioAnnouncement + ] = None + appearance: Optional[bumble.core.Appearance] = None + biginfo: Optional[bumble.device.BIGInfoAdvertisement] = None + manufacturer_data: Optional[Tuple[str, bytes]] = None + + def __post_init__(self) -> None: + super().__init__() + self.sync.on('establishment', self.on_sync_establishment) + self.sync.on('loss', self.on_sync_loss) + self.sync.on('periodic_advertisement', self.on_periodic_advertisement) + self.sync.on('biginfo_advertisement', self.on_biginfo_advertisement) + + self.establishment_timeout_task = asyncio.create_task( + self.wait_for_establishment() + ) + + async def wait_for_establishment(self) -> None: + await asyncio.sleep(5.0) + if self.sync.state == bumble.device.PeriodicAdvertisingSync.State.PENDING: + print( + color( + '!!! Periodic advertisement sync not established in time, ' + 'canceling', + 'red', + ) + ) + await self.sync.terminate() + + def update(self, advertisement: bumble.device.Advertisement) -> None: + self.rssi = advertisement.rssi + for service_data in advertisement.data.get_all( + bumble.core.AdvertisingData.SERVICE_DATA + ): + assert isinstance(service_data, tuple) + service_uuid, data = service_data + assert isinstance(data, bytes) + + if ( + service_uuid + == bumble.gatt.GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE + ): + self.public_broadcast_announcement = ( + bumble.profiles.pbp.PublicBroadcastAnnouncement.from_bytes(data) + ) + continue + + if ( + service_uuid + == bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE + ): + self.broadcast_audio_announcement = ( + bumble.profiles.bap.BroadcastAudioAnnouncement.from_bytes(data) + ) + continue + + self.appearance = advertisement.data.get( # type: ignore[assignment] + bumble.core.AdvertisingData.APPEARANCE + ) + + if manufacturer_data := advertisement.data.get( + bumble.core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA + ): + assert isinstance(manufacturer_data, tuple) + company_id = cast(int, manufacturer_data[0]) + data = cast(bytes, manufacturer_data[1]) + self.manufacturer_data = ( + bumble.company_ids.COMPANY_IDENTIFIERS.get( + company_id, f'0x{company_id:04X}' + ), + data, + ) + + def print(self) -> None: + print( + color('Broadcast:', 'yellow'), + self.sync.advertiser_address, + color(self.sync.state.name, 'green'), + ) + print(f' {color("Name", "cyan")}: {self.name}') + if self.appearance: + print(f' {color("Appearance", "cyan")}: {str(self.appearance)}') + print(f' {color("RSSI", "cyan")}: {self.rssi}') + print(f' {color("SID", "cyan")}: {self.sync.sid}') + + if self.manufacturer_data: + print( + f' {color("Manufacturer Data", "cyan")}: ' + f'{self.manufacturer_data[0]} -> {self.manufacturer_data[1].hex()}' + ) + + if self.broadcast_audio_announcement: + print( + f' {color("Broadcast ID", "cyan")}: ' + f'{self.broadcast_audio_announcement.broadcast_id}' + ) + + if self.public_broadcast_announcement: + print( + f' {color("Features", "cyan")}: ' + f'{self.public_broadcast_announcement.features}' + ) + print( + f' {color("Metadata", "cyan")}: ' + f'{self.public_broadcast_announcement.metadata}' + ) + + if self.basic_audio_announcement: + print(color(' Audio:', 'cyan')) + print( + color(' Presentation Delay:', 'magenta'), + self.basic_audio_announcement.presentation_delay, + ) + for subgroup in self.basic_audio_announcement.subgroups: + print(color(' Subgroup:', 'magenta')) + print(color(' Codec ID:', 'yellow')) + print( + color(' Coding Format: ', 'green'), + subgroup.codec_id.coding_format.name, + ) + print( + color(' Company ID: ', 'green'), + subgroup.codec_id.company_id, + ) + print( + color(' Vendor Specific Codec ID:', 'green'), + subgroup.codec_id.vendor_specific_codec_id, + ) + print( + color(' Codec Config:', 'yellow'), + subgroup.codec_specific_configuration, + ) + print(color(' Metadata: ', 'yellow'), subgroup.metadata) + + for bis in subgroup.bis: + print(color(f' BIS [{bis.index}]:', 'yellow')) + print( + color(' Codec Config:', 'green'), + bis.codec_specific_configuration, + ) + + if self.biginfo: + print(color(' BIG:', 'cyan')) + print( + color(' Number of BIS:', 'magenta'), + self.biginfo.num_bis, + ) + print( + color(' PHY: ', 'magenta'), + self.biginfo.phy.name, + ) + print( + color(' Framed: ', 'magenta'), + self.biginfo.framed, + ) + print( + color(' Encrypted: ', 'magenta'), + self.biginfo.encrypted, + ) + + def on_sync_establishment(self) -> None: + self.establishment_timeout_task.cancel() + self.emit('change') + + def on_sync_loss(self) -> None: + self.basic_audio_announcement = None + self.biginfo = None + self.emit('change') + + def on_periodic_advertisement( + self, advertisement: bumble.device.PeriodicAdvertisement + ) -> None: + if advertisement.data is None: + return + + for service_data in advertisement.data.get_all( + bumble.core.AdvertisingData.SERVICE_DATA + ): + assert isinstance(service_data, tuple) + service_uuid, data = service_data + assert isinstance(data, bytes) + + if service_uuid == bumble.gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE: + self.basic_audio_announcement = ( + bumble.profiles.bap.BasicAudioAnnouncement.from_bytes(data) + ) + break + + self.emit('change') + + def on_biginfo_advertisement( + self, advertisement: bumble.device.BIGInfoAdvertisement + ) -> None: + self.biginfo = advertisement + self.emit('change') + + def __init__( + self, + device: bumble.device.Device, + filter_duplicates: bool, + sync_timeout: float, + ): + self.device = device + self.filter_duplicates = filter_duplicates + self.sync_timeout = sync_timeout + self.broadcasts: Dict[bumble.hci.Address, BroadcastDiscoverer.Broadcast] = {} + self.status_message = '' + device.on('advertisement', self.on_advertisement) + + async def run(self) -> None: + self.status_message = color('Scanning...', 'green') + await self.device.start_scanning( + active=False, + filter_duplicates=False, + ) + + def refresh(self) -> None: + # Clear the screen from the top + print('\033[H') + print('\033[0J') + print('\033[H') + + # Print the status message + print(self.status_message) + print("==========================================") + + # Print all broadcasts + for broadcast in self.broadcasts.values(): + broadcast.print() + print('------------------------------------------') + + # Clear the screen to the bottom + print('\033[0J') + + def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None: + if ( + broadcast_name := advertisement.data.get( + bumble.core.AdvertisingData.BROADCAST_NAME + ) + ) is None: + return + assert isinstance(broadcast_name, str) + + if broadcast := self.broadcasts.get(advertisement.address): + broadcast.update(advertisement) + self.refresh() + return + + bumble.utils.AsyncRunner.spawn( + self.on_new_broadcast(broadcast_name, advertisement) + ) + + async def on_new_broadcast( + self, name: str, advertisement: bumble.device.Advertisement + ) -> None: + periodic_advertising_sync = await self.device.create_periodic_advertising_sync( + advertiser_address=advertisement.address, + sid=advertisement.sid, + sync_timeout=self.sync_timeout, + filter_duplicates=self.filter_duplicates, + ) + broadcast = self.Broadcast( + name, + periodic_advertising_sync, + ) + broadcast.on('change', self.refresh) + broadcast.update(advertisement) + self.broadcasts[advertisement.address] = broadcast + periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast)) + self.status_message = color( + f'+Found {len(self.broadcasts)} broadcasts', 'green' + ) + self.refresh() + + def on_broadcast_loss(self, broadcast: Broadcast) -> None: + del self.broadcasts[broadcast.sync.advertiser_address] + bumble.utils.AsyncRunner.spawn(broadcast.sync.terminate()) + self.status_message = color( + f'-Found {len(self.broadcasts)} broadcasts', 'green' + ) + self.refresh() + + +async def run_discover_broadcasts( + filter_duplicates: bool, sync_timeout: float, transport: str +) -> None: + async with await bumble.transport.open_transport(transport) as ( + hci_source, + hci_sink, + ): + device = bumble.device.Device.with_hci( + AURACAST_DEFAULT_DEVICE_NAME, + AURACAST_DEFAULT_DEVICE_ADDRESS, + hci_source, + hci_sink, + ) + await device.power_on() + discoverer = BroadcastDiscoverer(device, filter_duplicates, sync_timeout) + await discoverer.run() + await hci_source.terminated + + +# ----------------------------------------------------------------------------- +# Main +# ----------------------------------------------------------------------------- +@click.group() +@click.pass_context +def auracast( + ctx, +): + ctx.ensure_object(dict) + + +@auracast.command('discover-broadcasts') +@click.option( + '--filter-duplicates', is_flag=True, default=False, help='Filter duplicates' +) +@click.option( + '--sync-timeout', + metavar='SYNC_TIMEOUT', + type=float, + default=5.0, + help='Sync timeout (in seconds)', +) +@click.argument('transport') +@click.pass_context +def discover_broadcasts(ctx, filter_duplicates, sync_timeout, transport): + """Discover public broadcasts""" + asyncio.run(run_discover_broadcasts(filter_duplicates, sync_timeout, transport)) + + +def main(): + logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) + auracast() + + +# ----------------------------------------------------------------------------- +if __name__ == "__main__": + main() # pylint: disable=no-value-for-parameter diff --git a/bumble/core.py b/bumble/core.py index dce721a..92e0c33 100644 --- a/bumble/core.py +++ b/bumble/core.py @@ -16,11 +16,14 @@ # Imports # ----------------------------------------------------------------------------- from __future__ import annotations +import dataclasses import enum import struct from typing import List, Optional, Tuple, Union, cast, Dict +from typing_extensions import Self -from .company_ids import COMPANY_IDENTIFIERS +from bumble.company_ids import COMPANY_IDENTIFIERS +from bumble.utils import OpenIntEnum # ----------------------------------------------------------------------------- @@ -692,11 +695,569 @@ class DeviceClass: return name_or_number(class_names, minor_device_class) +# ----------------------------------------------------------------------------- +# Appearance +# ----------------------------------------------------------------------------- +class Appearance: + class Category(OpenIntEnum): + UNKNOWN = 0x0000 + PHONE = 0x0001 + COMPUTER = 0x0002 + WATCH = 0x0003 + CLOCK = 0x0004 + DISPLAY = 0x0005 + REMOTE_CONTROL = 0x0006 + EYE_GLASSES = 0x0007 + TAG = 0x0008 + KEYRING = 0x0009 + MEDIA_PLAYER = 0x000A + BARCODE_SCANNER = 0x000B + THERMOMETER = 0x000C + HEART_RATE_SENSOR = 0x000D + BLOOD_PRESSURE = 0x000E + HUMAN_INTERFACE_DEVICE = 0x000F + GLUCOSE_METER = 0x0010 + RUNNING_WALKING_SENSOR = 0x0011 + CYCLING = 0x0012 + CONTROL_DEVICE = 0x0013 + NETWORK_DEVICE = 0x0014 + SENSOR = 0x0015 + LIGHT_FIXTURES = 0x0016 + FAN = 0x0017 + HVAC = 0x0018 + AIR_CONDITIONING = 0x0019 + HUMIDIFIER = 0x001A + HEATING = 0x001B + ACCESS_CONTROL = 0x001C + MOTORIZED_DEVICE = 0x001D + POWER_DEVICE = 0x001E + LIGHT_SOURCE = 0x001F + WINDOW_COVERING = 0x0020 + AUDIO_SINK = 0x0021 + AUDIO_SOURCE = 0x0022 + MOTORIZED_VEHICLE = 0x0023 + DOMESTIC_APPLIANCE = 0x0024 + WEARABLE_AUDIO_DEVICE = 0x0025 + AIRCRAFT = 0x0026 + AV_EQUIPMENT = 0x0027 + DISPLAY_EQUIPMENT = 0x0028 + HEARING_AID = 0x0029 + GAMING = 0x002A + SIGNAGE = 0x002B + PULSE_OXIMETER = 0x0031 + WEIGHT_SCALE = 0x0032 + PERSONAL_MOBILITY_DEVICE = 0x0033 + CONTINUOUS_GLUCOSE_MONITOR = 0x0034 + INSULIN_PUMP = 0x0035 + MEDICATION_DELIVERY = 0x0036 + SPIROMETER = 0x0037 + OUTDOOR_SPORTS_ACTIVITY = 0x0051 + + class UnknownSubcategory(OpenIntEnum): + GENERIC_UNKNOWN = 0x00 + + class PhoneSubcategory(OpenIntEnum): + GENERIC_PHONE = 0x00 + + class ComputerSubcategory(OpenIntEnum): + GENERIC_COMPUTER = 0x00 + DESKTOP_WORKSTATION = 0x01 + SERVER_CLASS_COMPUTER = 0x02 + LAPTOP = 0x03 + HANDHELD_PC_PDA = 0x04 + PALM_SIZE_PC_PDA = 0x05 + WEARABLE_COMPUTER = 0x06 + TABLET = 0x07 + DOCKING_STATION = 0x08 + ALL_IN_ONE = 0x09 + BLADE_SERVER = 0x0A + CONVERTIBLE = 0x0B + DETACHABLE = 0x0C + IOT_GATEWAY = 0x0D + MINI_PC = 0x0E + STICK_PC = 0x0F + + class WatchSubcategory(OpenIntEnum): + GENENERIC_WATCH = 0x00 + SPORTS_WATCH = 0x01 + SMARTWATCH = 0x02 + + class ClockSubcategory(OpenIntEnum): + GENERIC_CLOCK = 0x00 + + class DisplaySubcategory(OpenIntEnum): + GENERIC_DISPLAY = 0x00 + + class RemoteControlSubcategory(OpenIntEnum): + GENERIC_REMOTE_CONTROL = 0x00 + + class EyeglassesSubcategory(OpenIntEnum): + GENERIC_EYEGLASSES = 0x00 + + class TagSubcategory(OpenIntEnum): + GENERIC_TAG = 0x00 + + class KeyringSubcategory(OpenIntEnum): + GENERIC_KEYRING = 0x00 + + class MediaPlayerSubcategory(OpenIntEnum): + GENERIC_MEDIA_PLAYER = 0x00 + + class BarcodeScannerSubcategory(OpenIntEnum): + GENERIC_BARCODE_SCANNER = 0x00 + + class ThermometerSubcategory(OpenIntEnum): + GENERIC_THERMOMETER = 0x00 + EAR_THERMOMETER = 0x01 + + class HeartRateSensorSubcategory(OpenIntEnum): + GENERIC_HEART_RATE_SENSOR = 0x00 + HEART_RATE_BELT = 0x01 + + class BloodPressureSubcategory(OpenIntEnum): + GENERIC_BLOOD_PRESSURE = 0x00 + ARM_BLOOD_PRESSURE = 0x01 + WRIST_BLOOD_PRESSURE = 0x02 + + class HumanInterfaceDeviceSubcategory(OpenIntEnum): + GENERIC_HUMAN_INTERFACE_DEVICE = 0x00 + KEYBOARD = 0x01 + MOUSE = 0x02 + JOYSTICK = 0x03 + GAMEPAD = 0x04 + DIGITIZER_TABLET = 0x05 + CARD_READER = 0x06 + DIGITAL_PEN = 0x07 + BARCODE_SCANNER = 0x08 + TOUCHPAD = 0x09 + PRESENTATION_REMOTE = 0x0A + + class GlucoseMeterSubcategory(OpenIntEnum): + GENERIC_GLUCOSE_METER = 0x00 + + class RunningWalkingSensorSubcategory(OpenIntEnum): + GENERIC_RUNNING_WALKING_SENSOR = 0x00 + IN_SHOE_RUNNING_WALKING_SENSOR = 0x01 + ON_SHOW_RUNNING_WALKING_SENSOR = 0x02 + ON_HIP_RUNNING_WALKING_SENSOR = 0x03 + + class CyclingSubcategory(OpenIntEnum): + GENERIC_CYCLING = 0x00 + CYCLING_COMPUTER = 0x01 + SPEED_SENSOR = 0x02 + CADENCE_SENSOR = 0x03 + POWER_SENSOR = 0x04 + SPEED_AND_CADENCE_SENSOR = 0x05 + + class ControlDeviceSubcategory(OpenIntEnum): + GENERIC_CONTROL_DEVICE = 0x00 + SWITCH = 0x01 + MULTI_SWITCH = 0x02 + BUTTON = 0x03 + SLIDER = 0x04 + ROTARY_SWITCH = 0x05 + TOUCH_PANEL = 0x06 + SINGLE_SWITCH = 0x07 + DOUBLE_SWITCH = 0x08 + TRIPLE_SWITCH = 0x09 + BATTERY_SWITCH = 0x0A + ENERGY_HARVESTING_SWITCH = 0x0B + PUSH_BUTTON = 0x0C + + class NetworkDeviceSubcategory(OpenIntEnum): + GENERIC_NETWORK_DEVICE = 0x00 + ACCESS_POINT = 0x01 + MESH_DEVICE = 0x02 + MESH_NETWORK_PROXY = 0x03 + + class SensorSubcategory(OpenIntEnum): + GENERIC_SENSOR = 0x00 + MOTION_SENSOR = 0x01 + AIR_QUALITY_SENSOR = 0x02 + TEMPERATURE_SENSOR = 0x03 + HUMIDITY_SENSOR = 0x04 + LEAK_SENSOR = 0x05 + SMOKE_SENSOR = 0x06 + OCCUPANCY_SENSOR = 0x07 + CONTACT_SENSOR = 0x08 + CARBON_MONOXIDE_SENSOR = 0x09 + CARBON_DIOXIDE_SENSOR = 0x0A + AMBIENT_LIGHT_SENSOR = 0x0B + ENERGY_SENSOR = 0x0C + COLOR_LIGHT_SENSOR = 0x0D + RAIN_SENSOR = 0x0E + FIRE_SENSOR = 0x0F + WIND_SENSOR = 0x10 + PROXIMITY_SENSOR = 0x11 + MULTI_SENSOR = 0x12 + FLUSH_MOUNTED_SENSOR = 0x13 + CEILING_MOUNTED_SENSOR = 0x14 + WALL_MOUNTED_SENSOR = 0x15 + MULTISENSOR = 0x16 + ENERGY_METER = 0x17 + FLAME_DETECTOR = 0x18 + VEHICLE_TIRE_PRESSURE_SENSOR = 0x19 + + class LightFixturesSubcategory(OpenIntEnum): + GENERIC_LIGHT_FIXTURES = 0x00 + WALL_LIGHT = 0x01 + CEILING_LIGHT = 0x02 + FLOOR_LIGHT = 0x03 + CABINET_LIGHT = 0x04 + DESK_LIGHT = 0x05 + TROFFER_LIGHT = 0x06 + PENDANT_LIGHT = 0x07 + IN_GROUND_LIGHT = 0x08 + FLOOD_LIGHT = 0x09 + UNDERWATER_LIGHT = 0x0A + BOLLARD_WITH_LIGHT = 0x0B + PATHWAY_LIGHT = 0x0C + GARDEN_LIGHT = 0x0D + POLE_TOP_LIGHT = 0x0E + SPOTLIGHT = 0x0F + LINEAR_LIGHT = 0x10 + STREET_LIGHT = 0x11 + SHELVES_LIGHT = 0x12 + BAY_LIGHT = 0x013 + EMERGENCY_EXIT_LIGHT = 0x14 + LIGHT_CONTROLLER = 0x15 + LIGHT_DRIVER = 0x16 + BULB = 0x17 + LOW_BAY_LIGHT = 0x18 + HIGH_BAY_LIGHT = 0x19 + + class FanSubcategory(OpenIntEnum): + GENERIC_FAN = 0x00 + CEILING_FAN = 0x01 + AXIAL_FAN = 0x02 + EXHAUST_FAN = 0x03 + PEDESTAL_FAN = 0x04 + DESK_FAN = 0x05 + WALL_FAN = 0x06 + + class HvacSubcategory(OpenIntEnum): + GENERIC_HVAC = 0x00 + THERMOSTAT = 0x01 + HUMIDIFIER = 0x02 + DEHUMIDIFIER = 0x03 + HEATER = 0x04 + RADIATOR = 0x05 + BOILER = 0x06 + HEAT_PUMP = 0x07 + INFRARED_HEATER = 0x08 + RADIANT_PANEL_HEATER = 0x09 + FAN_HEATER = 0x0A + AIR_CURTAIN = 0x0B + + class AirConditioningSubcategory(OpenIntEnum): + GENERIC_AIR_CONDITIONING = 0x00 + + class HumidifierSubcategory(OpenIntEnum): + GENERIC_HUMIDIFIER = 0x00 + + class HeatingSubcategory(OpenIntEnum): + GENERIC_HEATING = 0x00 + RADIATOR = 0x01 + BOILER = 0x02 + HEAT_PUMP = 0x03 + INFRARED_HEATER = 0x04 + RADIANT_PANEL_HEATER = 0x05 + FAN_HEATER = 0x06 + AIR_CURTAIN = 0x07 + + class AccessControlSubcategory(OpenIntEnum): + GENERIC_ACCESS_CONTROL = 0x00 + ACCESS_DOOR = 0x01 + GARAGE_DOOR = 0x02 + EMERGENCY_EXIT_DOOR = 0x03 + ACCESS_LOCK = 0x04 + ELEVATOR = 0x05 + WINDOW = 0x06 + ENTRANCE_GATE = 0x07 + DOOR_LOCK = 0x08 + LOCKER = 0x09 + + class MotorizedDeviceSubcategory(OpenIntEnum): + GENERIC_MOTORIZED_DEVICE = 0x00 + MOTORIZED_GATE = 0x01 + AWNING = 0x02 + BLINDS_OR_SHADES = 0x03 + CURTAINS = 0x04 + SCREEN = 0x05 + + class PowerDeviceSubcategory(OpenIntEnum): + GENERIC_POWER_DEVICE = 0x00 + POWER_OUTLET = 0x01 + POWER_STRIP = 0x02 + PLUG = 0x03 + POWER_SUPPLY = 0x04 + LED_DRIVER = 0x05 + FLUORESCENT_LAMP_GEAR = 0x06 + HID_LAMP_GEAR = 0x07 + CHARGE_CASE = 0x08 + POWER_BANK = 0x09 + + class LightSourceSubcategory(OpenIntEnum): + GENERIC_LIGHT_SOURCE = 0x00 + INCANDESCENT_LIGHT_BULB = 0x01 + LED_LAMP = 0x02 + HID_LAMP = 0x03 + FLUORESCENT_LAMP = 0x04 + LED_ARRAY = 0x05 + MULTI_COLOR_LED_ARRAY = 0x06 + LOW_VOLTAGE_HALOGEN = 0x07 + ORGANIC_LIGHT_EMITTING_DIODE = 0x08 + + class WindowCoveringSubcategory(OpenIntEnum): + GENERIC_WINDOW_COVERING = 0x00 + WINDOW_SHADES = 0x01 + WINDOW_BLINDS = 0x02 + WINDOW_AWNING = 0x03 + WINDOW_CURTAIN = 0x04 + EXTERIOR_SHUTTER = 0x05 + EXTERIOR_SCREEN = 0x06 + + class AudioSinkSubcategory(OpenIntEnum): + GENERIC_AUDIO_SINK = 0x00 + STANDALONE_SPEAKER = 0x01 + SOUNDBAR = 0x02 + BOOKSHELF_SPEAKER = 0x03 + STANDMOUNTED_SPEAKER = 0x04 + SPEAKERPHONE = 0x05 + + class AudioSourceSubcategory(OpenIntEnum): + GENERIC_AUDIO_SOURCE = 0x00 + MICROPHONE = 0x01 + ALARM = 0x02 + BELL = 0x03 + HORN = 0x04 + BROADCASTING_DEVICE = 0x05 + SERVICE_DESK = 0x06 + KIOSK = 0x07 + BROADCASTING_ROOM = 0x08 + AUDITORIUM = 0x09 + + class MotorizedVehicleSubcategory(OpenIntEnum): + GENERIC_MOTORIZED_VEHICLE = 0x00 + CAR = 0x01 + LARGE_GOODS_VEHICLE = 0x02 + TWO_WHEELED_VEHICLE = 0x03 + MOTORBIKE = 0x04 + SCOOTER = 0x05 + MOPED = 0x06 + THREE_WHEELED_VEHICLE = 0x07 + LIGHT_VEHICLE = 0x08 + QUAD_BIKE = 0x09 + MINIBUS = 0x0A + BUS = 0x0B + TROLLEY = 0x0C + AGRICULTURAL_VEHICLE = 0x0D + CAMPER_CARAVAN = 0x0E + RECREATIONAL_VEHICLE_MOTOR_HOME = 0x0F + + class DomesticApplianceSubcategory(OpenIntEnum): + GENERIC_DOMESTIC_APPLIANCE = 0x00 + REFRIGERATOR = 0x01 + FREEZER = 0x02 + OVEN = 0x03 + MICROWAVE = 0x04 + TOASTER = 0x05 + WASHING_MACHINE = 0x06 + DRYER = 0x07 + COFFEE_MAKER = 0x08 + CLOTHES_IRON = 0x09 + CURLING_IRON = 0x0A + HAIR_DRYER = 0x0B + VACUUM_CLEANER = 0x0C + ROBOTIC_VACUUM_CLEANER = 0x0D + RICE_COOKER = 0x0E + CLOTHES_STEAMER = 0x0F + + class WearableAudioDeviceSubcategory(OpenIntEnum): + GENERIC_WEARABLE_AUDIO_DEVICE = 0x00 + EARBUD = 0x01 + HEADSET = 0x02 + HEADPHONES = 0x03 + NECK_BAND = 0x04 + + class AircraftSubcategory(OpenIntEnum): + GENERIC_AIRCRAFT = 0x00 + LIGHT_AIRCRAFT = 0x01 + MICROLIGHT = 0x02 + PARAGLIDER = 0x03 + LARGE_PASSENGER_AIRCRAFT = 0x04 + + class AvEquipmentSubcategory(OpenIntEnum): + GENERIC_AV_EQUIPMENT = 0x00 + AMPLIFIER = 0x01 + RECEIVER = 0x02 + RADIO = 0x03 + TUNER = 0x04 + TURNTABLE = 0x05 + CD_PLAYER = 0x06 + DVD_PLAYER = 0x07 + BLUERAY_PLAYER = 0x08 + OPTICAL_DISC_PLAYER = 0x09 + SET_TOP_BOX = 0x0A + + class DisplayEquipmentSubcategory(OpenIntEnum): + GENERIC_DISPLAY_EQUIPMENT = 0x00 + TELEVISION = 0x01 + MONITOR = 0x02 + PROJECTOR = 0x03 + + class HearingAidSubcategory(OpenIntEnum): + GENERIC_HEARING_AID = 0x00 + IN_EAR_HEARING_AID = 0x01 + BEHIND_EAR_HEARING_AID = 0x02 + COCHLEAR_IMPLANT = 0x03 + + class GamingSubcategory(OpenIntEnum): + GENERIC_GAMING = 0x00 + HOME_VIDEO_GAME_CONSOLE = 0x01 + PORTABLE_HANDHELD_CONSOLE = 0x02 + + class SignageSubcategory(OpenIntEnum): + GENERIC_SIGNAGE = 0x00 + DIGITAL_SIGNAGE = 0x01 + ELECTRONIC_LABEL = 0x02 + + class PulseOximeterSubcategory(OpenIntEnum): + GENERIC_PULSE_OXIMETER = 0x00 + FINGERTIP_PULSE_OXIMETER = 0x01 + WRIST_WORN_PULSE_OXIMETER = 0x02 + + class WeightScaleSubcategory(OpenIntEnum): + GENERIC_WEIGHT_SCALE = 0x00 + + class PersonalMobilityDeviceSubcategory(OpenIntEnum): + GENERIC_PERSONAL_MOBILITY_DEVICE = 0x00 + POWERED_WHEELCHAIR = 0x01 + MOBILITY_SCOOTER = 0x02 + + class ContinuousGlucoseMonitorSubcategory(OpenIntEnum): + GENERIC_CONTINUOUS_GLUCOSE_MONITOR = 0x00 + + class InsulinPumpSubcategory(OpenIntEnum): + GENERIC_INSULIN_PUMP = 0x00 + INSULIN_PUMP_DURABLE_PUMP = 0x01 + INSULIN_PUMP_PATCH_PUMP = 0x02 + INSULIN_PEN = 0x03 + + class MedicationDeliverySubcategory(OpenIntEnum): + GENERIC_MEDICATION_DELIVERY = 0x00 + + class SpirometerSubcategory(OpenIntEnum): + GENERIC_SPIROMETER = 0x00 + HANDHELD_SPIROMETER = 0x01 + + class OutdoorSportsActivitySubcategory(OpenIntEnum): + GENERIC_OUTDOOR_SPORTS_ACTIVITY = 0x00 + LOCATION_DISPLAY = 0x01 + LOCATION_AND_NAVIGATION_DISPLAY = 0x02 + LOCATION_POD = 0x03 + LOCATION_AND_NAVIGATION_POD = 0x04 + + class _OpenSubcategory(OpenIntEnum): + GENERIC = 0x00 + + SUBCATEGORY_CLASSES = { + Category.UNKNOWN: UnknownSubcategory, + Category.PHONE: PhoneSubcategory, + Category.COMPUTER: ComputerSubcategory, + Category.WATCH: WatchSubcategory, + Category.CLOCK: ClockSubcategory, + Category.DISPLAY: DisplaySubcategory, + Category.REMOTE_CONTROL: RemoteControlSubcategory, + Category.EYE_GLASSES: EyeglassesSubcategory, + Category.TAG: TagSubcategory, + Category.KEYRING: KeyringSubcategory, + Category.MEDIA_PLAYER: MediaPlayerSubcategory, + Category.BARCODE_SCANNER: BarcodeScannerSubcategory, + Category.THERMOMETER: ThermometerSubcategory, + Category.HEART_RATE_SENSOR: HeartRateSensorSubcategory, + Category.BLOOD_PRESSURE: BloodPressureSubcategory, + Category.HUMAN_INTERFACE_DEVICE: HumanInterfaceDeviceSubcategory, + Category.GLUCOSE_METER: GlucoseMeterSubcategory, + Category.RUNNING_WALKING_SENSOR: RunningWalkingSensorSubcategory, + Category.CYCLING: CyclingSubcategory, + Category.CONTROL_DEVICE: ControlDeviceSubcategory, + Category.NETWORK_DEVICE: NetworkDeviceSubcategory, + Category.SENSOR: SensorSubcategory, + Category.LIGHT_FIXTURES: LightFixturesSubcategory, + Category.FAN: FanSubcategory, + Category.HVAC: HvacSubcategory, + Category.AIR_CONDITIONING: AirConditioningSubcategory, + Category.HUMIDIFIER: HumidifierSubcategory, + Category.HEATING: HeatingSubcategory, + Category.ACCESS_CONTROL: AccessControlSubcategory, + Category.MOTORIZED_DEVICE: MotorizedDeviceSubcategory, + Category.POWER_DEVICE: PowerDeviceSubcategory, + Category.LIGHT_SOURCE: LightSourceSubcategory, + Category.WINDOW_COVERING: WindowCoveringSubcategory, + Category.AUDIO_SINK: AudioSinkSubcategory, + Category.AUDIO_SOURCE: AudioSourceSubcategory, + Category.MOTORIZED_VEHICLE: MotorizedVehicleSubcategory, + Category.DOMESTIC_APPLIANCE: DomesticApplianceSubcategory, + Category.WEARABLE_AUDIO_DEVICE: WearableAudioDeviceSubcategory, + Category.AIRCRAFT: AircraftSubcategory, + Category.AV_EQUIPMENT: AvEquipmentSubcategory, + Category.DISPLAY_EQUIPMENT: DisplayEquipmentSubcategory, + Category.HEARING_AID: HearingAidSubcategory, + Category.GAMING: GamingSubcategory, + Category.SIGNAGE: SignageSubcategory, + Category.PULSE_OXIMETER: PulseOximeterSubcategory, + Category.WEIGHT_SCALE: WeightScaleSubcategory, + Category.PERSONAL_MOBILITY_DEVICE: PersonalMobilityDeviceSubcategory, + Category.CONTINUOUS_GLUCOSE_MONITOR: ContinuousGlucoseMonitorSubcategory, + Category.INSULIN_PUMP: InsulinPumpSubcategory, + Category.MEDICATION_DELIVERY: MedicationDeliverySubcategory, + Category.SPIROMETER: SpirometerSubcategory, + Category.OUTDOOR_SPORTS_ACTIVITY: OutdoorSportsActivitySubcategory, + } + + category: Category + subcategory: enum.IntEnum + + @classmethod + def from_int(cls, appearance: int) -> Self: + category = cls.Category(appearance >> 6) + return cls(category, appearance & 0x3F) + + def __init__(self, category: Category, subcategory: int) -> None: + self.category = category + if subcategory_class := self.SUBCATEGORY_CLASSES.get(category): + self.subcategory = subcategory_class(subcategory) + else: + self.subcategory = self._OpenSubcategory(subcategory) + + def __int__(self) -> int: + return self.category << 6 | self.subcategory + + def __repr__(self) -> str: + return ( + 'Appearance(' + f'category={self.category.name}, ' + f'subcategory={self.subcategory.name}' + ')' + ) + + def __str__(self) -> str: + return f'{self.category.name}/{self.subcategory.name}' + + # ----------------------------------------------------------------------------- # Advertising Data # ----------------------------------------------------------------------------- -AdvertisingObject = Union[ - List[UUID], Tuple[UUID, bytes], bytes, str, int, Tuple[int, int], Tuple[int, bytes] +AdvertisingDataObject = Union[ + List[UUID], + Tuple[UUID, bytes], + bytes, + str, + int, + Tuple[int, int], + Tuple[int, bytes], + Appearance, ] @@ -704,109 +1265,115 @@ class AdvertisingData: # fmt: off # pylint: disable=line-too-long - # This list is only partial, it still needs to be filled in from the spec - FLAGS = 0x01 - INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x02 - COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x03 - INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x04 - COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x05 - INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x06 - COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x07 - SHORTENED_LOCAL_NAME = 0x08 - COMPLETE_LOCAL_NAME = 0x09 - TX_POWER_LEVEL = 0x0A - CLASS_OF_DEVICE = 0x0D - SIMPLE_PAIRING_HASH_C = 0x0E - SIMPLE_PAIRING_HASH_C_192 = 0x0E - SIMPLE_PAIRING_RANDOMIZER_R = 0x0F - SIMPLE_PAIRING_RANDOMIZER_R_192 = 0x0F - DEVICE_ID = 0x10 - SECURITY_MANAGER_TK_VALUE = 0x10 - SECURITY_MANAGER_OUT_OF_BAND_FLAGS = 0x11 - PERIPHERAL_CONNECTION_INTERVAL_RANGE = 0x12 - LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = 0x14 - LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = 0x15 - SERVICE_DATA = 0x16 - SERVICE_DATA_16_BIT_UUID = 0x16 - PUBLIC_TARGET_ADDRESS = 0x17 - RANDOM_TARGET_ADDRESS = 0x18 - APPEARANCE = 0x19 - ADVERTISING_INTERVAL = 0x1A - LE_BLUETOOTH_DEVICE_ADDRESS = 0x1B - LE_ROLE = 0x1C - SIMPLE_PAIRING_HASH_C_256 = 0x1D - SIMPLE_PAIRING_RANDOMIZER_R_256 = 0x1E - LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS = 0x1F - SERVICE_DATA_32_BIT_UUID = 0x20 - SERVICE_DATA_128_BIT_UUID = 0x21 - LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE = 0x22 - LE_SECURE_CONNECTIONS_RANDOM_VALUE = 0x23 - URI = 0x24 - INDOOR_POSITIONING = 0x25 - TRANSPORT_DISCOVERY_DATA = 0x26 - LE_SUPPORTED_FEATURES = 0x27 - CHANNEL_MAP_UPDATE_INDICATION = 0x28 - PB_ADV = 0x29 - MESH_MESSAGE = 0x2A - MESH_BEACON = 0x2B - BIGINFO = 0x2C - BROADCAST_CODE = 0x2D - RESOLVABLE_SET_IDENTIFIER = 0x2E - ADVERTISING_INTERVAL_LONG = 0x2F - THREE_D_INFORMATION_DATA = 0x3D - MANUFACTURER_SPECIFIC_DATA = 0xFF + FLAGS = 0x01 + INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x02 + COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x03 + INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x04 + COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x05 + INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x06 + COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x07 + SHORTENED_LOCAL_NAME = 0x08 + COMPLETE_LOCAL_NAME = 0x09 + TX_POWER_LEVEL = 0x0A + CLASS_OF_DEVICE = 0x0D + SIMPLE_PAIRING_HASH_C = 0x0E + SIMPLE_PAIRING_HASH_C_192 = 0x0E + SIMPLE_PAIRING_RANDOMIZER_R = 0x0F + SIMPLE_PAIRING_RANDOMIZER_R_192 = 0x0F + DEVICE_ID = 0x10 + SECURITY_MANAGER_TK_VALUE = 0x10 + SECURITY_MANAGER_OUT_OF_BAND_FLAGS = 0x11 + PERIPHERAL_CONNECTION_INTERVAL_RANGE = 0x12 + LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = 0x14 + LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = 0x15 + SERVICE_DATA = 0x16 + SERVICE_DATA_16_BIT_UUID = 0x16 + PUBLIC_TARGET_ADDRESS = 0x17 + RANDOM_TARGET_ADDRESS = 0x18 + APPEARANCE = 0x19 + ADVERTISING_INTERVAL = 0x1A + LE_BLUETOOTH_DEVICE_ADDRESS = 0x1B + LE_ROLE = 0x1C + SIMPLE_PAIRING_HASH_C_256 = 0x1D + SIMPLE_PAIRING_RANDOMIZER_R_256 = 0x1E + LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS = 0x1F + SERVICE_DATA_32_BIT_UUID = 0x20 + SERVICE_DATA_128_BIT_UUID = 0x21 + LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE = 0x22 + LE_SECURE_CONNECTIONS_RANDOM_VALUE = 0x23 + URI = 0x24 + INDOOR_POSITIONING = 0x25 + TRANSPORT_DISCOVERY_DATA = 0x26 + LE_SUPPORTED_FEATURES = 0x27 + CHANNEL_MAP_UPDATE_INDICATION = 0x28 + PB_ADV = 0x29 + MESH_MESSAGE = 0x2A + MESH_BEACON = 0x2B + BIGINFO = 0x2C + BROADCAST_CODE = 0x2D + RESOLVABLE_SET_IDENTIFIER = 0x2E + ADVERTISING_INTERVAL_LONG = 0x2F + BROADCAST_NAME = 0x30 + ENCRYPTED_ADVERTISING_DATA = 0X31 + PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION = 0X32 + ELECTRONIC_SHELF_LABEL = 0X34 + THREE_D_INFORMATION_DATA = 0x3D + MANUFACTURER_SPECIFIC_DATA = 0xFF AD_TYPE_NAMES = { - FLAGS: 'FLAGS', - INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS', - COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS', - INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS', - COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS', - INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS', - COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS', - SHORTENED_LOCAL_NAME: 'SHORTENED_LOCAL_NAME', - COMPLETE_LOCAL_NAME: 'COMPLETE_LOCAL_NAME', - TX_POWER_LEVEL: 'TX_POWER_LEVEL', - CLASS_OF_DEVICE: 'CLASS_OF_DEVICE', - SIMPLE_PAIRING_HASH_C: 'SIMPLE_PAIRING_HASH_C', - SIMPLE_PAIRING_HASH_C_192: 'SIMPLE_PAIRING_HASH_C_192', - SIMPLE_PAIRING_RANDOMIZER_R: 'SIMPLE_PAIRING_RANDOMIZER_R', - SIMPLE_PAIRING_RANDOMIZER_R_192: 'SIMPLE_PAIRING_RANDOMIZER_R_192', - DEVICE_ID: 'DEVICE_ID', - SECURITY_MANAGER_TK_VALUE: 'SECURITY_MANAGER_TK_VALUE', - SECURITY_MANAGER_OUT_OF_BAND_FLAGS: 'SECURITY_MANAGER_OUT_OF_BAND_FLAGS', - PERIPHERAL_CONNECTION_INTERVAL_RANGE: 'PERIPHERAL_CONNECTION_INTERVAL_RANGE', - LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS', - LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS', - SERVICE_DATA: 'SERVICE_DATA', - SERVICE_DATA_16_BIT_UUID: 'SERVICE_DATA_16_BIT_UUID', - PUBLIC_TARGET_ADDRESS: 'PUBLIC_TARGET_ADDRESS', - RANDOM_TARGET_ADDRESS: 'RANDOM_TARGET_ADDRESS', - APPEARANCE: 'APPEARANCE', - ADVERTISING_INTERVAL: 'ADVERTISING_INTERVAL', - LE_BLUETOOTH_DEVICE_ADDRESS: 'LE_BLUETOOTH_DEVICE_ADDRESS', - LE_ROLE: 'LE_ROLE', - SIMPLE_PAIRING_HASH_C_256: 'SIMPLE_PAIRING_HASH_C_256', - SIMPLE_PAIRING_RANDOMIZER_R_256: 'SIMPLE_PAIRING_RANDOMIZER_R_256', - LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS', - SERVICE_DATA_32_BIT_UUID: 'SERVICE_DATA_32_BIT_UUID', - SERVICE_DATA_128_BIT_UUID: 'SERVICE_DATA_128_BIT_UUID', - LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE: 'LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE', - LE_SECURE_CONNECTIONS_RANDOM_VALUE: 'LE_SECURE_CONNECTIONS_RANDOM_VALUE', - URI: 'URI', - INDOOR_POSITIONING: 'INDOOR_POSITIONING', - TRANSPORT_DISCOVERY_DATA: 'TRANSPORT_DISCOVERY_DATA', - LE_SUPPORTED_FEATURES: 'LE_SUPPORTED_FEATURES', - CHANNEL_MAP_UPDATE_INDICATION: 'CHANNEL_MAP_UPDATE_INDICATION', - PB_ADV: 'PB_ADV', - MESH_MESSAGE: 'MESH_MESSAGE', - MESH_BEACON: 'MESH_BEACON', - BIGINFO: 'BIGINFO', - BROADCAST_CODE: 'BROADCAST_CODE', - RESOLVABLE_SET_IDENTIFIER: 'RESOLVABLE_SET_IDENTIFIER', - ADVERTISING_INTERVAL_LONG: 'ADVERTISING_INTERVAL_LONG', - THREE_D_INFORMATION_DATA: 'THREE_D_INFORMATION_DATA', - MANUFACTURER_SPECIFIC_DATA: 'MANUFACTURER_SPECIFIC_DATA' + FLAGS: 'FLAGS', + INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS', + COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS', + INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS', + COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS', + INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS', + COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS', + SHORTENED_LOCAL_NAME: 'SHORTENED_LOCAL_NAME', + COMPLETE_LOCAL_NAME: 'COMPLETE_LOCAL_NAME', + TX_POWER_LEVEL: 'TX_POWER_LEVEL', + CLASS_OF_DEVICE: 'CLASS_OF_DEVICE', + SIMPLE_PAIRING_HASH_C: 'SIMPLE_PAIRING_HASH_C', + SIMPLE_PAIRING_HASH_C_192: 'SIMPLE_PAIRING_HASH_C_192', + SIMPLE_PAIRING_RANDOMIZER_R: 'SIMPLE_PAIRING_RANDOMIZER_R', + SIMPLE_PAIRING_RANDOMIZER_R_192: 'SIMPLE_PAIRING_RANDOMIZER_R_192', + DEVICE_ID: 'DEVICE_ID', + SECURITY_MANAGER_TK_VALUE: 'SECURITY_MANAGER_TK_VALUE', + SECURITY_MANAGER_OUT_OF_BAND_FLAGS: 'SECURITY_MANAGER_OUT_OF_BAND_FLAGS', + PERIPHERAL_CONNECTION_INTERVAL_RANGE: 'PERIPHERAL_CONNECTION_INTERVAL_RANGE', + LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS', + LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS', + SERVICE_DATA_16_BIT_UUID: 'SERVICE_DATA_16_BIT_UUID', + PUBLIC_TARGET_ADDRESS: 'PUBLIC_TARGET_ADDRESS', + RANDOM_TARGET_ADDRESS: 'RANDOM_TARGET_ADDRESS', + APPEARANCE: 'APPEARANCE', + ADVERTISING_INTERVAL: 'ADVERTISING_INTERVAL', + LE_BLUETOOTH_DEVICE_ADDRESS: 'LE_BLUETOOTH_DEVICE_ADDRESS', + LE_ROLE: 'LE_ROLE', + SIMPLE_PAIRING_HASH_C_256: 'SIMPLE_PAIRING_HASH_C_256', + SIMPLE_PAIRING_RANDOMIZER_R_256: 'SIMPLE_PAIRING_RANDOMIZER_R_256', + LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS', + SERVICE_DATA_32_BIT_UUID: 'SERVICE_DATA_32_BIT_UUID', + SERVICE_DATA_128_BIT_UUID: 'SERVICE_DATA_128_BIT_UUID', + LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE: 'LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE', + LE_SECURE_CONNECTIONS_RANDOM_VALUE: 'LE_SECURE_CONNECTIONS_RANDOM_VALUE', + URI: 'URI', + INDOOR_POSITIONING: 'INDOOR_POSITIONING', + TRANSPORT_DISCOVERY_DATA: 'TRANSPORT_DISCOVERY_DATA', + LE_SUPPORTED_FEATURES: 'LE_SUPPORTED_FEATURES', + CHANNEL_MAP_UPDATE_INDICATION: 'CHANNEL_MAP_UPDATE_INDICATION', + PB_ADV: 'PB_ADV', + MESH_MESSAGE: 'MESH_MESSAGE', + MESH_BEACON: 'MESH_BEACON', + BIGINFO: 'BIGINFO', + BROADCAST_CODE: 'BROADCAST_CODE', + RESOLVABLE_SET_IDENTIFIER: 'RESOLVABLE_SET_IDENTIFIER', + ADVERTISING_INTERVAL_LONG: 'ADVERTISING_INTERVAL_LONG', + BROADCAST_NAME: 'BROADCAST_NAME', + ENCRYPTED_ADVERTISING_DATA: 'ENCRYPTED_ADVERTISING_DATA', + PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION: 'PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION', + ELECTRONIC_SHELF_LABEL: 'ELECTRONIC_SHELF_LABEL', + THREE_D_INFORMATION_DATA: 'THREE_D_INFORMATION_DATA', + MANUFACTURER_SPECIFIC_DATA: 'MANUFACTURER_SPECIFIC_DATA' } LE_LIMITED_DISCOVERABLE_MODE_FLAG = 0x01 @@ -915,7 +1482,11 @@ class AdvertisingData: ad_data_str = f'company={company_name}, data={ad_data[2:].hex()}' elif ad_type == AdvertisingData.APPEARANCE: ad_type_str = 'Appearance' - ad_data_str = ad_data.hex() + appearance = Appearance.from_int(struct.unpack_from(' AdvertisingObject: + def ad_data_to_object(ad_type: int, ad_data: bytes) -> AdvertisingDataObject: if ad_type in ( AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, @@ -959,16 +1530,14 @@ class AdvertisingData: AdvertisingData.SHORTENED_LOCAL_NAME, AdvertisingData.COMPLETE_LOCAL_NAME, AdvertisingData.URI, + AdvertisingData.BROADCAST_NAME, ): return ad_data.decode("utf-8") if ad_type in (AdvertisingData.TX_POWER_LEVEL, AdvertisingData.FLAGS): return cast(int, struct.unpack('B', ad_data)[0]) - if ad_type in ( - AdvertisingData.APPEARANCE, - AdvertisingData.ADVERTISING_INTERVAL, - ): + if ad_type in (AdvertisingData.ADVERTISING_INTERVAL,): return cast(int, struct.unpack(' None: @@ -993,27 +1567,27 @@ class AdvertisingData: self.ad_structures.append((ad_type, ad_data)) offset += length - def get_all(self, type_id: int, raw: bool = False) -> List[AdvertisingObject]: + def get_all(self, type_id: int, raw: bool = False) -> List[AdvertisingDataObject]: ''' Get Advertising Data Structure(s) with a given type Returns a (possibly empty) list of matches. ''' - def process_ad_data(ad_data: bytes) -> AdvertisingObject: + def process_ad_data(ad_data: bytes) -> AdvertisingDataObject: return ad_data if raw else self.ad_data_to_object(type_id, ad_data) return [process_ad_data(ad[1]) for ad in self.ad_structures if ad[0] == type_id] - def get(self, type_id: int, raw: bool = False) -> Optional[AdvertisingObject]: + def get(self, type_id: int, raw: bool = False) -> Optional[AdvertisingDataObject]: ''' Get Advertising Data Structure(s) with a given type Returns the first entry, or None if no structure matches. ''' - all = self.get_all(type_id, raw=raw) - return all[0] if all else None + all_objects = self.get_all(type_id, raw=raw) + return all_objects[0] if all_objects else None def __bytes__(self): return b''.join( diff --git a/bumble/device.py b/bumble/device.py index f9e6b9d..30763e9 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -16,22 +16,21 @@ # Imports # ----------------------------------------------------------------------------- from __future__ import annotations -from enum import IntEnum -import copy -import functools -import json import asyncio -import logging -import secrets -import sys +from collections.abc import Iterable from contextlib import ( asynccontextmanager, AsyncExitStack, closing, - AbstractAsyncContextManager, ) +import copy from dataclasses import dataclass, field -from collections.abc import Iterable +from enum import Enum, IntEnum +import functools +import json +import logging +import secrets +import sys from typing import ( Any, Callable, @@ -81,6 +80,7 @@ from .hci import ( HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, + HCI_OPERATION_CANCELLED_BY_HOST_ERROR, HCI_R2_PAGE_SCAN_REPETITION_MODE, HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, HCI_SUCCESS, @@ -102,11 +102,16 @@ from .hci import ( HCI_LE_Accept_CIS_Request_Command, HCI_LE_Add_Device_To_Resolving_List_Command, HCI_LE_Advertising_Report_Event, + HCI_LE_BIGInfo_Advertising_Report_Event, HCI_LE_Clear_Resolving_List_Command, HCI_LE_Connection_Update_Command, HCI_LE_Create_Connection_Cancel_Command, HCI_LE_Create_Connection_Command, HCI_LE_Create_CIS_Command, + HCI_LE_Periodic_Advertising_Create_Sync_Command, + HCI_LE_Periodic_Advertising_Create_Sync_Cancel_Command, + HCI_LE_Periodic_Advertising_Report_Event, + HCI_LE_Periodic_Advertising_Terminate_Sync_Command, HCI_LE_Enable_Encryption_Command, HCI_LE_Extended_Advertising_Report_Event, HCI_LE_Extended_Create_Connection_Command, @@ -248,6 +253,8 @@ DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONN DEVICE_DEFAULT_ADVERTISING_TX_POWER = ( HCI_LE_Set_Extended_Advertising_Parameters_Command.TX_POWER_NO_PREFERENCE ) +DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_SKIP = 0 +DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_TIMEOUT = 5.0 # fmt: on # pylint: enable=line-too-long @@ -552,6 +559,70 @@ class AdvertisingEventProperties: ) +# ----------------------------------------------------------------------------- +@dataclass +class PeriodicAdvertisement: + address: Address + sid: int + tx_power: int = ( + HCI_LE_Periodic_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE + ) + rssi: int = HCI_LE_Periodic_Advertising_Report_Event.RSSI_NOT_AVAILABLE + is_truncated: bool = False + data_bytes: bytes = b'' + + # Constants + TX_POWER_NOT_AVAILABLE: ClassVar[int] = ( + HCI_LE_Periodic_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE + ) + RSSI_NOT_AVAILABLE: ClassVar[int] = ( + HCI_LE_Periodic_Advertising_Report_Event.RSSI_NOT_AVAILABLE + ) + + def __post_init__(self) -> None: + self.data = ( + None if self.is_truncated else AdvertisingData.from_bytes(self.data_bytes) + ) + + +# ----------------------------------------------------------------------------- +@dataclass +class BIGInfoAdvertisement: + address: Address + sid: int + num_bis: int + nse: int + iso_interval: int + bn: int + pto: int + irc: int + max_pdu: int + sdu_interval: int + max_sdu: int + phy: Phy + framed: bool + encrypted: bool + + @classmethod + def from_report(cls, address: Address, sid: int, report) -> Self: + return cls( + address, + sid, + report.num_bis, + report.nse, + report.iso_interval, + report.bn, + report.pto, + report.irc, + report.max_pdu, + report.sdu_interval, + report.max_sdu, + Phy(report.phy), + report.framing != 0, + report.encryption != 0, + ) + + # ----------------------------------------------------------------------------- # TODO: replace with typing.TypeAlias when the code base is all Python >= 3.10 AdvertisingChannelMap = HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap @@ -795,6 +866,201 @@ class AdvertisingSet(EventEmitter): self.emit('termination', status) +# ----------------------------------------------------------------------------- +class PeriodicAdvertisingSync(EventEmitter): + class State(Enum): + INIT = 0 + PENDING = 1 + ESTABLISHED = 2 + CANCELLED = 3 + ERROR = 4 + LOST = 5 + TERMINATED = 6 + + _state: State + sync_handle: Optional[int] + advertiser_address: Address + sid: int + skip: int + sync_timeout: float # Sync timeout, in seconds + filter_duplicates: bool + status: int + advertiser_phy: int + periodic_advertising_interval: int + advertiser_clock_accuracy: int + + def __init__( + self, + device: Device, + advertiser_address: Address, + sid: int, + skip: int, + sync_timeout: float, + filter_duplicates: bool, + ) -> None: + super().__init__() + self._state = self.State.INIT + self.sync_handle = None + self.device = device + self.advertiser_address = advertiser_address + self.sid = sid + self.skip = skip + self.sync_timeout = sync_timeout + self.filter_duplicates = filter_duplicates + self.status = HCI_SUCCESS + self.advertiser_phy = 0 + self.periodic_advertising_interval = 0 + self.advertiser_clock_accuracy = 0 + self.data_accumulator = b'' + + @property + def state(self) -> State: + return self._state + + @state.setter + def state(self, state: State) -> None: + logger.debug(f'{self} -> {state.name}') + self._state = state + self.emit('state_change') + + async def establish(self) -> None: + if self.state != self.State.INIT: + raise InvalidStateError('sync not in init state') + + options = HCI_LE_Periodic_Advertising_Create_Sync_Command.Options(0) + if self.filter_duplicates: + options |= ( + HCI_LE_Periodic_Advertising_Create_Sync_Command.Options.DUPLICATE_FILTERING_INITIALLY_ENABLED + ) + + response = await self.device.send_command( + HCI_LE_Periodic_Advertising_Create_Sync_Command( + options=options, + advertising_sid=self.sid, + advertiser_address_type=self.advertiser_address.address_type, + advertiser_address=self.advertiser_address, + skip=self.skip, + sync_timeout=int(self.sync_timeout * 100), + sync_cte_type=0, + ) + ) + if response.status != HCI_Command_Status_Event.PENDING: + raise HCI_StatusError(response) + + self.state = self.State.PENDING + + async def terminate(self) -> None: + if self.state in (self.State.INIT, self.State.CANCELLED, self.State.TERMINATED): + return + + if self.state == self.State.PENDING: + self.state = self.State.CANCELLED + response = await self.device.send_command( + HCI_LE_Periodic_Advertising_Create_Sync_Cancel_Command(), + ) + if response.status == HCI_SUCCESS: + if self in self.device.periodic_advertising_syncs: + self.device.periodic_advertising_syncs.remove(self) + return + + if self.state in (self.State.ESTABLISHED, self.State.ERROR, self.State.LOST): + self.state = self.State.TERMINATED + await self.device.send_command( + HCI_LE_Periodic_Advertising_Terminate_Sync_Command( + sync_handle=self.sync_handle + ) + ) + self.device.periodic_advertising_syncs.remove(self) + + def on_establishment( + self, + status, + sync_handle, + advertiser_phy, + periodic_advertising_interval, + advertiser_clock_accuracy, + ) -> None: + self.status = status + + if self.state == self.State.CANCELLED: + # Somehow, we receive an established event after trying to cancel, most + # likely because the cancel command was sent too late, when the sync was + # already established, but before the established event was sent. + # We need to automatically terminate. + logger.debug( + "received established event for cancelled sync, will terminate" + ) + self.state = self.State.ESTABLISHED + AsyncRunner.spawn(self.terminate()) + return + + if status == HCI_SUCCESS: + self.sync_handle = sync_handle + self.advertiser_phy = advertiser_phy + self.periodic_advertising_interval = periodic_advertising_interval + self.advertiser_clock_accuracy = advertiser_clock_accuracy + self.state = self.State.ESTABLISHED + self.emit('establishment') + return + + # We don't need to keep a reference anymore + if self in self.device.periodic_advertising_syncs: + self.device.periodic_advertising_syncs.remove(self) + + if status == HCI_OPERATION_CANCELLED_BY_HOST_ERROR: + self.state = self.State.CANCELLED + self.emit('cancellation') + return + + self.state = self.State.ERROR + self.emit('error') + + def on_loss(self): + self.state = self.State.LOST + self.emit('loss') + + def on_periodic_advertising_report(self, report) -> None: + self.data_accumulator += report.data + if ( + report.data_status + == HCI_LE_Periodic_Advertising_Report_Event.DataStatus.DATA_INCOMPLETE_MORE_TO_COME + ): + return + + self.emit( + 'periodic_advertisement', + PeriodicAdvertisement( + self.advertiser_address, + self.sid, + report.tx_power, + report.rssi, + is_truncated=( + report.data_status + == HCI_LE_Periodic_Advertising_Report_Event.DataStatus.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME + ), + data_bytes=self.data_accumulator, + ), + ) + self.data_accumulator = b'' + + def on_biginfo_advertising_report(self, report) -> None: + self.emit( + 'biginfo_advertisement', + BIGInfoAdvertisement.from_report(self.advertiser_address, self.sid, report), + ) + + def __str__(self) -> str: + return ( + 'PeriodicAdvertisingSync(' + f'state={self.state.name}, ' + f'sync_handle={self.sync_handle}, ' + f'sid={self.sid}, ' + f'skip={self.skip}, ' + f'filter_duplicates={self.filter_duplicates}' + ')' + ) + + # ----------------------------------------------------------------------------- class LePhyOptions: # Coded PHY preference @@ -1409,6 +1675,20 @@ def try_with_connection_from_address(function): return wrapper +# Decorator that converts the first argument from a sync handle to a periodic +# advertising sync object +def with_periodic_advertising_sync_from_handle(function): + @functools.wraps(function) + def wrapper(self, sync_handle, *args, **kwargs): + if (sync := self.lookup_periodic_advertising_sync(sync_handle)) is None: + raise ValueError( + f'no periodic advertising sync for handle: 0x{sync_handle:04x}' + ) + return function(self, sync, *args, **kwargs) + + return wrapper + + # Decorator that adds a method to the list of event handlers for host events. # This assumes that the method name starts with `on_` def host_event_handler(function): @@ -1439,6 +1719,7 @@ class Device(CompositeEventEmitter): Address, List[asyncio.Future[Union[Connection, Tuple[Address, int, int]]]] ] advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] + periodic_advertising_syncs: List[PeriodicAdvertisingSync] config: DeviceConfiguration legacy_advertiser: Optional[LegacyAdvertiser] sco_links: Dict[int, ScoLink] @@ -1524,6 +1805,7 @@ class Device(CompositeEventEmitter): [l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS] ) self.advertisement_accumulators = {} # Accumulators, by address + self.periodic_advertising_syncs = [] self.scanning = False self.scanning_is_passive = False self.discovering = False @@ -1706,6 +1988,18 @@ class Device(CompositeEventEmitter): return None + def lookup_periodic_advertising_sync( + self, sync_handle: int + ) -> Optional[PeriodicAdvertisingSync]: + return next( + ( + sync + for sync in self.periodic_advertising_syncs + if sync.sync_handle == sync_handle + ), + None, + ) + @deprecated("Please use create_l2cap_server()") def register_l2cap_server(self, psm, server) -> int: return self.l2cap_channel_manager.register_server(psm, server) @@ -2368,6 +2662,116 @@ class Device(CompositeEventEmitter): if advertisement := accumulator.update(report): self.emit('advertisement', advertisement) + async def create_periodic_advertising_sync( + self, + advertiser_address: Address, + sid: int, + skip: int = DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_SKIP, + sync_timeout: float = DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_TIMEOUT, + filter_duplicates: bool = False, + ) -> PeriodicAdvertisingSync: + # Check that there isn't already an equivalent entry + if any( + sync.advertiser_address == advertiser_address and sync.sid == sid + for sync in self.periodic_advertising_syncs + ): + raise ValueError("equivalent entry already created") + + # Create a new entry + sync = PeriodicAdvertisingSync( + device=self, + advertiser_address=advertiser_address, + sid=sid, + skip=skip, + sync_timeout=sync_timeout, + filter_duplicates=filter_duplicates, + ) + + self.periodic_advertising_syncs.append(sync) + + # Check if any sync should be started + await self._update_periodic_advertising_syncs() + + return sync + + async def _update_periodic_advertising_syncs(self) -> None: + # Check if there's already a pending sync + if any( + sync.state == PeriodicAdvertisingSync.State.PENDING + for sync in self.periodic_advertising_syncs + ): + logger.debug("at least one sync pending, nothing to update yet") + return + + # Start the next sync that's waiting to be started + if ready := next( + ( + sync + for sync in self.periodic_advertising_syncs + if sync.state == PeriodicAdvertisingSync.State.INIT + ), + None, + ): + await ready.establish() + return + + @host_event_handler + def on_periodic_advertising_sync_establishment( + self, + status: int, + sync_handle: int, + advertising_sid: int, + advertiser_address: Address, + advertiser_phy: int, + periodic_advertising_interval: int, + advertiser_clock_accuracy: int, + ) -> None: + for periodic_advertising_sync in self.periodic_advertising_syncs: + if ( + periodic_advertising_sync.advertiser_address == advertiser_address + and periodic_advertising_sync.sid == advertising_sid + ): + periodic_advertising_sync.on_establishment( + status, + sync_handle, + advertiser_phy, + periodic_advertising_interval, + advertiser_clock_accuracy, + ) + + AsyncRunner.spawn(self._update_periodic_advertising_syncs()) + + return + + logger.warning( + "periodic advertising sync establishment for unknown address/sid" + ) + + @host_event_handler + @with_periodic_advertising_sync_from_handle + def on_periodic_advertising_sync_loss( + self, periodic_advertising_sync: PeriodicAdvertisingSync + ): + periodic_advertising_sync.on_loss() + + @host_event_handler + @with_periodic_advertising_sync_from_handle + def on_periodic_advertising_report( + self, + periodic_advertising_sync: PeriodicAdvertisingSync, + report: HCI_LE_Periodic_Advertising_Report_Event, + ): + periodic_advertising_sync.on_periodic_advertising_report(report) + + @host_event_handler + @with_periodic_advertising_sync_from_handle + def on_biginfo_advertising_report( + self, + periodic_advertising_sync: PeriodicAdvertisingSync, + report: HCI_LE_BIGInfo_Advertising_Report_Event, + ): + periodic_advertising_sync.on_biginfo_advertising_report(report) + async def start_discovery(self, auto_restart: bool = True) -> None: await self.send_command( HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE), diff --git a/bumble/hci.py b/bumble/hci.py index 3ef1032..30a9704 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -1381,7 +1381,7 @@ class LmpFeatureMask(enum.IntFlag): STATUS_SPEC = {'size': 1, 'mapper': lambda x: HCI_Constant.status_name(x)} -class CodecID(enum.IntEnum): +class CodecID(OpenIntEnum): # fmt: off U_LOG = 0x00 A_LOG = 0x01 @@ -1968,6 +1968,9 @@ class Address: def __str__(self): return self.to_string() + def __repr__(self): + return f'Address({self.to_string(False)}/{self.address_type_name(self.address_type)})' + # Predefined address values Address.NIL = Address(b"\xff\xff\xff\xff\xff\xff", Address.PUBLIC_DEVICE_ADDRESS) @@ -4453,6 +4456,80 @@ class HCI_LE_Extended_Create_Connection_Command(HCI_Command): ) +# ----------------------------------------------------------------------------- +@HCI_Command.command( + [ + ( + 'options', + { + 'size': 1, + 'mapper': lambda x: HCI_LE_Periodic_Advertising_Create_Sync_Command.Options( + x + ).name, + }, + ), + ('advertising_sid', 1), + ('advertiser_address_type', Address.ADDRESS_TYPE_SPEC), + ('advertiser_address', Address.parse_address_preceded_by_type), + ('skip', 2), + ('sync_timeout', 2), + ( + 'sync_cte_type', + { + 'size': 1, + 'mapper': lambda x: HCI_LE_Periodic_Advertising_Create_Sync_Command.CteType( + x + ).name, + }, + ), + ] +) +class HCI_LE_Periodic_Advertising_Create_Sync_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.67 LE Periodic Advertising Create Sync command + ''' + + class Options(enum.IntFlag): + USE_PERIODIC_ADVERTISER_LIST = 1 << 0 + REPORTING_INITIALLY_DISABLED = 1 << 1 + DUPLICATE_FILTERING_INITIALLY_ENABLED = 1 << 2 + + class CteType(enum.IntFlag): + DO_NOT_SYNC_TO_PACKETS_WITH_AN_AOA_CONSTANT_TONE_EXTENSION = 1 << 0 + DO_NOT_SYNC_TO_PACKETS_WITH_AN_AOD_CONSTANT_TONE_EXTENSION_1US = 1 << 1 + DO_NOT_SYNC_TO_PACKETS_WITH_AN_AOD_CONSTANT_TONE_EXTENSION_2US = 1 << 2 + DO_NOT_SYNC_TO_PACKETS_WITH_A_TYPE_3_CONSTANT_TONE_EXTENSION = 1 << 3 + DO_NOT_SYNC_TO_PACKETS_WITHOUT_A_CONSTANT_TONE_EXTENSION = 1 << 4 + + +# ----------------------------------------------------------------------------- +@HCI_Command.command() +class HCI_LE_Periodic_Advertising_Create_Sync_Cancel_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.68 LE Periodic Advertising Create Sync Cancel Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command([('sync_handle', 2)]) +class HCI_LE_Periodic_Advertising_Terminate_Sync_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.69 LE Periodic Advertising Terminate Sync Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command([('sync_handle', 2), ('enable', 1)]) +class HCI_LE_Set_Periodic_Advertising_Receive_Enable_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.88 LE Set Periodic Advertising Receive Enable Command + ''' + + class Enable(enum.IntFlag): + REPORTING_ENABLED = 1 << 0 + DUPLICATE_FILTERING_ENABLED = 1 << 1 + + # ----------------------------------------------------------------------------- @HCI_Command.command( [ @@ -4488,14 +4565,6 @@ class HCI_LE_Set_Privacy_Mode_Command(HCI_Command): return name_or_number(cls.PRIVACY_MODE_NAMES, privacy_mode) -# ----------------------------------------------------------------------------- -@HCI_Command.command([('bit_number', 1), ('bit_value', 1)]) -class HCI_LE_Set_Host_Feature_Command(HCI_Command): - ''' - See Bluetooth spec @ 7.8.115 LE Set Host Feature Command - ''' - - # ----------------------------------------------------------------------------- @HCI_Command.command( fields=[ @@ -4656,6 +4725,14 @@ class HCI_LE_Remove_ISO_Data_Path_Command(HCI_Command): data_path_direction: int +# ----------------------------------------------------------------------------- +@HCI_Command.command([('bit_number', 1), ('bit_value', 1)]) +class HCI_LE_Set_Host_Feature_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.115 LE Set Host Feature Command + ''' + + # ----------------------------------------------------------------------------- # HCI Events # ----------------------------------------------------------------------------- @@ -5272,6 +5349,142 @@ HCI_LE_Meta_Event.subevent_classes[HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT] = ( ) +# ----------------------------------------------------------------------------- +@HCI_LE_Meta_Event.event( + [ + ('status', STATUS_SPEC), + ('sync_handle', 2), + ('advertising_sid', 1), + ('advertiser_address_type', Address.ADDRESS_TYPE_SPEC), + ('advertiser_address', Address.parse_address_preceded_by_type), + ('advertiser_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}), + ('periodic_advertising_interval', 2), + ('advertiser_clock_accuracy', 1), + ] +) +class HCI_LE_Periodic_Advertising_Sync_Established_Event(HCI_LE_Meta_Event): + ''' + See Bluetooth spec @ 7.7.65.14 LE Periodic Advertising Sync Established Event + ''' + + +# ----------------------------------------------------------------------------- +@HCI_LE_Meta_Event.event( + [ + ('status', STATUS_SPEC), + ('sync_handle', 2), + ('advertising_sid', 1), + ('advertiser_address_type', Address.ADDRESS_TYPE_SPEC), + ('advertiser_address', Address.parse_address_preceded_by_type), + ('advertiser_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}), + ('periodic_advertising_interval', 2), + ('advertiser_clock_accuracy', 1), + ('num_subevents', 1), + ('subevent_interval', 1), + ('response_slot_delay', 1), + ('response_slot_spacing', 1), + ] +) +class HCI_LE_Periodic_Advertising_Sync_Established_V2_Event(HCI_LE_Meta_Event): + ''' + See Bluetooth spec @ 7.7.65.14 LE Periodic Advertising Sync Established Event + ''' + + +# ----------------------------------------------------------------------------- +@HCI_LE_Meta_Event.event( + [ + ('sync_handle', 2), + ('tx_power', -1), + ('rssi', -1), + ( + 'cte_type', + { + 'size': 1, + 'mapper': lambda x: HCI_LE_Periodic_Advertising_Report_Event.CteType( + x + ).name, + }, + ), + ( + 'data_status', + { + 'size': 1, + 'mapper': lambda x: HCI_LE_Periodic_Advertising_Report_Event.DataStatus( + x + ).name, + }, + ), + ('data', 'v'), + ] +) +class HCI_LE_Periodic_Advertising_Report_Event(HCI_LE_Meta_Event): + ''' + See Bluetooth spec @ 7.7.65.15 LE Periodic Advertising Report Event + ''' + + TX_POWER_INFORMATION_NOT_AVAILABLE = 0x7F + RSSI_NOT_AVAILABLE = 0x7F + + class CteType(OpenIntEnum): + AOA_CONSTANT_TONE_EXTENSION = 0x00 + AOD_CONSTANT_TONE_EXTENSION_1US = 0x01 + AOD_CONSTANT_TONE_EXTENSION_2US = 0x02 + NO_CONSTANT_TONE_EXTENSION = 0xFF + + class DataStatus(OpenIntEnum): + DATA_COMPLETE = 0x00 + DATA_INCOMPLETE_MORE_TO_COME = 0x01 + DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME = 0x02 + + +# ----------------------------------------------------------------------------- +@HCI_LE_Meta_Event.event( + [ + ('sync_handle', 2), + ('tx_power', -1), + ('rssi', -1), + ( + 'cte_type', + { + 'size': 1, + 'mapper': lambda x: HCI_LE_Periodic_Advertising_Report_Event.CteType( + x + ).name, + }, + ), + ('periodic_event_counter', 2), + ('subevent', 1), + ( + 'data_status', + { + 'size': 1, + 'mapper': lambda x: HCI_LE_Periodic_Advertising_Report_Event.DataStatus( + x + ).name, + }, + ), + ('data', 'v'), + ] +) +class HCI_LE_Periodic_Advertising_Report_V2_Event(HCI_LE_Meta_Event): + ''' + See Bluetooth spec @ 7.7.65.15 LE Periodic Advertising Report Event + ''' + + +# ----------------------------------------------------------------------------- +@HCI_LE_Meta_Event.event( + [ + ('sync_handle', 2), + ] +) +class HCI_LE_Periodic_Advertising_Sync_Lost_Event(HCI_LE_Meta_Event): + ''' + See Bluetooth spec @ 7.7.65.16 LE Periodic Advertising Sync Lost Event + ''' + + # ----------------------------------------------------------------------------- @HCI_LE_Meta_Event.event( [ @@ -5337,6 +5550,30 @@ class HCI_LE_CIS_Request_Event(HCI_LE_Meta_Event): ''' +# ----------------------------------------------------------------------------- +@HCI_LE_Meta_Event.event( + [ + ('sync_handle', 2), + ('num_bis', 1), + ('nse', 1), + ('iso_interval', 2), + ('bn', 1), + ('pto', 1), + ('irc', 1), + ('max_pdu', 2), + ('sdu_interval', 3), + ('max_sdu', 2), + ('phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}), + ('framing', 1), + ('encryption', 1), + ] +) +class HCI_LE_BIGInfo_Advertising_Report_Event(HCI_LE_Meta_Event): + ''' + See Bluetooth spec @ 7.7.65.34 LE BIGInfo Advertising Report Event + ''' + + # ----------------------------------------------------------------------------- @HCI_Event.event([('status', STATUS_SPEC)]) class HCI_Inquiry_Complete_Event(HCI_Event): diff --git a/bumble/host.py b/bumble/host.py index 64b6668..41fbabd 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -905,6 +905,27 @@ class Host(AbortableEventEmitter): event.num_completed_extended_advertising_events, ) + def on_hci_le_periodic_advertising_sync_established_event(self, event): + self.emit( + 'periodic_advertising_sync_establishment', + event.status, + event.sync_handle, + event.advertising_sid, + event.advertiser_address, + event.advertiser_phy, + event.periodic_advertising_interval, + event.advertiser_clock_accuracy, + ) + + def on_hci_le_periodic_advertising_sync_lost_event(self, event): + self.emit('periodic_advertising_sync_loss', event.sync_handle) + + def on_hci_le_periodic_advertising_report_event(self, event): + self.emit('periodic_advertising_report', event.sync_handle, event) + + def on_hci_le_biginfo_advertising_report_event(self, event): + self.emit('biginfo_advertising_report', event.sync_handle, event) + def on_hci_le_cis_request_event(self, event): self.emit( 'cis_request', diff --git a/bumble/pandora/host.py b/bumble/pandora/host.py index 4904274..aff063c 100644 --- a/bumble/pandora/host.py +++ b/bumble/pandora/host.py @@ -28,6 +28,7 @@ from bumble.core import ( BT_PERIPHERAL_ROLE, UUID, AdvertisingData, + Appearance, ConnectionError, ) from bumble.device import ( @@ -988,8 +989,8 @@ class HostService(HostServicer): dt.random_target_addresses.extend( [data[i * 6 :: i * 6 + 6] for i in range(int(len(data) / 6))] ) - if i := cast(int, ad.get(AdvertisingData.APPEARANCE)): - dt.appearance = i + if appearance := cast(Appearance, ad.get(AdvertisingData.APPEARANCE)): + dt.appearance = int(appearance) if i := cast(int, ad.get(AdvertisingData.ADVERTISING_INTERVAL)): dt.advertising_interval = i if s := cast(str, ad.get(AdvertisingData.URI)): diff --git a/bumble/profiles/bap.py b/bumble/profiles/bap.py index c0123b1..188ca27 100644 --- a/bumble/profiles/bap.py +++ b/bumble/profiles/bap.py @@ -25,6 +25,7 @@ import struct import functools import logging from typing import Optional, List, Union, Type, Dict, Any, Tuple +from typing_extensions import Self from bumble import core from bumble import colors @@ -32,6 +33,8 @@ from bumble import device from bumble import hci from bumble import gatt from bumble import gatt_client +from bumble import utils +from bumble.profiles import le_audio # ----------------------------------------------------------------------------- @@ -115,7 +118,7 @@ class ContextType(enum.IntFlag): EMERGENCY_ALARM = 0x0800 -class SamplingFrequency(enum.IntEnum): +class SamplingFrequency(utils.OpenIntEnum): '''Bluetooth Assigned Numbers, Section 6.12.5.1 - Sampling Frequency''' # fmt: off @@ -240,7 +243,7 @@ class SupportedFrameDuration(enum.IntFlag): DURATION_10000_US_PREFERRED = 0b0010 -class AnnouncementType(enum.IntEnum): +class AnnouncementType(utils.OpenIntEnum): '''Basic Audio Profile, 3.5.3. Additional Audio Stream Control Service requirements''' # fmt: off @@ -613,7 +616,7 @@ class CodecSpecificConfiguration: * Basic Audio Profile, 4.3.2 - Codec_Specific_Capabilities LTV requirements ''' - class Type(enum.IntEnum): + class Type(utils.OpenIntEnum): # fmt: off SAMPLING_FREQUENCY = 0x01 FRAME_DURATION = 0x02 @@ -725,6 +728,99 @@ class PacRecord: ) +@dataclasses.dataclass +class BroadcastAudioAnnouncement: + broadcast_id: int + + @classmethod + def from_bytes(cls, data: bytes) -> Self: + return cls(int.from_bytes(data[:3], 'little')) + + +@dataclasses.dataclass +class BasicAudioAnnouncement: + @dataclasses.dataclass + class BIS: + index: int + codec_specific_configuration: CodecSpecificConfiguration + + @dataclasses.dataclass + class CodecInfo: + coding_format: hci.CodecID + company_id: int + vendor_specific_codec_id: int + + @classmethod + def from_bytes(cls, data: bytes) -> Self: + coding_format = hci.CodecID(data[0]) + company_id = int.from_bytes(data[1:3], 'little') + vendor_specific_codec_id = int.from_bytes(data[3:5], 'little') + return cls(coding_format, company_id, vendor_specific_codec_id) + + @dataclasses.dataclass + class Subgroup: + codec_id: BasicAudioAnnouncement.CodecInfo + codec_specific_configuration: CodecSpecificConfiguration + metadata: le_audio.Metadata + bis: List[BasicAudioAnnouncement.BIS] + + presentation_delay: int + subgroups: List[BasicAudioAnnouncement.Subgroup] + + @classmethod + def from_bytes(cls, data: bytes) -> Self: + presentation_delay = int.from_bytes(data[:3], 'little') + subgroups = [] + offset = 4 + for _ in range(data[3]): + num_bis = data[offset] + offset += 1 + codec_id = cls.CodecInfo.from_bytes(data[offset : offset + 5]) + offset += 5 + codec_specific_configuration_length = data[offset] + offset += 1 + codec_specific_configuration = data[ + offset : offset + codec_specific_configuration_length + ] + offset += codec_specific_configuration_length + metadata_length = data[offset] + offset += 1 + metadata = le_audio.Metadata.from_bytes( + data[offset : offset + metadata_length] + ) + offset += metadata_length + + bis = [] + for _ in range(num_bis): + bis_index = data[offset] + offset += 1 + bis_codec_specific_configuration_length = data[offset] + offset += 1 + bis_codec_specific_configuration = data[ + offset : offset + bis_codec_specific_configuration_length + ] + offset += bis_codec_specific_configuration_length + bis.append( + cls.BIS( + bis_index, + CodecSpecificConfiguration.from_bytes( + bis_codec_specific_configuration + ), + ) + ) + + subgroups.append( + cls.Subgroup( + codec_id, + CodecSpecificConfiguration.from_bytes(codec_specific_configuration), + metadata, + bis, + ) + ) + + return cls(presentation_delay, subgroups) + + # ----------------------------------------------------------------------------- # Server # ----------------------------------------------------------------------------- @@ -744,9 +840,9 @@ class PublishedAudioCapabilitiesService(gatt.TemplateService): supported_sink_context: ContextType, available_source_context: ContextType, available_sink_context: ContextType, - sink_pac: Sequence[PacRecord] = [], + sink_pac: Sequence[PacRecord] = (), sink_audio_locations: Optional[AudioLocation] = None, - source_pac: Sequence[PacRecord] = [], + source_pac: Sequence[PacRecord] = (), source_audio_locations: Optional[AudioLocation] = None, ) -> None: characteristics = [] diff --git a/bumble/profiles/le_audio.py b/bumble/profiles/le_audio.py new file mode 100644 index 0000000..8d1e629 --- /dev/null +++ b/bumble/profiles/le_audio.py @@ -0,0 +1,49 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from __future__ import annotations +import dataclasses +from typing import List +from typing_extensions import Self + + +# ----------------------------------------------------------------------------- +# Classes +# ----------------------------------------------------------------------------- +@dataclasses.dataclass +class Metadata: + @dataclasses.dataclass + class Entry: + tag: int + data: bytes + + entries: List[Entry] + + @classmethod + def from_bytes(cls, data: bytes) -> Self: + entries = [] + offset = 0 + length = len(data) + while length >= 2: + entry_length = data[offset] + entry_tag = data[offset + 1] + entry_data = data[offset + 2 : offset + 2 + entry_length - 1] + entries.append(cls.Entry(entry_tag, entry_data)) + length -= entry_length + offset += entry_length + + return cls(entries) diff --git a/bumble/profiles/pbp.py b/bumble/profiles/pbp.py new file mode 100644 index 0000000..058bd6d --- /dev/null +++ b/bumble/profiles/pbp.py @@ -0,0 +1,46 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from __future__ import annotations +import dataclasses +import enum +from typing_extensions import Self + +from bumble.profiles import le_audio + + +# ----------------------------------------------------------------------------- +# Classes +# ----------------------------------------------------------------------------- +@dataclasses.dataclass +class PublicBroadcastAnnouncement: + class Features(enum.IntFlag): + ENCRYPTED = 1 << 0 + STANDARD_QUALITY_CONFIGURATION = 1 << 1 + HIGH_QUALITY_CONFIGURATION = 1 << 2 + + features: Features + metadata: le_audio.Metadata + + @classmethod + def from_bytes(cls, data: bytes) -> Self: + features = cls.Features(data[0]) + metadata_length = data[1] + metadata_ltv = data[1 : 1 + metadata_length] + return cls( + features=features, metadata=le_audio.Metadata.from_bytes(metadata_ltv) + ) diff --git a/tests/core_test.py b/tests/core_test.py index 11afb1c..7592082 100644 --- a/tests/core_test.py +++ b/tests/core_test.py @@ -15,7 +15,9 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- -from bumble.core import AdvertisingData, UUID, get_dict_key_by_value +from enum import IntEnum + +from bumble.core import AdvertisingData, Appearance, UUID, get_dict_key_by_value # ----------------------------------------------------------------------------- @@ -66,8 +68,35 @@ def test_uuid_to_hex_str() -> None: ) +# ----------------------------------------------------------------------------- +def test_appearance() -> None: + a = Appearance(Appearance.Category.COMPUTER, Appearance.ComputerSubcategory.LAPTOP) + assert str(a) == 'COMPUTER/LAPTOP' + assert int(a) == 0x0083 + + a = Appearance(Appearance.Category.HUMAN_INTERFACE_DEVICE, 0x77) + assert str(a) == 'HUMAN_INTERFACE_DEVICE/HumanInterfaceDeviceSubcategory[119]' + assert int(a) == 0x03C0 | 0x77 + + a = Appearance.from_int(0x0381) + assert a.category == Appearance.Category.BLOOD_PRESSURE + assert a.subcategory == Appearance.BloodPressureSubcategory.ARM_BLOOD_PRESSURE + assert int(a) == 0x381 + + a = Appearance.from_int(0x038A) + assert a.category == Appearance.Category.BLOOD_PRESSURE + assert a.subcategory == 0x0A + assert int(a) == 0x038A + + a = Appearance.from_int(0x3333) + assert a.category == 0xCC + assert a.subcategory == 0x33 + assert int(a) == 0x3333 + + # ----------------------------------------------------------------------------- if __name__ == '__main__': test_ad_data() test_get_dict_key_by_value() test_uuid_to_hex_str() + test_appearance()