# Copyright 2021-2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- from __future__ import annotations import struct from typing import List, Optional, Tuple, Union, cast from .company_ids import COMPANY_IDENTIFIERS # ----------------------------------------------------------------------------- # Constants # ----------------------------------------------------------------------------- # fmt: off BT_CENTRAL_ROLE = 0 BT_PERIPHERAL_ROLE = 1 BT_BR_EDR_TRANSPORT = 0 BT_LE_TRANSPORT = 1 # fmt: on # ----------------------------------------------------------------------------- # Utils # ----------------------------------------------------------------------------- def bit_flags_to_strings(bits, bit_flag_names): names = [] index = 0 while bits != 0: if bits & 1: name = bit_flag_names[index] if index < len(bit_flag_names) else f'#{index}' names.append(name) bits >>= 1 index += 1 return names def name_or_number(dictionary, number, width=2): name = dictionary.get(number) if name is not None: return name return f'[0x{number:0{width}X}]' def padded_bytes(buffer, size): padding_size = max(size - len(buffer), 0) return buffer + bytes(padding_size) def get_dict_key_by_value(dictionary, value): for key, val in dictionary.items(): if val == value: return key return None # ----------------------------------------------------------------------------- # Exceptions # ----------------------------------------------------------------------------- class BaseError(Exception): """Base class for errors with an error code, error name and namespace""" def __init__(self, error_code, error_namespace='', error_name='', details=''): super().__init__() self.error_code = error_code self.error_namespace = error_namespace self.error_name = error_name self.details = details def __str__(self): if self.error_namespace: namespace = f'{self.error_namespace}/' else: namespace = '' if self.error_name: name = f'{self.error_name} [0x{self.error_code:X}]' else: name = f'0x{self.error_code:X}' return f'{type(self).__name__}({namespace}{name})' class ProtocolError(BaseError): """Protocol Error""" class TimeoutError(Exception): # pylint: disable=redefined-builtin """Timeout Error""" class CommandTimeoutError(Exception): """Command Timeout Error""" class InvalidStateError(Exception): """Invalid State Error""" class ConnectionError(BaseError): # pylint: disable=redefined-builtin """Connection Error""" FAILURE = 0x01 CONNECTION_REFUSED = 0x02 def __init__( self, error_code, transport, peer_address, error_namespace='', error_name='', details='', ): super().__init__(error_code, error_namespace, error_name, details) self.transport = transport self.peer_address = peer_address # ----------------------------------------------------------------------------- # UUID # # NOTE: the internal byte representation is in little-endian byte order # # Base UUID: 00000000-0000-1000-8000- 00805F9B34FB # ----------------------------------------------------------------------------- class UUID: ''' See Bluetooth spec Vol 3, Part B - 2.5.1 UUID Note that this class expects and works in little-endian byte-order throughout. The exception is when interacting with strings, which are in big-endian byte-order. ''' BASE_UUID = bytes.fromhex('00001000800000805F9B34FB')[::-1] # little-endian UUIDS: List[UUID] = [] # Registry of all instances created def __init__(self, uuid_str_or_int, name=None): if isinstance(uuid_str_or_int, int): self.uuid_bytes = struct.pack(' UUID: if len(uuid_bytes) in (2, 4, 16): self = cls.__new__(cls) self.uuid_bytes = uuid_bytes self.name = name return self.register() raise ValueError('only 2, 4 and 16 bytes are allowed') @classmethod def from_16_bits(cls, uuid_16, name=None): return cls.from_bytes(struct.pack(' str: if len(self.uuid_bytes) == 2 or len(self.uuid_bytes) == 4: return bytes(reversed(self.uuid_bytes)).hex().upper() return ''.join( [ bytes(reversed(self.uuid_bytes[12:16])).hex(), bytes(reversed(self.uuid_bytes[10:12])).hex(), bytes(reversed(self.uuid_bytes[8:10])).hex(), bytes(reversed(self.uuid_bytes[6:8])).hex(), bytes(reversed(self.uuid_bytes[0:6])).hex(), ] ).upper() def __bytes__(self): return self.to_bytes() def __eq__(self, other): if isinstance(other, UUID): return self.to_bytes(force_128=True) == other.to_bytes(force_128=True) if isinstance(other, str): return UUID(other) == self return False def __hash__(self): return hash(self.uuid_bytes) def __str__(self): if len(self.uuid_bytes) == 2: uuid = struct.unpack('> 13 & 0x7FF), (class_of_device >> 8 & 0x1F), (class_of_device >> 2 & 0x3F), ) @staticmethod def pack_class_of_device(service_classes, major_device_class, minor_device_class): return service_classes << 13 | major_device_class << 8 | minor_device_class << 2 @staticmethod def service_class_labels(service_class_flags): return bit_flags_to_strings( service_class_flags, DeviceClass.SERVICE_CLASS_LABELS ) @staticmethod def major_device_class_name(device_class): return name_or_number(DeviceClass.MAJOR_DEVICE_CLASS_NAMES, device_class) @staticmethod def minor_device_class_name(major_device_class, minor_device_class): class_names = DeviceClass.MINOR_DEVICE_CLASS_NAMES.get(major_device_class) if class_names is None: return f'#{minor_device_class:02X}' return name_or_number(class_names, minor_device_class) # ----------------------------------------------------------------------------- # Advertising Data # ----------------------------------------------------------------------------- AdvertisingObject = Union[ List[UUID], Tuple[UUID, bytes], bytes, str, int, Tuple[int, int], Tuple[int, bytes] ] class AdvertisingData: # fmt: off # pylint: disable=line-too-long # This list is only partial, it still needs to be filled in from the spec FLAGS = 0x01 INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x02 COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = 0x03 INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x04 COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS = 0x05 INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x06 COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS = 0x07 SHORTENED_LOCAL_NAME = 0x08 COMPLETE_LOCAL_NAME = 0x09 TX_POWER_LEVEL = 0x0A CLASS_OF_DEVICE = 0x0D SIMPLE_PAIRING_HASH_C = 0x0E SIMPLE_PAIRING_HASH_C_192 = 0x0E SIMPLE_PAIRING_RANDOMIZER_R = 0x0F SIMPLE_PAIRING_RANDOMIZER_R_192 = 0x0F DEVICE_ID = 0x10 SECURITY_MANAGER_TK_VALUE = 0x10 SECURITY_MANAGER_OUT_OF_BAND_FLAGS = 0x11 PERIPHERAL_CONNECTION_INTERVAL_RANGE = 0x12 LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS = 0x14 LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS = 0x15 SERVICE_DATA = 0x16 SERVICE_DATA_16_BIT_UUID = 0x16 PUBLIC_TARGET_ADDRESS = 0x17 RANDOM_TARGET_ADDRESS = 0x18 APPEARANCE = 0x19 ADVERTISING_INTERVAL = 0x1A LE_BLUETOOTH_DEVICE_ADDRESS = 0x1B LE_ROLE = 0x1C SIMPLE_PAIRING_HASH_C_256 = 0x1D SIMPLE_PAIRING_RANDOMIZER_R_256 = 0x1E LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS = 0x1F SERVICE_DATA_32_BIT_UUID = 0x20 SERVICE_DATA_128_BIT_UUID = 0x21 LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE = 0x22 LE_SECURE_CONNECTIONS_RANDOM_VALUE = 0x23 URI = 0x24 INDOOR_POSITIONING = 0x25 TRANSPORT_DISCOVERY_DATA = 0x26 LE_SUPPORTED_FEATURES = 0x27 CHANNEL_MAP_UPDATE_INDICATION = 0x28 PB_ADV = 0x29 MESH_MESSAGE = 0x2A MESH_BEACON = 0x2B BIGINFO = 0x2C BROADCAST_CODE = 0x2D RESOLVABLE_SET_IDENTIFIER = 0x2E ADVERTISING_INTERVAL_LONG = 0x2F THREE_D_INFORMATION_DATA = 0x3D MANUFACTURER_SPECIFIC_DATA = 0xFF AD_TYPE_NAMES = { FLAGS: 'FLAGS', INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS', COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS', INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS', COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS', INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: 'INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS', COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: 'COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS', SHORTENED_LOCAL_NAME: 'SHORTENED_LOCAL_NAME', COMPLETE_LOCAL_NAME: 'COMPLETE_LOCAL_NAME', TX_POWER_LEVEL: 'TX_POWER_LEVEL', CLASS_OF_DEVICE: 'CLASS_OF_DEVICE', SIMPLE_PAIRING_HASH_C: 'SIMPLE_PAIRING_HASH_C', SIMPLE_PAIRING_HASH_C_192: 'SIMPLE_PAIRING_HASH_C_192', SIMPLE_PAIRING_RANDOMIZER_R: 'SIMPLE_PAIRING_RANDOMIZER_R', SIMPLE_PAIRING_RANDOMIZER_R_192: 'SIMPLE_PAIRING_RANDOMIZER_R_192', DEVICE_ID: 'DEVICE_ID', SECURITY_MANAGER_TK_VALUE: 'SECURITY_MANAGER_TK_VALUE', SECURITY_MANAGER_OUT_OF_BAND_FLAGS: 'SECURITY_MANAGER_OUT_OF_BAND_FLAGS', PERIPHERAL_CONNECTION_INTERVAL_RANGE: 'PERIPHERAL_CONNECTION_INTERVAL_RANGE', LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS', LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS', SERVICE_DATA: 'SERVICE_DATA', SERVICE_DATA_16_BIT_UUID: 'SERVICE_DATA_16_BIT_UUID', PUBLIC_TARGET_ADDRESS: 'PUBLIC_TARGET_ADDRESS', RANDOM_TARGET_ADDRESS: 'RANDOM_TARGET_ADDRESS', APPEARANCE: 'APPEARANCE', ADVERTISING_INTERVAL: 'ADVERTISING_INTERVAL', LE_BLUETOOTH_DEVICE_ADDRESS: 'LE_BLUETOOTH_DEVICE_ADDRESS', LE_ROLE: 'LE_ROLE', SIMPLE_PAIRING_HASH_C_256: 'SIMPLE_PAIRING_HASH_C_256', SIMPLE_PAIRING_RANDOMIZER_R_256: 'SIMPLE_PAIRING_RANDOMIZER_R_256', LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS: 'LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS', SERVICE_DATA_32_BIT_UUID: 'SERVICE_DATA_32_BIT_UUID', SERVICE_DATA_128_BIT_UUID: 'SERVICE_DATA_128_BIT_UUID', LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE: 'LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE', LE_SECURE_CONNECTIONS_RANDOM_VALUE: 'LE_SECURE_CONNECTIONS_RANDOM_VALUE', URI: 'URI', INDOOR_POSITIONING: 'INDOOR_POSITIONING', TRANSPORT_DISCOVERY_DATA: 'TRANSPORT_DISCOVERY_DATA', LE_SUPPORTED_FEATURES: 'LE_SUPPORTED_FEATURES', CHANNEL_MAP_UPDATE_INDICATION: 'CHANNEL_MAP_UPDATE_INDICATION', PB_ADV: 'PB_ADV', MESH_MESSAGE: 'MESH_MESSAGE', MESH_BEACON: 'MESH_BEACON', BIGINFO: 'BIGINFO', BROADCAST_CODE: 'BROADCAST_CODE', RESOLVABLE_SET_IDENTIFIER: 'RESOLVABLE_SET_IDENTIFIER', ADVERTISING_INTERVAL_LONG: 'ADVERTISING_INTERVAL_LONG', THREE_D_INFORMATION_DATA: 'THREE_D_INFORMATION_DATA', MANUFACTURER_SPECIFIC_DATA: 'MANUFACTURER_SPECIFIC_DATA' } LE_LIMITED_DISCOVERABLE_MODE_FLAG = 0x01 LE_GENERAL_DISCOVERABLE_MODE_FLAG = 0x02 BR_EDR_NOT_SUPPORTED_FLAG = 0x04 BR_EDR_CONTROLLER_FLAG = 0x08 BR_EDR_HOST_FLAG = 0x10 ad_structures: List[Tuple[int, bytes]] # fmt: on # pylint: enable=line-too-long def __init__(self, ad_structures: Optional[List[Tuple[int, bytes]]] = None) -> None: if ad_structures is None: ad_structures = [] self.ad_structures = ad_structures[:] @staticmethod def from_bytes(data): instance = AdvertisingData() instance.append(data) return instance @staticmethod def flags_to_string(flags, short=False): flag_names = ( ['LE Limited', 'LE General', 'No BR/EDR', 'BR/EDR C', 'BR/EDR H'] if short else [ 'LE Limited Discoverable Mode', 'LE General Discoverable Mode', 'BR/EDR Not Supported', 'Simultaneous LE and BR/EDR (Controller)', 'Simultaneous LE and BR/EDR (Host)', ] ) return ','.join(bit_flags_to_strings(flags, flag_names)) @staticmethod def uuid_list_to_objects(ad_data: bytes, uuid_size: int) -> List[UUID]: uuids = [] offset = 0 while (uuid_size * (offset + 1)) <= len(ad_data): uuids.append(UUID.from_bytes(ad_data[offset : offset + uuid_size])) offset += uuid_size return uuids @staticmethod def uuid_list_to_string(ad_data, uuid_size): return ', '.join( [ str(uuid) for uuid in AdvertisingData.uuid_list_to_objects(ad_data, uuid_size) ] ) @staticmethod def ad_data_to_string(ad_type, ad_data): if ad_type == AdvertisingData.FLAGS: ad_type_str = 'Flags' ad_data_str = AdvertisingData.flags_to_string(ad_data[0], short=True) elif ad_type == AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: ad_type_str = 'Complete List of 16-bit Service Class UUIDs' ad_data_str = AdvertisingData.uuid_list_to_string(ad_data, 2) elif ad_type == AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS: ad_type_str = 'Incomplete List of 16-bit Service Class UUIDs' ad_data_str = AdvertisingData.uuid_list_to_string(ad_data, 2) elif ad_type == AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: ad_type_str = 'Complete List of 32-bit Service Class UUIDs' ad_data_str = AdvertisingData.uuid_list_to_string(ad_data, 4) elif ad_type == AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS: ad_type_str = 'Incomplete List of 32-bit Service Class UUIDs' ad_data_str = AdvertisingData.uuid_list_to_string(ad_data, 4) elif ad_type == AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: ad_type_str = 'Complete List of 128-bit Service Class UUIDs' ad_data_str = AdvertisingData.uuid_list_to_string(ad_data, 16) elif ad_type == AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS: ad_type_str = 'Incomplete List of 128-bit Service Class UUIDs' ad_data_str = AdvertisingData.uuid_list_to_string(ad_data, 16) elif ad_type == AdvertisingData.SERVICE_DATA_16_BIT_UUID: ad_type_str = 'Service Data' uuid = UUID.from_bytes(ad_data[:2]) ad_data_str = f'service={uuid}, data={ad_data[2:].hex()}' elif ad_type == AdvertisingData.SERVICE_DATA_32_BIT_UUID: ad_type_str = 'Service Data' uuid = UUID.from_bytes(ad_data[:4]) ad_data_str = f'service={uuid}, data={ad_data[4:].hex()}' elif ad_type == AdvertisingData.SERVICE_DATA_128_BIT_UUID: ad_type_str = 'Service Data' uuid = UUID.from_bytes(ad_data[:16]) ad_data_str = f'service={uuid}, data={ad_data[16:].hex()}' elif ad_type == AdvertisingData.SHORTENED_LOCAL_NAME: ad_type_str = 'Shortened Local Name' ad_data_str = f'"{ad_data.decode("utf-8")}"' elif ad_type == AdvertisingData.COMPLETE_LOCAL_NAME: ad_type_str = 'Complete Local Name' ad_data_str = f'"{ad_data.decode("utf-8")}"' elif ad_type == AdvertisingData.TX_POWER_LEVEL: ad_type_str = 'TX Power Level' ad_data_str = str(ad_data[0]) elif ad_type == AdvertisingData.MANUFACTURER_SPECIFIC_DATA: ad_type_str = 'Manufacturer Specific Data' company_id = struct.unpack_from(' AdvertisingObject: if ad_type in ( AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS, ): return AdvertisingData.uuid_list_to_objects(ad_data, 2) if ad_type in ( AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS, ): return AdvertisingData.uuid_list_to_objects(ad_data, 4) if ad_type in ( AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS, ): return AdvertisingData.uuid_list_to_objects(ad_data, 16) if ad_type == AdvertisingData.SERVICE_DATA_16_BIT_UUID: return (UUID.from_bytes(ad_data[:2]), ad_data[2:]) if ad_type == AdvertisingData.SERVICE_DATA_32_BIT_UUID: return (UUID.from_bytes(ad_data[:4]), ad_data[4:]) if ad_type == AdvertisingData.SERVICE_DATA_128_BIT_UUID: return (UUID.from_bytes(ad_data[:16]), ad_data[16:]) if ad_type in ( AdvertisingData.SHORTENED_LOCAL_NAME, AdvertisingData.COMPLETE_LOCAL_NAME, AdvertisingData.URI, ): return ad_data.decode("utf-8") if ad_type in (AdvertisingData.TX_POWER_LEVEL, AdvertisingData.FLAGS): return cast(int, struct.unpack('B', ad_data)[0]) if ad_type in ( AdvertisingData.APPEARANCE, AdvertisingData.ADVERTISING_INTERVAL, ): return cast(int, struct.unpack(' 0: ad_type = data[offset] ad_data = data[offset + 1 : offset + length] self.ad_structures.append((ad_type, ad_data)) offset += length def get_all(self, type_id: int, raw: bool = False) -> List[AdvertisingObject]: ''' Get Advertising Data Structure(s) with a given type Returns a (possibly empty) list of matches. ''' def process_ad_data(ad_data: bytes) -> AdvertisingObject: return ad_data if raw else self.ad_data_to_object(type_id, ad_data) return [process_ad_data(ad[1]) for ad in self.ad_structures if ad[0] == type_id] def get(self, type_id: int, raw: bool = False) -> Optional[AdvertisingObject]: ''' Get Advertising Data Structure(s) with a given type Returns the first entry, or None if no structure matches. ''' all = self.get_all(type_id, raw=raw) return all[0] if all else None def __bytes__(self): return b''.join( [bytes([len(x[1]) + 1, x[0]]) + x[1] for x in self.ad_structures] ) def to_string(self, separator=', '): return separator.join( [AdvertisingData.ad_data_to_string(x[0], x[1]) for x in self.ad_structures] ) def __str__(self): return self.to_string() # ----------------------------------------------------------------------------- # Connection Parameters # ----------------------------------------------------------------------------- class ConnectionParameters: def __init__(self, connection_interval, peripheral_latency, supervision_timeout): self.connection_interval = connection_interval self.peripheral_latency = peripheral_latency self.supervision_timeout = supervision_timeout def __str__(self): return ( f'ConnectionParameters(connection_interval={self.connection_interval}, ' f'peripheral_latency={self.peripheral_latency}, ' f'supervision_timeout={self.supervision_timeout}' ) # ----------------------------------------------------------------------------- # Connection PHY # ----------------------------------------------------------------------------- class ConnectionPHY: def __init__(self, tx_phy, rx_phy): self.tx_phy = tx_phy self.rx_phy = rx_phy def __str__(self): return f'ConnectionPHY(tx_phy={self.tx_phy}, rx_phy={self.rx_phy})'