# Copyright 2021-2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # GATT - Generic Attribute Profile # Client # # See Bluetooth spec @ Vol 3, Part G # # ----------------------------------------------------------------------------- # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- import asyncio import logging import struct from colors import color from .core import ProtocolError, TimeoutError from .hci import * from .att import * from .gatt import ( GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR, GATT_REQUEST_TIMEOUT, GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE, GATT_CHARACTERISTIC_ATTRIBUTE_TYPE, Service, Characteristic, Descriptor ) # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- # GATT Client # ----------------------------------------------------------------------------- class Client: def __init__(self, connection): self.connection = connection self.mtu = ATT_DEFAULT_MTU self.mtu_exchange_done = False self.request_semaphore = asyncio.Semaphore(1) self.pending_request = None self.pending_response = None self.notification_subscribers = {} # Notification subscribers, by attribute handle self.indication_subscribers = {} # Indication subscribers, by attribute handle self.services = [] def send_gatt_pdu(self, pdu): self.connection.send_l2cap_pdu(ATT_CID, pdu) async def send_command(self, command): logger.debug(f'GATT Command from client: [0x{self.connection.handle:04X}] {command}') self.send_gatt_pdu(command.to_bytes()) async def send_request(self, request): logger.debug(f'GATT Request from client: [0x{self.connection.handle:04X}] {request}') # Wait until we can send (only one pending command at a time for the connection) response = None async with self.request_semaphore: assert(self.pending_request is None) assert(self.pending_response is None) # Create a future value to hold the eventual response self.pending_response = asyncio.get_running_loop().create_future() self.pending_request = request try: self.send_gatt_pdu(request.to_bytes()) response = await asyncio.wait_for(self.pending_response, GATT_REQUEST_TIMEOUT) except asyncio.TimeoutError: logger.warning(color('!!! GATT Request timeout', 'red')) raise TimeoutError(f'GATT timeout for {request.name}') finally: self.pending_request = None self.pending_response = None return response def send_confirmation(self, confirmation): logger.debug(f'GATT Confirmation from client: [0x{self.connection.handle:04X}] {confirmation}') self.send_gatt_pdu(confirmation.to_bytes()) async def request_mtu(self, mtu): # Check the range if mtu < ATT_DEFAULT_MTU: raise ValueError(f'MTU must be >= {ATT_DEFAULT_MTU}') if mtu > 0xFFFF: raise ValueError('MTU must be <= 0xFFFF') # We can only send one request per connection if self.mtu_exchange_done: return # Send the request self.mtu_exchange_done = True response = await self.send_request(ATT_Exchange_MTU_Request(client_rx_mtu = mtu)) if response.op_code == ATT_ERROR_RESPONSE: raise ProtocolError( response.error_code, 'att', ATT_PDU.error_name(response.error_code), response ) self.mtu = max(ATT_DEFAULT_MTU, response.server_rx_mtu) return self.mtu def get_services_by_uuid(self, uuid): return [service for service in self.services if service.uuid == uuid] def get_characteristics_by_uuid(self, uuid, service = None): services = [service] if service else self.services return [c for c in [c for s in services for c in s.characteristics] if c.uuid == uuid] def on_service_discovered(self, service): ''' Add a service to the service list if it wasn't already there ''' already_known = False for existing_service in self.services: if existing_service.handle == service.handle: already_known = True break if not already_known: self.services.append(service) async def discover_services(self, uuids = None): ''' See Vol 3, Part G - 4.4.1 Discover All Primary Services ''' starting_handle = 0x0001 services = [] while starting_handle < 0xFFFF: response = await self.send_request( ATT_Read_By_Group_Type_Request( starting_handle = starting_handle, ending_handle = 0xFFFF, attribute_group_type = GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE ) ) if response is None: # TODO raise appropriate exception return [] # Check if we reached the end of the iteration if response.op_code == ATT_ERROR_RESPONSE: if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR: # Unexpected end logger.waning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}') # TODO raise appropriate exception return break for attribute_handle, end_group_handle, attribute_value in response.attributes: if attribute_handle < starting_handle or end_group_handle < attribute_handle: # Something's not right logger.warning(f'bogus handle values: {attribute_handle} {end_group_handle}') return # Create a primary service object service = Service(UUID.from_bytes(attribute_value), [], True) service.handle = attribute_handle service.end_group_handle = end_group_handle # Filter out returned services based on the given uuids list if (not uuids) or (service.uuid in uuids): services.append(service) # Add the service to the peer's service list self.on_service_discovered(service) # Stop if for some reason the list was empty if not response.attributes: break # Move on to the next chunk starting_handle = response.attributes[-1][1] + 1 return services async def discover_service(self, uuid): ''' See Vol 3, Part G - 4.4.2 Discover Primary Service by Service UUID ''' # Force uuid to be a UUID object if type(uuid) is str: uuid = UUID(uuid) starting_handle = 0x0001 services = [] while starting_handle < 0xFFFF: response = await self.send_request( ATT_Find_By_Type_Value_Request( starting_handle = starting_handle, ending_handle = 0xFFFF, attribute_type = GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE, attribute_value = uuid.to_pdu_bytes() ) ) if response is None: # TODO raise appropriate exception return [] # Check if we reached the end of the iteration if response.op_code == ATT_ERROR_RESPONSE: if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR: # Unexpected end logger.waning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}') # TODO raise appropriate exception return break for attribute_handle, end_group_handle in response.handles_information: if attribute_handle < starting_handle or end_group_handle < attribute_handle: # Something's not right logger.warning(f'bogus handle values: {attribute_handle} {end_group_handle}') return # Create a primary service object service = Service(uuid, [], True) service.handle = attribute_handle service.end_group_handle = end_group_handle # Add the service to the peer's service list services.append(service) self.on_service_discovered(service) # Check if we've reached the end already if end_group_handle == 0xFFFF: break # Stop if for some reason the list was empty if not response.handles_information: break # Move on to the next chunk starting_handle = response.handles_information[-1][1] + 1 return services async def discover_included_services(self, service): ''' See Vol 3, Part G - 4.5.1 Find Included Services ''' # TODO return [] async def discover_characteristics(self, uuids, service): ''' See Vol 3, Part G - 4.6.1 Discover All Characteristics of a Service and 4.6.2 Discover Characteristics by UUID ''' # Cast the UUIDs type from string to object if needed uuids = [UUID(uuid) if type(uuid) is str else uuid for uuid in uuids] # Decide which services to discover for services = [service] if service else self.services # Perform characteristic discovery for each service discovered_characteristics = [] for service in services: starting_handle = service.handle ending_handle = service.end_group_handle characteristics = [] while starting_handle <= ending_handle: response = await self.send_request( ATT_Read_By_Type_Request( starting_handle = starting_handle, ending_handle = ending_handle, attribute_type = GATT_CHARACTERISTIC_ATTRIBUTE_TYPE ) ) if response is None: # TODO raise appropriate exception return [] # Check if we reached the end of the iteration if response.op_code == ATT_ERROR_RESPONSE: if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR: # Unexpected end logger.warning(f'!!! unexpected error while discovering characteristics: {HCI_Constant.error_name(response.error_code)}') # TODO raise appropriate exception return break # Stop if for some reason the list was empty if not response.attributes: break # Process all characteristics returned in this iteration for attribute_handle, attribute_value in response.attributes: if attribute_handle < starting_handle: # Something's not right logger.warning(f'bogus handle value: {attribute_handle}') return [] properties, handle = struct.unpack_from('