# Copyright 2021-2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # GATT - Generic Attribute Profile # Client # # See Bluetooth spec @ Vol 3, Part G # # ----------------------------------------------------------------------------- # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- from __future__ import annotations import asyncio import logging import struct from datetime import datetime from typing import List, Optional, Dict, Tuple, Callable, Union, Any from pyee import EventEmitter from .colors import color from .hci import HCI_Constant from .att import ( ATT_ATTRIBUTE_NOT_FOUND_ERROR, ATT_ATTRIBUTE_NOT_LONG_ERROR, ATT_CID, ATT_DEFAULT_MTU, ATT_ERROR_RESPONSE, ATT_INVALID_OFFSET_ERROR, ATT_PDU, ATT_RESPONSES, ATT_Exchange_MTU_Request, ATT_Find_By_Type_Value_Request, ATT_Find_Information_Request, ATT_Handle_Value_Confirmation, ATT_Read_Blob_Request, ATT_Read_By_Group_Type_Request, ATT_Read_By_Type_Request, ATT_Read_Request, ATT_Write_Command, ATT_Write_Request, ATT_Error, ) from . import core from .core import UUID, InvalidStateError, ProtocolError from .gatt import ( GATT_CHARACTERISTIC_ATTRIBUTE_TYPE, GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR, GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE, GATT_REQUEST_TIMEOUT, GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE, Characteristic, ClientCharacteristicConfigurationBits, ) # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- # Proxies # ----------------------------------------------------------------------------- class AttributeProxy(EventEmitter): client: Client def __init__(self, client, handle, end_group_handle, attribute_type): EventEmitter.__init__(self) self.client = client self.handle = handle self.end_group_handle = end_group_handle self.type = attribute_type async def read_value(self, no_long_read=False): return self.decode_value( await self.client.read_value(self.handle, no_long_read) ) async def write_value(self, value, with_response=False): return await self.client.write_value( self.handle, self.encode_value(value), with_response ) def encode_value(self, value): return value def decode_value(self, value_bytes): return value_bytes def __str__(self): return f'Attribute(handle=0x{self.handle:04X}, type={self.type})' class ServiceProxy(AttributeProxy): uuid: UUID characteristics: List[CharacteristicProxy] @staticmethod def from_client(service_class, client, service_uuid): # The service and its characteristics are considered to have already been # discovered services = client.get_services_by_uuid(service_uuid) service = services[0] if services else None return service_class(service) if service else None def __init__(self, client, handle, end_group_handle, uuid, primary=True): attribute_type = ( GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE if primary else GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE ) super().__init__(client, handle, end_group_handle, attribute_type) self.uuid = uuid self.characteristics = [] async def discover_characteristics(self, uuids=()): return await self.client.discover_characteristics(uuids, self) def get_characteristics_by_uuid(self, uuid): return self.client.get_characteristics_by_uuid(uuid, self) def __str__(self): return f'Service(handle=0x{self.handle:04X}, uuid={self.uuid})' class CharacteristicProxy(AttributeProxy): properties: Characteristic.Properties descriptors: List[DescriptorProxy] subscribers: Dict[Any, Callable] def __init__( self, client, handle, end_group_handle, uuid, properties: int, ): super().__init__(client, handle, end_group_handle, uuid) self.uuid = uuid self.properties = Characteristic.Properties(properties) self.descriptors = [] self.descriptors_discovered = False self.subscribers = {} # Map from subscriber to proxy subscriber def get_descriptor(self, descriptor_type): for descriptor in self.descriptors: if descriptor.type == descriptor_type: return descriptor return None async def discover_descriptors(self): return await self.client.discover_descriptors(self) async def subscribe( self, subscriber: Optional[Callable] = None, prefer_notify=True ): if subscriber is not None: if subscriber in self.subscribers: # We already have a proxy subscriber subscriber = self.subscribers[subscriber] else: # Create and register a proxy that will decode the value original_subscriber = subscriber def on_change(value): original_subscriber(self.decode_value(value)) self.subscribers[subscriber] = on_change subscriber = on_change return await self.client.subscribe(self, subscriber, prefer_notify) async def unsubscribe(self, subscriber=None): if subscriber in self.subscribers: subscriber = self.subscribers.pop(subscriber) return await self.client.unsubscribe(self, subscriber) def __str__(self): return ( f'Characteristic(handle=0x{self.handle:04X}, ' f'uuid={self.uuid}, ' f'{self.properties!s})' ) class DescriptorProxy(AttributeProxy): def __init__(self, client, handle, descriptor_type): super().__init__(client, handle, 0, descriptor_type) def __str__(self): return f'Descriptor(handle=0x{self.handle:04X}, type={self.type})' class ProfileServiceProxy: ''' Base class for profile-specific service proxies ''' @classmethod def from_client(cls, client): return ServiceProxy.from_client(cls, client, cls.SERVICE_CLASS.UUID) # ----------------------------------------------------------------------------- # GATT Client # ----------------------------------------------------------------------------- class Client: services: List[ServiceProxy] cached_values: Dict[int, Tuple[datetime, bytes]] def __init__(self, connection): self.connection = connection self.mtu_exchange_done = False self.request_semaphore = asyncio.Semaphore(1) self.pending_request = None self.pending_response = None self.notification_subscribers = ( {} ) # Notification subscribers, by attribute handle self.indication_subscribers = {} # Indication subscribers, by attribute handle self.services = [] self.cached_values = {} def send_gatt_pdu(self, pdu): self.connection.send_l2cap_pdu(ATT_CID, pdu) async def send_command(self, command): logger.debug( f'GATT Command from client: [0x{self.connection.handle:04X}] {command}' ) self.send_gatt_pdu(command.to_bytes()) async def send_request(self, request): logger.debug( f'GATT Request from client: [0x{self.connection.handle:04X}] {request}' ) # Wait until we can send (only one pending command at a time for the connection) response = None async with self.request_semaphore: assert self.pending_request is None assert self.pending_response is None # Create a future value to hold the eventual response self.pending_response = asyncio.get_running_loop().create_future() self.pending_request = request try: self.send_gatt_pdu(request.to_bytes()) response = await asyncio.wait_for( self.pending_response, GATT_REQUEST_TIMEOUT ) except asyncio.TimeoutError as error: logger.warning(color('!!! GATT Request timeout', 'red')) raise core.TimeoutError(f'GATT timeout for {request.name}') from error finally: self.pending_request = None self.pending_response = None return response def send_confirmation(self, confirmation): logger.debug( f'GATT Confirmation from client: [0x{self.connection.handle:04X}] ' f'{confirmation}' ) self.send_gatt_pdu(confirmation.to_bytes()) async def request_mtu(self, mtu): # Check the range if mtu < ATT_DEFAULT_MTU: raise ValueError(f'MTU must be >= {ATT_DEFAULT_MTU}') if mtu > 0xFFFF: raise ValueError('MTU must be <= 0xFFFF') # We can only send one request per connection if self.mtu_exchange_done: return self.connection.att_mtu # Send the request self.mtu_exchange_done = True response = await self.send_request(ATT_Exchange_MTU_Request(client_rx_mtu=mtu)) if response.op_code == ATT_ERROR_RESPONSE: raise ProtocolError( response.error_code, 'att', ATT_PDU.error_name(response.error_code), response, ) # Compute the final MTU self.connection.att_mtu = min(mtu, response.server_rx_mtu) return self.connection.att_mtu def get_services_by_uuid(self, uuid): return [service for service in self.services if service.uuid == uuid] def get_characteristics_by_uuid(self, uuid, service=None): services = [service] if service else self.services return [ c for c in [c for s in services for c in s.characteristics] if c.uuid == uuid ] def get_attribute_grouping( self, attribute_handle: int ) -> Optional[ Union[ ServiceProxy, Tuple[ServiceProxy, CharacteristicProxy], Tuple[ServiceProxy, CharacteristicProxy, DescriptorProxy], ] ]: """ Get the attribute(s) associated with an attribute handle """ for service in self.services: if service.handle == attribute_handle: return service if service.handle <= attribute_handle <= service.end_group_handle: for characteristic in service.characteristics: if characteristic.handle == attribute_handle: return (service, characteristic) if ( characteristic.handle <= attribute_handle <= characteristic.end_group_handle ): for descriptor in characteristic.descriptors: if descriptor.handle == attribute_handle: return (service, characteristic, descriptor) return None def on_service_discovered(self, service): '''Add a service to the service list if it wasn't already there''' already_known = False for existing_service in self.services: if existing_service.handle == service.handle: already_known = True break if not already_known: self.services.append(service) async def discover_services(self, uuids=None) -> List[ServiceProxy]: ''' See Vol 3, Part G - 4.4.1 Discover All Primary Services ''' starting_handle = 0x0001 services = [] while starting_handle < 0xFFFF: response = await self.send_request( ATT_Read_By_Group_Type_Request( starting_handle=starting_handle, ending_handle=0xFFFF, attribute_group_type=GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE, ) ) if response is None: # TODO raise appropriate exception return [] # Check if we reached the end of the iteration if response.op_code == ATT_ERROR_RESPONSE: if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR: # Unexpected end logger.warning( '!!! unexpected error while discovering services: ' f'{HCI_Constant.error_name(response.error_code)}' ) raise ATT_Error( error_code=response.error_code, message='Unexpected error while discovering services', ) break for ( attribute_handle, end_group_handle, attribute_value, ) in response.attributes: if ( attribute_handle < starting_handle or end_group_handle < attribute_handle ): # Something's not right logger.warning( f'bogus handle values: {attribute_handle} {end_group_handle}' ) return [] # Create a service proxy for this service service = ServiceProxy( self, attribute_handle, end_group_handle, UUID.from_bytes(attribute_value), True, ) # Filter out returned services based on the given uuids list if (not uuids) or (service.uuid in uuids): services.append(service) # Add the service to the peer's service list self.on_service_discovered(service) # Stop if for some reason the list was empty if not response.attributes: break # Move on to the next chunk starting_handle = response.attributes[-1][1] + 1 return services async def discover_service(self, uuid): ''' See Vol 3, Part G - 4.4.2 Discover Primary Service by Service UUID ''' # Force uuid to be a UUID object if isinstance(uuid, str): uuid = UUID(uuid) starting_handle = 0x0001 services = [] while starting_handle < 0xFFFF: response = await self.send_request( ATT_Find_By_Type_Value_Request( starting_handle=starting_handle, ending_handle=0xFFFF, attribute_type=GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE, attribute_value=uuid.to_pdu_bytes(), ) ) if response is None: # TODO raise appropriate exception return [] # Check if we reached the end of the iteration if response.op_code == ATT_ERROR_RESPONSE: if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR: # Unexpected end logger.warning( '!!! unexpected error while discovering services: ' f'{HCI_Constant.error_name(response.error_code)}' ) # TODO raise appropriate exception return break for attribute_handle, end_group_handle in response.handles_information: if ( attribute_handle < starting_handle or end_group_handle < attribute_handle ): # Something's not right logger.warning( f'bogus handle values: {attribute_handle} {end_group_handle}' ) return # Create a service proxy for this service service = ServiceProxy( self, attribute_handle, end_group_handle, uuid, True ) # Add the service to the peer's service list services.append(service) self.on_service_discovered(service) # Check if we've reached the end already if end_group_handle == 0xFFFF: break # Stop if for some reason the list was empty if not response.handles_information: break # Move on to the next chunk starting_handle = response.handles_information[-1][1] + 1 return services async def discover_included_services(self, _service): ''' See Vol 3, Part G - 4.5.1 Find Included Services ''' # TODO return [] async def discover_characteristics( self, uuids, service: Optional[ServiceProxy] ) -> List[CharacteristicProxy]: ''' See Vol 3, Part G - 4.6.1 Discover All Characteristics of a Service and 4.6.2 Discover Characteristics by UUID ''' # Cast the UUIDs type from string to object if needed uuids = [UUID(uuid) if isinstance(uuid, str) else uuid for uuid in uuids] # Decide which services to discover for services = [service] if service else self.services # Perform characteristic discovery for each service discovered_characteristics: List[CharacteristicProxy] = [] for service in services: starting_handle = service.handle ending_handle = service.end_group_handle characteristics: List[CharacteristicProxy] = [] while starting_handle <= ending_handle: response = await self.send_request( ATT_Read_By_Type_Request( starting_handle=starting_handle, ending_handle=ending_handle, attribute_type=GATT_CHARACTERISTIC_ATTRIBUTE_TYPE, ) ) if response is None: # TODO raise appropriate exception return [] # Check if we reached the end of the iteration if response.op_code == ATT_ERROR_RESPONSE: if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR: # Unexpected end logger.warning( '!!! unexpected error while discovering characteristics: ' f'{HCI_Constant.error_name(response.error_code)}' ) raise ATT_Error( error_code=response.error_code, message='Unexpected error while discovering characteristics', ) break # Stop if for some reason the list was empty if not response.attributes: break # Process all characteristics returned in this iteration for attribute_handle, attribute_value in response.attributes: if attribute_handle < starting_handle: # Something's not right logger.warning(f'bogus handle value: {attribute_handle}') return [] properties, handle = struct.unpack_from(' List[DescriptorProxy]: ''' See Vol 3, Part G - 4.7.1 Discover All Characteristic Descriptors ''' if characteristic: starting_handle = characteristic.handle + 1 ending_handle = characteristic.end_group_handle elif start_handle and end_handle: starting_handle = start_handle ending_handle = end_handle else: return [] descriptors: List[DescriptorProxy] = [] while starting_handle <= ending_handle: response = await self.send_request( ATT_Find_Information_Request( starting_handle=starting_handle, ending_handle=ending_handle ) ) if response is None: # TODO raise appropriate exception return [] # Check if we reached the end of the iteration if response.op_code == ATT_ERROR_RESPONSE: if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR: # Unexpected end logger.warning( '!!! unexpected error while discovering descriptors: ' f'{HCI_Constant.error_name(response.error_code)}' ) # TODO raise appropriate exception return [] break # Stop if for some reason the list was empty if not response.information: break # Process all descriptors returned in this iteration for attribute_handle, attribute_uuid in response.information: if attribute_handle < starting_handle: # Something's not right logger.warning(f'bogus handle value: {attribute_handle}') return [] descriptor = DescriptorProxy( self, attribute_handle, UUID.from_bytes(attribute_uuid) ) descriptors.append(descriptor) # TODO: read descriptor value # Move on to the next descriptor starting_handle = response.information[-1][0] + 1 # Set the characteristic's descriptors if characteristic: characteristic.descriptors = descriptors return descriptors async def discover_attributes(self): ''' Discover all attributes, regardless of type ''' starting_handle = 0x0001 ending_handle = 0xFFFF attributes = [] while True: response = await self.send_request( ATT_Find_Information_Request( starting_handle=starting_handle, ending_handle=ending_handle ) ) if response is None: return [] # Check if we reached the end of the iteration if response.op_code == ATT_ERROR_RESPONSE: if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR: # Unexpected end logger.warning( '!!! unexpected error while discovering attributes: ' f'{HCI_Constant.error_name(response.error_code)}' ) return [] break for attribute_handle, attribute_uuid in response.information: if attribute_handle < starting_handle: # Something's not right logger.warning(f'bogus handle value: {attribute_handle}') return [] attribute = AttributeProxy( self, attribute_handle, 0, UUID.from_bytes(attribute_uuid) ) attributes.append(attribute) # Move on to the next attributes starting_handle = attributes[-1].handle + 1 return attributes async def subscribe(self, characteristic, subscriber=None, prefer_notify=True): # If we haven't already discovered the descriptors for this characteristic, # do it now if not characteristic.descriptors_discovered: await self.discover_descriptors(characteristic) # Look for the CCCD descriptor cccd = characteristic.get_descriptor( GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR ) if not cccd: logger.warning('subscribing to characteristic with no CCCD descriptor') return if ( characteristic.properties & Characteristic.Properties.NOTIFY and characteristic.properties & Characteristic.Properties.INDICATE ): if prefer_notify: bits = ClientCharacteristicConfigurationBits.NOTIFICATION subscribers = self.notification_subscribers else: bits = ClientCharacteristicConfigurationBits.INDICATION subscribers = self.indication_subscribers elif characteristic.properties & Characteristic.Properties.NOTIFY: bits = ClientCharacteristicConfigurationBits.NOTIFICATION subscribers = self.notification_subscribers elif characteristic.properties & Characteristic.Properties.INDICATE: bits = ClientCharacteristicConfigurationBits.INDICATION subscribers = self.indication_subscribers else: raise InvalidStateError("characteristic is not notify or indicate") # Add subscribers to the sets subscriber_set = subscribers.setdefault(characteristic.handle, set()) if subscriber is not None: subscriber_set.add(subscriber) # Add the characteristic as a subscriber, which will result in the # characteristic emitting an 'update' event when a notification or indication # is received subscriber_set.add(characteristic) await self.write_value(cccd, struct.pack('