Merge pull request #646 from zxzxwu/ad

Improve AdvertisingData type annotations
This commit is contained in:
zxzxwu
2025-02-22 02:52:09 +08:00
committed by GitHub
4 changed files with 323 additions and 185 deletions

View File

@@ -127,11 +127,9 @@ class BroadcastScanner(pyee.EventEmitter):
def update(self, advertisement: bumble.device.Advertisement) -> None: def update(self, advertisement: bumble.device.Advertisement) -> None:
self.rssi = advertisement.rssi self.rssi = advertisement.rssi
for service_data in advertisement.data.get_all( for service_data in advertisement.data.get_all(
core.AdvertisingData.SERVICE_DATA core.AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID
): ):
assert isinstance(service_data, tuple)
service_uuid, data = service_data service_uuid, data = service_data
assert isinstance(data, bytes)
if service_uuid == gatt.GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE: if service_uuid == gatt.GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE:
self.public_broadcast_announcement = ( self.public_broadcast_announcement = (
@@ -145,16 +143,14 @@ class BroadcastScanner(pyee.EventEmitter):
) )
continue continue
self.appearance = advertisement.data.get( # type: ignore[assignment] self.appearance = advertisement.data.get(
core.AdvertisingData.APPEARANCE core.AdvertisingData.Type.APPEARANCE
) )
if manufacturer_data := advertisement.data.get( if manufacturer_data := advertisement.data.get(
core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA core.AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA
): ):
assert isinstance(manufacturer_data, tuple) company_id, data = manufacturer_data
company_id = cast(int, manufacturer_data[0])
data = cast(bytes, manufacturer_data[1])
self.manufacturer_data = ( self.manufacturer_data = (
company_ids.COMPANY_IDENTIFIERS.get( company_ids.COMPANY_IDENTIFIERS.get(
company_id, f'0x{company_id:04X}' company_id, f'0x{company_id:04X}'
@@ -271,11 +267,9 @@ class BroadcastScanner(pyee.EventEmitter):
return return
for service_data in advertisement.data.get_all( for service_data in advertisement.data.get_all(
core.AdvertisingData.SERVICE_DATA core.AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID
): ):
assert isinstance(service_data, tuple)
service_uuid, data = service_data service_uuid, data = service_data
assert isinstance(data, bytes)
if service_uuid == gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE: if service_uuid == gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE:
self.basic_audio_announcement = ( self.basic_audio_announcement = (
@@ -316,24 +310,23 @@ class BroadcastScanner(pyee.EventEmitter):
def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None: def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None:
if not ( if not (
ads := advertisement.data.get_all( ads := advertisement.data.get_all(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID core.AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID
) )
) or not ( ) or not (
broadcast_audio_announcement := next( broadcast_audio_announcement := next(
( (
ad ad
for ad in ads for ad in ads
if isinstance(ad, tuple) if ad[0] == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
and ad[0] == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
), ),
None, None,
) )
): ):
return return
broadcast_name = advertisement.data.get(core.AdvertisingData.BROADCAST_NAME) broadcast_name = advertisement.data.get_all(
assert isinstance(broadcast_name, str) or broadcast_name is None core.AdvertisingData.Type.BROADCAST_NAME
assert isinstance(broadcast_audio_announcement[1], bytes) )
if broadcast := self.broadcasts.get(advertisement.address): if broadcast := self.broadcasts.get(advertisement.address):
broadcast.update(advertisement) broadcast.update(advertisement)
@@ -341,7 +334,7 @@ class BroadcastScanner(pyee.EventEmitter):
bumble.utils.AsyncRunner.spawn( bumble.utils.AsyncRunner.spawn(
self.on_new_broadcast( self.on_new_broadcast(
broadcast_name, broadcast_name[0] if broadcast_name else None,
advertisement, advertisement,
bap.BroadcastAudioAnnouncement.from_bytes( bap.BroadcastAudioAnnouncement.from_bytes(
broadcast_audio_announcement[1] broadcast_audio_announcement[1]

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC # Copyright 2021-2025 Google LLC
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@@ -16,10 +16,10 @@
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations from __future__ import annotations
import dataclasses
import enum import enum
import struct import struct
from typing import List, Optional, Tuple, Union, cast, Dict from typing import cast, overload, Literal, Union, Optional
from typing_extensions import Self from typing_extensions import Self
from bumble.company_ids import COMPANY_IDENTIFIERS from bumble.company_ids import COMPANY_IDENTIFIERS
@@ -57,7 +57,7 @@ def bit_flags_to_strings(bits, bit_flag_names):
return names return names
def name_or_number(dictionary: Dict[int, str], number: int, width: int = 2) -> str: def name_or_number(dictionary: dict[int, str], number: int, width: int = 2) -> str:
name = dictionary.get(number) name = dictionary.get(number)
if name is not None: if name is not None:
return name return name
@@ -200,7 +200,7 @@ class UUID:
''' '''
BASE_UUID = bytes.fromhex('00001000800000805F9B34FB')[::-1] # little-endian BASE_UUID = bytes.fromhex('00001000800000805F9B34FB')[::-1] # little-endian
UUIDS: List[UUID] = [] # Registry of all instances created UUIDS: list[UUID] = [] # Registry of all instances created
uuid_bytes: bytes uuid_bytes: bytes
name: Optional[str] name: Optional[str]
@@ -259,11 +259,11 @@ class UUID:
return cls.from_bytes(struct.pack('<I', uuid_32), name) return cls.from_bytes(struct.pack('<I', uuid_32), name)
@classmethod @classmethod
def parse_uuid(cls, uuid_as_bytes: bytes, offset: int) -> Tuple[int, UUID]: def parse_uuid(cls, uuid_as_bytes: bytes, offset: int) -> tuple[int, UUID]:
return len(uuid_as_bytes), cls.from_bytes(uuid_as_bytes[offset:]) return len(uuid_as_bytes), cls.from_bytes(uuid_as_bytes[offset:])
@classmethod @classmethod
def parse_uuid_2(cls, uuid_as_bytes: bytes, offset: int) -> Tuple[int, UUID]: def parse_uuid_2(cls, uuid_as_bytes: bytes, offset: int) -> tuple[int, UUID]:
return offset + 2, cls.from_bytes(uuid_as_bytes[offset : offset + 2]) return offset + 2, cls.from_bytes(uuid_as_bytes[offset : offset + 2])
def to_bytes(self, force_128: bool = False) -> bytes: def to_bytes(self, force_128: bool = False) -> bytes:
@@ -1280,13 +1280,13 @@ class Appearance:
# Advertising Data # Advertising Data
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
AdvertisingDataObject = Union[ AdvertisingDataObject = Union[
List[UUID], list[UUID],
Tuple[UUID, bytes], tuple[UUID, bytes],
bytes, bytes,
str, str,
int, int,
Tuple[int, int], tuple[int, int],
Tuple[int, bytes], tuple[int, bytes],
Appearance, Appearance,
] ]
@@ -1295,6 +1295,7 @@ class AdvertisingData:
# fmt: off # fmt: off
# pylint: disable=line-too-long # pylint: disable=line-too-long
class Type(OpenIntEnum):
FLAGS = 0x01 FLAGS = 0x01
INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x02 INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x02
COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x03 COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x03
@@ -1316,7 +1317,6 @@ class AdvertisingData:
PERIPHERAL_CONNECTION_INTERVAL_RANGE = 0x12 PERIPHERAL_CONNECTION_INTERVAL_RANGE = 0x12
LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = 0x14 LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = 0x14
LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = 0x15 LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = 0x15
SERVICE_DATA = 0x16
SERVICE_DATA_16_BIT_UUID = 0x16 SERVICE_DATA_16_BIT_UUID = 0x16
PUBLIC_TARGET_ADDRESS = 0x17 PUBLIC_TARGET_ADDRESS = 0x17
RANDOM_TARGET_ADDRESS = 0x18 RANDOM_TARGET_ADDRESS = 0x18
@@ -1344,67 +1344,67 @@ class AdvertisingData:
RESOLVABLE_SET_IDENTIFIER = 0x2E RESOLVABLE_SET_IDENTIFIER = 0x2E
ADVERTISING_INTERVAL_LONG = 0x2F ADVERTISING_INTERVAL_LONG = 0x2F
BROADCAST_NAME = 0x30 BROADCAST_NAME = 0x30
ENCRYPTED_ADVERTISING_DATA = 0X31 ENCRYPTED_ADVERTISING_DATA = 0x31
PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION = 0X32 PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION = 0x32
ELECTRONIC_SHELF_LABEL = 0X34 ELECTRONIC_SHELF_LABEL = 0x34
THREE_D_INFORMATION_DATA = 0x3D THREE_D_INFORMATION_DATA = 0x3D
MANUFACTURER_SPECIFIC_DATA = 0xFF MANUFACTURER_SPECIFIC_DATA = 0xFF
AD_TYPE_NAMES = { # For backward-compatibility
FLAGS: 'FLAGS', FLAGS = Type.FLAGS
INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS', INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS
COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS', COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS
INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS', INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS
COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS', COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS
INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS', INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS
COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS', COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS
SHORTENED_LOCAL_NAME: 'SHORTENED_LOCAL_NAME', SHORTENED_LOCAL_NAME = Type.SHORTENED_LOCAL_NAME
COMPLETE_LOCAL_NAME: 'COMPLETE_LOCAL_NAME', COMPLETE_LOCAL_NAME = Type.COMPLETE_LOCAL_NAME
TX_POWER_LEVEL: 'TX_POWER_LEVEL', TX_POWER_LEVEL = Type.TX_POWER_LEVEL
CLASS_OF_DEVICE: 'CLASS_OF_DEVICE', CLASS_OF_DEVICE = Type.CLASS_OF_DEVICE
SIMPLE_PAIRING_HASH_C: 'SIMPLE_PAIRING_HASH_C', SIMPLE_PAIRING_HASH_C = Type.SIMPLE_PAIRING_HASH_C
SIMPLE_PAIRING_HASH_C_192: 'SIMPLE_PAIRING_HASH_C_192', SIMPLE_PAIRING_HASH_C_192 = Type.SIMPLE_PAIRING_HASH_C_192
SIMPLE_PAIRING_RANDOMIZER_R: 'SIMPLE_PAIRING_RANDOMIZER_R', SIMPLE_PAIRING_RANDOMIZER_R = Type.SIMPLE_PAIRING_RANDOMIZER_R
SIMPLE_PAIRING_RANDOMIZER_R_192: 'SIMPLE_PAIRING_RANDOMIZER_R_192', SIMPLE_PAIRING_RANDOMIZER_R_192 = Type.SIMPLE_PAIRING_RANDOMIZER_R_192
DEVICE_ID: 'DEVICE_ID', DEVICE_ID = Type.DEVICE_ID
SECURITY_MANAGER_TK_VALUE: 'SECURITY_MANAGER_TK_VALUE', SECURITY_MANAGER_TK_VALUE = Type.SECURITY_MANAGER_TK_VALUE
SECURITY_MANAGER_OUT_OF_BAND_FLAGS: 'SECURITY_MANAGER_OUT_OF_BAND_FLAGS', SECURITY_MANAGER_OUT_OF_BAND_FLAGS = Type.SECURITY_MANAGER_OUT_OF_BAND_FLAGS
PERIPHERAL_CONNECTION_INTERVAL_RANGE: 'PERIPHERAL_CONNECTION_INTERVAL_RANGE', PERIPHERAL_CONNECTION_INTERVAL_RANGE = Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE
LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS', LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS
LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS', LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS
SERVICE_DATA_16_BIT_UUID: 'SERVICE_DATA_16_BIT_UUID', SERVICE_DATA = Type.SERVICE_DATA_16_BIT_UUID
PUBLIC_TARGET_ADDRESS: 'PUBLIC_TARGET_ADDRESS', SERVICE_DATA_16_BIT_UUID = Type.SERVICE_DATA_16_BIT_UUID
RANDOM_TARGET_ADDRESS: 'RANDOM_TARGET_ADDRESS', PUBLIC_TARGET_ADDRESS = Type.PUBLIC_TARGET_ADDRESS
APPEARANCE: 'APPEARANCE', RANDOM_TARGET_ADDRESS = Type.RANDOM_TARGET_ADDRESS
ADVERTISING_INTERVAL: 'ADVERTISING_INTERVAL', APPEARANCE = Type.APPEARANCE
LE_BLUETOOTH_DEVICE_ADDRESS: 'LE_BLUETOOTH_DEVICE_ADDRESS', ADVERTISING_INTERVAL = Type.ADVERTISING_INTERVAL
LE_ROLE: 'LE_ROLE', LE_BLUETOOTH_DEVICE_ADDRESS = Type.LE_BLUETOOTH_DEVICE_ADDRESS
SIMPLE_PAIRING_HASH_C_256: 'SIMPLE_PAIRING_HASH_C_256', LE_ROLE = Type.LE_ROLE
SIMPLE_PAIRING_RANDOMIZER_R_256: 'SIMPLE_PAIRING_RANDOMIZER_R_256', SIMPLE_PAIRING_HASH_C_256 = Type.SIMPLE_PAIRING_HASH_C_256
LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS', SIMPLE_PAIRING_RANDOMIZER_R_256 = Type.SIMPLE_PAIRING_RANDOMIZER_R_256
SERVICE_DATA_32_BIT_UUID: 'SERVICE_DATA_32_BIT_UUID', LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS = Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS
SERVICE_DATA_128_BIT_UUID: 'SERVICE_DATA_128_BIT_UUID', SERVICE_DATA_32_BIT_UUID = Type.SERVICE_DATA_32_BIT_UUID
LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE: 'LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE', SERVICE_DATA_128_BIT_UUID = Type.SERVICE_DATA_128_BIT_UUID
LE_SECURE_CONNECTIONS_RANDOM_VALUE: 'LE_SECURE_CONNECTIONS_RANDOM_VALUE', LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE = Type.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE
URI: 'URI', LE_SECURE_CONNECTIONS_RANDOM_VALUE = Type.LE_SECURE_CONNECTIONS_RANDOM_VALUE
INDOOR_POSITIONING: 'INDOOR_POSITIONING', URI = Type.URI
TRANSPORT_DISCOVERY_DATA: 'TRANSPORT_DISCOVERY_DATA', INDOOR_POSITIONING = Type.INDOOR_POSITIONING
LE_SUPPORTED_FEATURES: 'LE_SUPPORTED_FEATURES', TRANSPORT_DISCOVERY_DATA = Type.TRANSPORT_DISCOVERY_DATA
CHANNEL_MAP_UPDATE_INDICATION: 'CHANNEL_MAP_UPDATE_INDICATION', LE_SUPPORTED_FEATURES = Type.LE_SUPPORTED_FEATURES
PB_ADV: 'PB_ADV', CHANNEL_MAP_UPDATE_INDICATION = Type.CHANNEL_MAP_UPDATE_INDICATION
MESH_MESSAGE: 'MESH_MESSAGE', PB_ADV = Type.PB_ADV
MESH_BEACON: 'MESH_BEACON', MESH_MESSAGE = Type.MESH_MESSAGE
BIGINFO: 'BIGINFO', MESH_BEACON = Type.MESH_BEACON
BROADCAST_CODE: 'BROADCAST_CODE', BIGINFO = Type.BIGINFO
RESOLVABLE_SET_IDENTIFIER: 'RESOLVABLE_SET_IDENTIFIER', BROADCAST_CODE = Type.BROADCAST_CODE
ADVERTISING_INTERVAL_LONG: 'ADVERTISING_INTERVAL_LONG', RESOLVABLE_SET_IDENTIFIER = Type.RESOLVABLE_SET_IDENTIFIER
BROADCAST_NAME: 'BROADCAST_NAME', ADVERTISING_INTERVAL_LONG = Type.ADVERTISING_INTERVAL_LONG
ENCRYPTED_ADVERTISING_DATA: 'ENCRYPTED_ADVERTISING_DATA', BROADCAST_NAME = Type.BROADCAST_NAME
PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION: 'PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION', ENCRYPTED_ADVERTISING_DATA = Type.ENCRYPTED_ADVERTISING_DATA
ELECTRONIC_SHELF_LABEL: 'ELECTRONIC_SHELF_LABEL', PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION = Type.PERIODIC_ADVERTISING_RESPONSE_TIMING_INFORMATION
THREE_D_INFORMATION_DATA: 'THREE_D_INFORMATION_DATA', ELECTRONIC_SHELF_LABEL = Type.ELECTRONIC_SHELF_LABEL
MANUFACTURER_SPECIFIC_DATA: 'MANUFACTURER_SPECIFIC_DATA' THREE_D_INFORMATION_DATA = Type.THREE_D_INFORMATION_DATA
} MANUFACTURER_SPECIFIC_DATA = Type.MANUFACTURER_SPECIFIC_DATA
LE_LIMITED_DISCOVERABLE_MODE_FLAG = 0x01 LE_LIMITED_DISCOVERABLE_MODE_FLAG = 0x01
LE_GENERAL_DISCOVERABLE_MODE_FLAG = 0x02 LE_GENERAL_DISCOVERABLE_MODE_FLAG = 0x02
@@ -1412,12 +1412,12 @@ class AdvertisingData:
BR_EDR_CONTROLLER_FLAG = 0x08 BR_EDR_CONTROLLER_FLAG = 0x08
BR_EDR_HOST_FLAG = 0x10 BR_EDR_HOST_FLAG = 0x10
ad_structures: List[Tuple[int, bytes]] ad_structures: list[tuple[int, bytes]]
# fmt: on # fmt: on
# pylint: enable=line-too-long # pylint: enable=line-too-long
def __init__(self, ad_structures: Optional[List[Tuple[int, bytes]]] = None) -> None: def __init__(self, ad_structures: Optional[list[tuple[int, bytes]]] = None) -> None:
if ad_structures is None: if ad_structures is None:
ad_structures = [] ad_structures = []
self.ad_structures = ad_structures[:] self.ad_structures = ad_structures[:]
@@ -1444,7 +1444,7 @@ class AdvertisingData:
return ','.join(bit_flags_to_strings(flags, flag_names)) return ','.join(bit_flags_to_strings(flags, flag_names))
@staticmethod @staticmethod
def uuid_list_to_objects(ad_data: bytes, uuid_size: int) -> List[UUID]: def uuid_list_to_objects(ad_data: bytes, uuid_size: int) -> list[UUID]:
uuids = [] uuids = []
offset = 0 offset = 0
while (offset + uuid_size) <= len(ad_data): while (offset + uuid_size) <= len(ad_data):
@@ -1461,8 +1461,8 @@ class AdvertisingData:
] ]
) )
@staticmethod @classmethod
def ad_data_to_string(ad_type, ad_data): def ad_data_to_string(cls, ad_type: int, ad_data: bytes) -> str:
if ad_type == AdvertisingData.FLAGS: if ad_type == AdvertisingData.FLAGS:
ad_type_str = 'Flags' ad_type_str = 'Flags'
ad_data_str = AdvertisingData.flags_to_string(ad_data[0], short=True) ad_data_str = AdvertisingData.flags_to_string(ad_data[0], short=True)
@@ -1521,72 +1521,72 @@ class AdvertisingData:
ad_type_str = 'Broadcast Name' ad_type_str = 'Broadcast Name'
ad_data_str = ad_data.decode('utf-8') ad_data_str = ad_data.decode('utf-8')
else: else:
ad_type_str = AdvertisingData.AD_TYPE_NAMES.get(ad_type, f'0x{ad_type:02X}') ad_type_str = AdvertisingData.Type(ad_type).name
ad_data_str = ad_data.hex() ad_data_str = ad_data.hex()
return f'[{ad_type_str}]: {ad_data_str}' return f'[{ad_type_str}]: {ad_data_str}'
# pylint: disable=too-many-return-statements # pylint: disable=too-many-return-statements
@staticmethod @classmethod
def ad_data_to_object(ad_type: int, ad_data: bytes) -> AdvertisingDataObject: def ad_data_to_object(cls, ad_type: int, ad_data: bytes) -> AdvertisingDataObject:
if ad_type in ( if ad_type in (
AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS, AdvertisingData.Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
): ):
return AdvertisingData.uuid_list_to_objects(ad_data, 2) return AdvertisingData.uuid_list_to_objects(ad_data, 2)
if ad_type in ( if ad_type in (
AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS, AdvertisingData.Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
): ):
return AdvertisingData.uuid_list_to_objects(ad_data, 4) return AdvertisingData.uuid_list_to_objects(ad_data, 4)
if ad_type in ( if ad_type in (
AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS, AdvertisingData.Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
): ):
return AdvertisingData.uuid_list_to_objects(ad_data, 16) return AdvertisingData.uuid_list_to_objects(ad_data, 16)
if ad_type == AdvertisingData.SERVICE_DATA_16_BIT_UUID: if ad_type == AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID:
return (UUID.from_bytes(ad_data[:2]), ad_data[2:]) return (UUID.from_bytes(ad_data[:2]), ad_data[2:])
if ad_type == AdvertisingData.SERVICE_DATA_32_BIT_UUID: if ad_type == AdvertisingData.Type.SERVICE_DATA_32_BIT_UUID:
return (UUID.from_bytes(ad_data[:4]), ad_data[4:]) return (UUID.from_bytes(ad_data[:4]), ad_data[4:])
if ad_type == AdvertisingData.SERVICE_DATA_128_BIT_UUID: if ad_type == AdvertisingData.Type.SERVICE_DATA_128_BIT_UUID:
return (UUID.from_bytes(ad_data[:16]), ad_data[16:]) return (UUID.from_bytes(ad_data[:16]), ad_data[16:])
if ad_type in ( if ad_type in (
AdvertisingData.SHORTENED_LOCAL_NAME, AdvertisingData.Type.SHORTENED_LOCAL_NAME,
AdvertisingData.COMPLETE_LOCAL_NAME, AdvertisingData.Type.COMPLETE_LOCAL_NAME,
AdvertisingData.URI, AdvertisingData.Type.URI,
AdvertisingData.BROADCAST_NAME, AdvertisingData.Type.BROADCAST_NAME,
): ):
return ad_data.decode("utf-8") return ad_data.decode("utf-8")
if ad_type in (AdvertisingData.TX_POWER_LEVEL, AdvertisingData.FLAGS): if ad_type in (AdvertisingData.Type.TX_POWER_LEVEL, AdvertisingData.Type.FLAGS):
return cast(int, struct.unpack('B', ad_data)[0]) return cast(int, struct.unpack('B', ad_data)[0])
if ad_type in (AdvertisingData.ADVERTISING_INTERVAL,): if ad_type in (AdvertisingData.Type.ADVERTISING_INTERVAL,):
return cast(int, struct.unpack('<H', ad_data)[0]) return cast(int, struct.unpack('<H', ad_data)[0])
if ad_type == AdvertisingData.CLASS_OF_DEVICE: if ad_type == AdvertisingData.Type.CLASS_OF_DEVICE:
return cast(int, struct.unpack('<I', bytes([*ad_data, 0]))[0]) return cast(int, struct.unpack('<I', bytes([*ad_data, 0]))[0])
if ad_type == AdvertisingData.PERIPHERAL_CONNECTION_INTERVAL_RANGE: if ad_type == AdvertisingData.Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE:
return cast(Tuple[int, int], struct.unpack('<HH', ad_data)) return cast(tuple[int, int], struct.unpack('<HH', ad_data))
if ad_type == AdvertisingData.MANUFACTURER_SPECIFIC_DATA: if ad_type == AdvertisingData.Type.APPEARANCE:
return (cast(int, struct.unpack_from('<H', ad_data, 0)[0]), ad_data[2:])
if ad_type == AdvertisingData.APPEARANCE:
return Appearance.from_int( return Appearance.from_int(
cast(int, struct.unpack_from('<H', ad_data, 0)[0]) cast(int, struct.unpack_from('<H', ad_data, 0)[0])
) )
if ad_type == AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA:
return (cast(int, struct.unpack_from('<H', ad_data, 0)[0]), ad_data[2:])
return ad_data return ad_data
def append(self, data: bytes) -> None: def append(self, data: bytes) -> None:
@@ -1600,7 +1600,80 @@ class AdvertisingData:
self.ad_structures.append((ad_type, ad_data)) self.ad_structures.append((ad_type, ad_data))
offset += length offset += length
def get_all(self, type_id: int, raw: bool = False) -> List[AdvertisingDataObject]: @overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
],
raw: Literal[False] = False,
) -> list[list[UUID]]: ...
@overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_32_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_128_BIT_UUID,
],
raw: Literal[False] = False,
) -> list[tuple[UUID, bytes]]: ...
@overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.SHORTENED_LOCAL_NAME,
AdvertisingData.Type.COMPLETE_LOCAL_NAME,
AdvertisingData.Type.URI,
AdvertisingData.Type.BROADCAST_NAME,
],
raw: Literal[False] = False,
) -> list[str]: ...
@overload
def get_all(
self,
type_id: Literal[
AdvertisingData.Type.TX_POWER_LEVEL,
AdvertisingData.Type.FLAGS,
AdvertisingData.Type.ADVERTISING_INTERVAL,
AdvertisingData.Type.CLASS_OF_DEVICE,
],
raw: Literal[False] = False,
) -> list[int]: ...
@overload
def get_all(
self,
type_id: Literal[AdvertisingData.Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE,],
raw: Literal[False] = False,
) -> list[tuple[int, int]]: ...
@overload
def get_all(
self,
type_id: Literal[AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA,],
raw: Literal[False] = False,
) -> list[tuple[int, bytes]]: ...
@overload
def get_all(
self,
type_id: Literal[AdvertisingData.Type.APPEARANCE,],
raw: Literal[False] = False,
) -> list[Appearance]: ...
@overload
def get_all(self, type_id: int, raw: Literal[True]) -> list[bytes]: ...
@overload
def get_all(
self, type_id: int, raw: bool = False
) -> list[AdvertisingDataObject]: ...
def get_all(self, type_id: int, raw: bool = False) -> list[AdvertisingDataObject]: # type: ignore[misc]
''' '''
Get Advertising Data Structure(s) with a given type Get Advertising Data Structure(s) with a given type
@@ -1612,6 +1685,79 @@ class AdvertisingData:
return [process_ad_data(ad[1]) for ad in self.ad_structures if ad[0] == type_id] return [process_ad_data(ad[1]) for ad in self.ad_structures if ad[0] == type_id]
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS,
AdvertisingData.Type.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.Type.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS,
],
raw: Literal[False] = False,
) -> Optional[list[UUID]]: ...
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.SERVICE_DATA_16_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_32_BIT_UUID,
AdvertisingData.Type.SERVICE_DATA_128_BIT_UUID,
],
raw: Literal[False] = False,
) -> Optional[tuple[UUID, bytes]]: ...
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.SHORTENED_LOCAL_NAME,
AdvertisingData.Type.COMPLETE_LOCAL_NAME,
AdvertisingData.Type.URI,
AdvertisingData.Type.BROADCAST_NAME,
],
raw: Literal[False] = False,
) -> Optional[Optional[str]]: ...
@overload
def get(
self,
type_id: Literal[
AdvertisingData.Type.TX_POWER_LEVEL,
AdvertisingData.Type.FLAGS,
AdvertisingData.Type.ADVERTISING_INTERVAL,
AdvertisingData.Type.CLASS_OF_DEVICE,
],
raw: Literal[False] = False,
) -> Optional[int]: ...
@overload
def get(
self,
type_id: Literal[AdvertisingData.Type.PERIPHERAL_CONNECTION_INTERVAL_RANGE,],
raw: Literal[False] = False,
) -> Optional[tuple[int, int]]: ...
@overload
def get(
self,
type_id: Literal[AdvertisingData.Type.MANUFACTURER_SPECIFIC_DATA,],
raw: Literal[False] = False,
) -> Optional[tuple[int, bytes]]: ...
@overload
def get(
self,
type_id: Literal[AdvertisingData.Type.APPEARANCE,],
raw: Literal[False] = False,
) -> Optional[Appearance]: ...
@overload
def get(self, type_id: int, raw: Literal[True]) -> Optional[bytes]: ...
@overload
def get(
self, type_id: int, raw: bool = False
) -> Optional[AdvertisingDataObject]: ...
def get(self, type_id: int, raw: bool = False) -> Optional[AdvertisingDataObject]: def get(self, type_id: int, raw: bool = False) -> Optional[AdvertisingDataObject]:
''' '''
Get Advertising Data Structure(s) with a given type Get Advertising Data Structure(s) with a given type

View File

@@ -4010,12 +4010,11 @@ class Device(CompositeEventEmitter):
# Create a future to wait for an address to be found # Create a future to wait for an address to be found
peer_address = asyncio.get_running_loop().create_future() peer_address = asyncio.get_running_loop().create_future()
def on_peer_found(address, ad_data): def on_peer_found(address: hci.Address, ad_data: AdvertisingData) -> None:
local_name = ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) local_name = ad_data.get(
if local_name is None: AdvertisingData.Type.COMPLETE_LOCAL_NAME
local_name = ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True) ) or ad_data.get(AdvertisingData.Type.SHORTENED_LOCAL_NAME)
if local_name is not None: if local_name == name:
if local_name.decode('utf-8') == name:
peer_address.set_result(address) peer_address.set_result(address)
listener = None listener = None

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2023 Google LLC # Copyright 2021-2025 Google LLC
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@@ -76,18 +76,18 @@ class OobData:
return instance return instance
def to_ad(self) -> AdvertisingData: def to_ad(self) -> AdvertisingData:
ad_structures = [] ad_structures: list[tuple[int, bytes]] = []
if self.address is not None: if self.address is not None:
ad_structures.append( ad_structures.append(
(AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(self.address)) (AdvertisingData.Type.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(self.address))
) )
if self.role is not None: if self.role is not None:
ad_structures.append((AdvertisingData.LE_ROLE, bytes([self.role]))) ad_structures.append((AdvertisingData.Type.LE_ROLE, bytes([self.role])))
if self.shared_data is not None: if self.shared_data is not None:
ad_structures.extend(self.shared_data.to_ad().ad_structures) ad_structures.extend(self.shared_data.to_ad().ad_structures)
if self.legacy_context is not None: if self.legacy_context is not None:
ad_structures.append( ad_structures.append(
(AdvertisingData.SECURITY_MANAGER_TK_VALUE, self.legacy_context.tk) (AdvertisingData.Type.SECURITY_MANAGER_TK_VALUE, self.legacy_context.tk)
) )
return AdvertisingData(ad_structures) return AdvertisingData(ad_structures)