Add some missing types to apps/console.py, bumble/gatt_client.py

Added via code inspection (not via a tool like pytype)
This commit is contained in:
Alan Rosenthal
2023-03-24 21:41:03 -04:00
committed by Alan Rosenthal
parent fdf2da7023
commit a50181e6b8
3 changed files with 52 additions and 19 deletions

View File

@@ -24,6 +24,7 @@ import logging
import os
import random
import re
from typing import Optional
from collections import OrderedDict
import click
@@ -58,6 +59,7 @@ from bumble.device import ConnectionParametersPreferences, Device, Connection, P
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor
from bumble.gatt_client import CharacteristicProxy
from bumble.hci import (
HCI_Constant,
HCI_LE_1M_PHY,
@@ -119,6 +121,8 @@ def parse_phys(phys):
# Console App
# -----------------------------------------------------------------------------
class ConsoleApp:
connected_peer: Optional[Peer]
def __init__(self):
self.known_addresses = set()
self.known_attributes = []
@@ -490,7 +494,9 @@ class ConsoleApp:
self.show_attributes(attributes)
def find_characteristic(self, param):
def find_characteristic(self, param) -> Optional[CharacteristicProxy]:
if not self.connected_peer:
return None
parts = param.split('.')
if len(parts) == 2:
service_uuid = UUID(parts[0]) if parts[0] != '*' else None

View File

@@ -28,8 +28,8 @@ import struct
from pyee import EventEmitter
from typing import Dict, Type, TYPE_CHECKING
from bumble.core import UUID, name_or_number, get_dict_key_by_value
from bumble.hci import HCI_Object, key_with_value
from bumble.core import UUID, name_or_number, get_dict_key_by_value, ProtocolError
from bumble.hci import HCI_Object, key_with_value, HCI_Constant
from bumble.colors import color
if TYPE_CHECKING:
@@ -185,13 +185,18 @@ UUID_2_FIELD_SPEC = lambda x, y: UUID.parse_uuid_2(x, y) # noqa: E731
# -----------------------------------------------------------------------------
# Exceptions
# -----------------------------------------------------------------------------
class ATT_Error(Exception):
def __init__(self, error_code, att_handle=0x0000):
self.error_code = error_code
class ATT_Error(ProtocolError):
def __init__(self, error_code, att_handle=0x0000, message=''):
super().__init__(
error_code,
error_namespace='att',
error_name=ATT_PDU.error_name(self.error_code),
)
self.att_handle = att_handle
self.message = message
def __str__(self):
return f'ATT_Error({ATT_PDU.error_name(self.error_code)})'
return f'ATT_Error(error={self.error_name}, handle={self.att_handle:04X}): {self.message}'
# -----------------------------------------------------------------------------

View File

@@ -23,9 +23,11 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import struct
from typing import List, Optional
from pyee import EventEmitter
@@ -50,6 +52,7 @@ from .att import (
ATT_Read_Request,
ATT_Write_Command,
ATT_Write_Request,
ATT_Error,
)
from . import core
from .core import UUID, InvalidStateError, ProtocolError
@@ -59,6 +62,7 @@ from .gatt import (
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
GATT_REQUEST_TIMEOUT,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
Service,
Characteristic,
ClientCharacteristicConfigurationBits,
)
@@ -73,6 +77,8 @@ logger = logging.getLogger(__name__)
# Proxies
# -----------------------------------------------------------------------------
class AttributeProxy(EventEmitter):
client: Client
def __init__(self, client, handle, end_group_handle, attribute_type):
EventEmitter.__init__(self)
self.client = client
@@ -101,6 +107,9 @@ class AttributeProxy(EventEmitter):
class ServiceProxy(AttributeProxy):
uuid: UUID
characteristics: List[CharacteristicProxy]
@staticmethod
def from_client(service_class, client, service_uuid):
# The service and its characteristics are considered to have already been
@@ -130,6 +139,8 @@ class ServiceProxy(AttributeProxy):
class CharacteristicProxy(AttributeProxy):
descriptors: List[DescriptorProxy]
def __init__(self, client, handle, end_group_handle, uuid, properties):
super().__init__(client, handle, end_group_handle, uuid)
self.uuid = uuid
@@ -201,6 +212,8 @@ class ProfileServiceProxy:
# GATT Client
# -----------------------------------------------------------------------------
class Client:
services: List[ServiceProxy]
def __init__(self, connection):
self.connection = connection
self.mtu_exchange_done = False
@@ -306,7 +319,7 @@ class Client:
if not already_known:
self.services.append(service)
async def discover_services(self, uuids=None):
async def discover_services(self, uuids=None) -> List[ServiceProxy]:
'''
See Vol 3, Part G - 4.4.1 Discover All Primary Services
'''
@@ -332,8 +345,10 @@ class Client:
'!!! unexpected error while discovering services: '
f'{HCI_Constant.error_name(response.error_code)}'
)
# TODO raise appropriate exception
return
raise ATT_Error(
error_code=response.error_code,
message='Unexpected error while discovering services',
)
break
for (
@@ -349,7 +364,7 @@ class Client:
logger.warning(
f'bogus handle values: {attribute_handle} {end_group_handle}'
)
return
return []
# Create a service proxy for this service
service = ServiceProxy(
@@ -452,7 +467,9 @@ class Client:
# TODO
return []
async def discover_characteristics(self, uuids, service):
async def discover_characteristics(
self, uuids, service: Optional[ServiceProxy]
) -> List[CharacteristicProxy]:
'''
See Vol 3, Part G - 4.6.1 Discover All Characteristics of a Service and 4.6.2
Discover Characteristics by UUID
@@ -465,12 +482,12 @@ class Client:
services = [service] if service else self.services
# Perform characteristic discovery for each service
discovered_characteristics = []
discovered_characteristics: List[CharacteristicProxy] = []
for service in services:
starting_handle = service.handle
ending_handle = service.end_group_handle
characteristics = []
characteristics: List[CharacteristicProxy] = []
while starting_handle <= ending_handle:
response = await self.send_request(
ATT_Read_By_Type_Request(
@@ -491,8 +508,10 @@ class Client:
'!!! unexpected error while discovering characteristics: '
f'{HCI_Constant.error_name(response.error_code)}'
)
# TODO raise appropriate exception
return
raise ATT_Error(
error_code=response.error_code,
message='Unexpected error while discovering characteristics',
)
break
# Stop if for some reason the list was empty
@@ -535,8 +554,11 @@ class Client:
return discovered_characteristics
async def discover_descriptors(
self, characteristic=None, start_handle=None, end_handle=None
):
self,
characteristic: Optional[CharacteristicProxy] = None,
start_handle=None,
end_handle=None,
) -> List[DescriptorProxy]:
'''
See Vol 3, Part G - 4.7.1 Discover All Characteristic Descriptors
'''
@@ -549,7 +571,7 @@ class Client:
else:
return []
descriptors = []
descriptors: List[DescriptorProxy] = []
while starting_handle <= ending_handle:
response = await self.send_request(
ATT_Find_Information_Request(