# Copyright 2021-2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # GATT - Generic Attribute Profile # Client # # See Bluetooth spec @ Vol 3, Part G # # ----------------------------------------------------------------------------- # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- import asyncio import logging import struct from colors import color from pyee import EventEmitter from .hci import HCI_Constant from .att import ( ATT_ATTRIBUTE_NOT_FOUND_ERROR, ATT_ATTRIBUTE_NOT_LONG_ERROR, ATT_CID, ATT_DEFAULT_MTU, ATT_ERROR_RESPONSE, ATT_INVALID_OFFSET_ERROR, ATT_PDU, ATT_RESPONSES, ATT_Exchange_MTU_Request, ATT_Find_By_Type_Value_Request, ATT_Find_Information_Request, ATT_Handle_Value_Confirmation, ATT_Read_Blob_Request, ATT_Read_By_Group_Type_Request, ATT_Read_By_Type_Request, ATT_Read_Request, ATT_Write_Command, ATT_Write_Request, ) from . import core from .core import UUID, InvalidStateError, ProtocolError from .gatt import ( GATT_CHARACTERISTIC_ATTRIBUTE_TYPE, GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR, GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE, GATT_REQUEST_TIMEOUT, GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE, Characteristic, ClientCharacteristicConfigurationBits, ) # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- # Proxies # ----------------------------------------------------------------------------- class AttributeProxy(EventEmitter): def __init__(self, client, handle, end_group_handle, attribute_type): EventEmitter.__init__(self) self.client = client self.handle = handle self.end_group_handle = end_group_handle self.type = attribute_type async def read_value(self, no_long_read=False): return self.decode_value( await self.client.read_value(self.handle, no_long_read) ) async def write_value(self, value, with_response=False): return await self.client.write_value( self.handle, self.encode_value(value), with_response ) def encode_value(self, value): return value def decode_value(self, value_bytes): return value_bytes def __str__(self): return f'Attribute(handle=0x{self.handle:04X}, type={self.type})' class ServiceProxy(AttributeProxy): @staticmethod def from_client(service_class, client, service_uuid): # The service and its characteristics are considered to have already been # discovered services = client.get_services_by_uuid(service_uuid) service = services[0] if services else None return service_class(service) if service else None def __init__(self, client, handle, end_group_handle, uuid, primary=True): attribute_type = ( GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE if primary else GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE ) super().__init__(client, handle, end_group_handle, attribute_type) self.uuid = uuid self.characteristics = [] async def discover_characteristics(self, uuids=()): return await self.client.discover_characteristics(uuids, self) def get_characteristics_by_uuid(self, uuid): return self.client.get_characteristics_by_uuid(uuid, self) def __str__(self): return f'Service(handle=0x{self.handle:04X}, uuid={self.uuid})' class CharacteristicProxy(AttributeProxy): def __init__(self, client, handle, end_group_handle, uuid, properties): super().__init__(client, handle, end_group_handle, uuid) self.uuid = uuid self.properties = properties self.descriptors = [] self.descriptors_discovered = False self.subscribers = {} # Map from subscriber to proxy subscriber def get_descriptor(self, descriptor_type): for descriptor in self.descriptors: if descriptor.type == descriptor_type: return descriptor return None async def discover_descriptors(self): return await self.client.discover_descriptors(self) async def subscribe(self, subscriber=None, prefer_notify=True): if subscriber is not None: if subscriber in self.subscribers: # We already have a proxy subscriber subscriber = self.subscribers[subscriber] else: # Create and register a proxy that will decode the value original_subscriber = subscriber def on_change(value): original_subscriber(self.decode_value(value)) self.subscribers[subscriber] = on_change subscriber = on_change return await self.client.subscribe(self, subscriber, prefer_notify) async def unsubscribe(self, subscriber=None): if subscriber in self.subscribers: subscriber = self.subscribers.pop(subscriber) return await self.client.unsubscribe(self, subscriber) def __str__(self): return ( f'Characteristic(handle=0x{self.handle:04X}, ' f'uuid={self.uuid}, ' f'properties={Characteristic.properties_as_string(self.properties)})' ) class DescriptorProxy(AttributeProxy): def __init__(self, client, handle, descriptor_type): super().__init__(client, handle, 0, descriptor_type) def __str__(self): return f'Descriptor(handle=0x{self.handle:04X}, type={self.type})' class ProfileServiceProxy: ''' Base class for profile-specific service proxies ''' @classmethod def from_client(cls, client): return ServiceProxy.from_client(cls, client, cls.SERVICE_CLASS.UUID) # ----------------------------------------------------------------------------- # GATT Client # ----------------------------------------------------------------------------- class Client: def __init__(self, connection): self.connection = connection self.mtu_exchange_done = False self.request_semaphore = asyncio.Semaphore(1) self.pending_request = None self.pending_response = None self.notification_subscribers = ( {} ) # Notification subscribers, by attribute handle self.indication_subscribers = {} # Indication subscribers, by attribute handle self.services = [] def send_gatt_pdu(self, pdu): self.connection.send_l2cap_pdu(ATT_CID, pdu) async def send_command(self, command): logger.debug( f'GATT Command from client: [0x{self.connection.handle:04X}] {command}' ) self.send_gatt_pdu(command.to_bytes()) async def send_request(self, request): logger.debug( f'GATT Request from client: [0x{self.connection.handle:04X}] {request}' ) # Wait until we can send (only one pending command at a time for the connection) response = None async with self.request_semaphore: assert self.pending_request is None assert self.pending_response is None # Create a future value to hold the eventual response self.pending_response = asyncio.get_running_loop().create_future() self.pending_request = request try: self.send_gatt_pdu(request.to_bytes()) response = await asyncio.wait_for( self.pending_response, GATT_REQUEST_TIMEOUT ) except asyncio.TimeoutError as error: logger.warning(color('!!! GATT Request timeout', 'red')) raise core.TimeoutError(f'GATT timeout for {request.name}') from error finally: self.pending_request = None self.pending_response = None return response def send_confirmation(self, confirmation): logger.debug( f'GATT Confirmation from client: [0x{self.connection.handle:04X}] ' f'{confirmation}' ) self.send_gatt_pdu(confirmation.to_bytes()) async def request_mtu(self, mtu): # Check the range if mtu < ATT_DEFAULT_MTU: raise ValueError(f'MTU must be >= {ATT_DEFAULT_MTU}') if mtu > 0xFFFF: raise ValueError('MTU must be <= 0xFFFF') # We can only send one request per connection if self.mtu_exchange_done: return self.connection.att_mtu # Send the request self.mtu_exchange_done = True response = await self.send_request(ATT_Exchange_MTU_Request(client_rx_mtu=mtu)) if response.op_code == ATT_ERROR_RESPONSE: raise ProtocolError( response.error_code, 'att', ATT_PDU.error_name(response.error_code), response, ) # Compute the final MTU self.connection.att_mtu = min(mtu, response.server_rx_mtu) return self.connection.att_mtu def get_services_by_uuid(self, uuid): return [service for service in self.services if service.uuid == uuid] def get_characteristics_by_uuid(self, uuid, service=None): services = [service] if service else self.services return [ c for c in [c for s in services for c in s.characteristics] if c.uuid == uuid ] def on_service_discovered(self, service): '''Add a service to the service list if it wasn't already there''' already_known = False for existing_service in self.services: if existing_service.handle == service.handle: already_known = True break if not already_known: self.services.append(service) async def discover_services(self, uuids=None): ''' See Vol 3, Part G - 4.4.1 Discover All Primary Services ''' starting_handle = 0x0001 services = [] while starting_handle < 0xFFFF: response = await self.send_request( ATT_Read_By_Group_Type_Request( starting_handle=starting_handle, ending_handle=0xFFFF, attribute_group_type=GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE, ) ) if response is None: # TODO raise appropriate exception return [] # Check if we reached the end of the iteration if response.op_code == ATT_ERROR_RESPONSE: if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR: # Unexpected end logger.warning( '!!! unexpected error while discovering services: ' f'{HCI_Constant.error_name(response.error_code)}' ) # TODO raise appropriate exception return break for ( attribute_handle, end_group_handle, attribute_value, ) in response.attributes: if ( attribute_handle < starting_handle or end_group_handle < attribute_handle ): # Something's not right logger.warning( f'bogus handle values: {attribute_handle} {end_group_handle}' ) return # Create a service proxy for this service service = ServiceProxy( self, attribute_handle, end_group_handle, UUID.from_bytes(attribute_value), True, ) # Filter out returned services based on the given uuids list if (not uuids) or (service.uuid in uuids): services.append(service) # Add the service to the peer's service list self.on_service_discovered(service) # Stop if for some reason the list was empty if not response.attributes: break # Move on to the next chunk starting_handle = response.attributes[-1][1] + 1 return services async def discover_service(self, uuid): ''' See Vol 3, Part G - 4.4.2 Discover Primary Service by Service UUID ''' # Force uuid to be a UUID object if isinstance(uuid, str): uuid = UUID(uuid) starting_handle = 0x0001 services = [] while starting_handle < 0xFFFF: response = await self.send_request( ATT_Find_By_Type_Value_Request( starting_handle=starting_handle, ending_handle=0xFFFF, attribute_type=GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE, attribute_value=uuid.to_pdu_bytes(), ) ) if response is None: # TODO raise appropriate exception return [] # Check if we reached the end of the iteration if response.op_code == ATT_ERROR_RESPONSE: if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR: # Unexpected end logger.warning( '!!! unexpected error while discovering services: ' f'{HCI_Constant.error_name(response.error_code)}' ) # TODO raise appropriate exception return break for attribute_handle, end_group_handle in response.handles_information: if ( attribute_handle < starting_handle or end_group_handle < attribute_handle ): # Something's not right logger.warning( f'bogus handle values: {attribute_handle} {end_group_handle}' ) return # Create a service proxy for this service service = ServiceProxy( self, attribute_handle, end_group_handle, uuid, True ) # Add the service to the peer's service list services.append(service) self.on_service_discovered(service) # Check if we've reached the end already if end_group_handle == 0xFFFF: break # Stop if for some reason the list was empty if not response.handles_information: break # Move on to the next chunk starting_handle = response.handles_information[-1][1] + 1 return services async def discover_included_services(self, _service): ''' See Vol 3, Part G - 4.5.1 Find Included Services ''' # TODO return [] async def discover_characteristics(self, uuids, service): ''' See Vol 3, Part G - 4.6.1 Discover All Characteristics of a Service and 4.6.2 Discover Characteristics by UUID ''' # Cast the UUIDs type from string to object if needed uuids = [UUID(uuid) if isinstance(uuid, str) else uuid for uuid in uuids] # Decide which services to discover for services = [service] if service else self.services # Perform characteristic discovery for each service discovered_characteristics = [] for service in services: starting_handle = service.handle ending_handle = service.end_group_handle characteristics = [] while starting_handle <= ending_handle: response = await self.send_request( ATT_Read_By_Type_Request( starting_handle=starting_handle, ending_handle=ending_handle, attribute_type=GATT_CHARACTERISTIC_ATTRIBUTE_TYPE, ) ) if response is None: # TODO raise appropriate exception return [] # Check if we reached the end of the iteration if response.op_code == ATT_ERROR_RESPONSE: if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR: # Unexpected end logger.warning( '!!! unexpected error while discovering characteristics: ' f'{HCI_Constant.error_name(response.error_code)}' ) # TODO raise appropriate exception return break # Stop if for some reason the list was empty if not response.attributes: break # Process all characteristics returned in this iteration for attribute_handle, attribute_value in response.attributes: if attribute_handle < starting_handle: # Something's not right logger.warning(f'bogus handle value: {attribute_handle}') return [] properties, handle = struct.unpack_from('