add basic support for mypy type checking

This commit is contained in:
Gilles Boccon-Gibod
2023-01-20 00:20:50 -08:00
parent 8a91c614c7
commit 99758e4b7d
30 changed files with 280 additions and 77 deletions

View File

@@ -22,9 +22,11 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import struct
from colors import color
from pyee import EventEmitter
from typing import Dict, Type
from bumble.core import UUID, name_or_number
from bumble.hci import HCI_Object, key_with_value
@@ -197,7 +199,7 @@ class ATT_PDU:
See Bluetooth spec @ Vol 3, Part F - 3.3 ATTRIBUTE PDU
'''
pdu_classes = {}
pdu_classes: Dict[int, Type[ATT_PDU]] = {}
op_code = 0
name = None

View File

@@ -15,12 +15,14 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import struct
import time
import logging
from colors import color
from pyee import EventEmitter
from typing import Dict, Type
from .core import (
BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE,
@@ -627,7 +629,8 @@ class Message: # pylint:disable=attribute-defined-outside-init
RESPONSE_REJECT: 'RESPONSE_REJECT',
}
subclasses = {} # Subclasses, by signal identifier and message type
# Subclasses, by signal identifier and message type
subclasses: Dict[int, Dict[int, Type[Message]]] = {}
@staticmethod
def message_type_name(message_type):

View File

@@ -15,6 +15,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import struct
from .company_ids import COMPANY_IDENTIFIERS
@@ -145,7 +146,7 @@ class UUID:
'''
BASE_UUID = bytes.fromhex('00001000800000805F9B34FB')
UUIDS = [] # Registry of all instances created
UUIDS: list[UUID] = [] # Registry of all instances created
def __init__(self, uuid_str_or_int, name=None):
if isinstance(uuid_str_or_int, int):

View File

@@ -15,6 +15,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
from enum import IntEnum
import functools
import json
@@ -22,6 +23,8 @@ import asyncio
import logging
from contextlib import asynccontextmanager, AsyncExitStack
from dataclasses import dataclass
from typing import ClassVar
from colors import color
from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU
@@ -494,6 +497,7 @@ class Peer:
# -----------------------------------------------------------------------------
@dataclass
class ConnectionParametersPreferences:
default: ClassVar[ConnectionParametersPreferences]
connection_interval_min: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN
connection_interval_max: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX
max_latency: int = DEVICE_DEFAULT_CONNECTION_MAX_LATENCY
@@ -833,7 +837,7 @@ def host_event_handler(function):
# List of host event handlers for the Device class.
# (we define this list outside the class, because referencing a class in method
# decorators is not straightforward)
device_host_event_handlers = []
device_host_event_handlers: list[str] = []
# -----------------------------------------------------------------------------
@@ -2355,7 +2359,7 @@ class Device(CompositeEventEmitter):
if transport == BT_BR_EDR_TRANSPORT:
# Create a new connection
connection: Connection = self.pending_connections.pop(peer_address)
connection = self.pending_connections.pop(peer_address)
connection.complete(
connection_handle, peer_resolvable_address, role, connection_parameters
)

View File

@@ -217,7 +217,7 @@ class Service(Attribute):
uuid.to_pdu_bytes(),
)
self.uuid = uuid
self.included_services = []
# self.included_services = []
self.characteristics = characteristics[:]
self.primary = primary

View File

@@ -15,11 +15,13 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import struct
import collections
import logging
import functools
from colors import color
from typing import Dict, Type
from .core import (
BT_BR_EDR_TRANSPORT,
@@ -1690,6 +1692,11 @@ class Address:
RANDOM_IDENTITY_ADDRESS: 'RANDOM_IDENTITY_ADDRESS',
}
# Type declarations
NIL: Address
ANY: Address
ANY_RANDOM: Address
# pylint: disable-next=unnecessary-lambda
ADDRESS_TYPE_SPEC = {'size': 1, 'mapper': lambda x: Address.address_type_name(x)}
@@ -1874,7 +1881,7 @@ class HCI_Command(HCI_Packet):
'''
hci_packet_type = HCI_COMMAND_PACKET
command_classes = {}
command_classes: Dict[int, Type[HCI_Command]] = {}
@staticmethod
def command(fields=(), return_parameters_fields=()):
@@ -4020,8 +4027,8 @@ class HCI_Event(HCI_Packet):
'''
hci_packet_type = HCI_EVENT_PACKET
event_classes = {}
meta_event_classes = {}
event_classes: Dict[int, Type[HCI_Event]] = {}
meta_event_classes: Dict[int, Type[HCI_LE_Meta_Event]] = {}
@staticmethod
def event(fields=()):

View File

@@ -15,6 +15,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import struct
@@ -22,6 +23,7 @@ import struct
from collections import deque
from colors import color
from pyee import EventEmitter
from typing import Dict, Type
from .core import BT_CENTRAL_ROLE, InvalidStateError, ProtocolError
from .hci import (
@@ -184,7 +186,7 @@ class L2CAP_Control_Frame:
See Bluetooth spec @ Vol 3, Part A - 4 SIGNALING PACKET FORMATS
'''
classes = {}
classes: Dict[int, Type[L2CAP_Control_Frame]] = {}
code = 0
name = None

View File

@@ -17,7 +17,7 @@
# Imports
# -----------------------------------------------------------------------------
import struct
from typing import Tuple
from typing import Optional, Tuple
from ..gatt_client import ProfileServiceProxy
from ..gatt import (
@@ -52,14 +52,14 @@ class DeviceInformationService(TemplateService):
def __init__(
self,
manufacturer_name: str = None,
model_number: str = None,
serial_number: str = None,
hardware_revision: str = None,
firmware_revision: str = None,
software_revision: str = None,
system_id: Tuple[int, int] = None, # (OUI, Manufacturer ID)
ieee_regulatory_certification_data_list: bytes = None
manufacturer_name: Optional[str] = None,
model_number: Optional[str] = None,
serial_number: Optional[str] = None,
hardware_revision: Optional[str] = None,
firmware_revision: Optional[str] = None,
software_revision: Optional[str] = None,
system_id: Optional[Tuple[int, int]] = None, # (OUI, Manufacturer ID)
ieee_regulatory_certification_data_list: Optional[bytes] = None
# TODO: pnp_id
):
characteristics = [

0
bumble/profiles/py.typed Normal file
View File

0
bumble/py.typed Normal file
View File

View File

@@ -15,10 +15,12 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import logging
import struct
from colors import color
import colors
from typing import Dict, Type
from . import core
from .core import InvalidStateError
@@ -520,7 +522,7 @@ class SDP_PDU:
See Bluetooth spec @ Vol 3, Part B - 4.2 PROTOCOL DATA UNIT FORMAT
'''
sdp_pdu_classes = {}
sdp_pdu_classes: Dict[int, Type[SDP_PDU]] = {}
name = None
pdu_id = 0

View File

@@ -22,9 +22,12 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import logging
import asyncio
import secrets
from typing import Dict, Type
from pyee import EventEmitter
from colors import color
@@ -184,7 +187,7 @@ class SMP_Command:
See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL
'''
smp_classes = {}
smp_classes: Dict[int, Type[SMP_Command]] = {}
code = 0
name = ''

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -20,9 +20,11 @@ import grpc
from .common import PumpedTransport, PumpedPacketSource, PumpedPacketSink
from .emulated_bluetooth_pb2_grpc import EmulatedBluetoothServiceStub
from .emulated_bluetooth_packets_pb2 import HCIPacket
from .emulated_bluetooth_vhci_pb2_grpc import VhciForwardingServiceStub
# pylint: disable-next=no-name-in-module
from .emulated_bluetooth_packets_pb2 import HCIPacket
# -----------------------------------------------------------------------------
# Logging

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,10 +16,9 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: emulated_bluetooth_packets.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
@@ -31,20 +30,10 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n emulated_bluetooth_packets.proto\x12\x1b\x61ndroid.emulation.bluetooth\"\xfb\x01\n\tHCIPacket\x12?\n\x04type\x18\x01 \x01(\x0e\x32\x31.android.emulation.bluetooth.HCIPacket.PacketType\x12\x0e\n\x06packet\x18\x02 \x01(\x0c\"\x9c\x01\n\nPacketType\x12\x1b\n\x17PACKET_TYPE_UNSPECIFIED\x10\x00\x12\x1b\n\x17PACKET_TYPE_HCI_COMMAND\x10\x01\x12\x13\n\x0fPACKET_TYPE_ACL\x10\x02\x12\x13\n\x0fPACKET_TYPE_SCO\x10\x03\x12\x15\n\x11PACKET_TYPE_EVENT\x10\x04\x12\x13\n\x0fPACKET_TYPE_ISO\x10\x05\x42J\n\x1f\x63om.android.emulation.bluetoothP\x01\xf8\x01\x01\xa2\x02\x03\x41\x45\x42\xaa\x02\x1b\x41ndroid.Emulation.Bluetoothb\x06proto3'
)
_HCIPACKET = DESCRIPTOR.message_types_by_name['HCIPacket']
_HCIPACKET_PACKETTYPE = _HCIPACKET.enum_types_by_name['PacketType']
HCIPacket = _reflection.GeneratedProtocolMessageType(
'HCIPacket',
(_message.Message,),
{
'DESCRIPTOR': _HCIPACKET,
'__module__': 'emulated_bluetooth_packets_pb2'
# @@protoc_insertion_point(class_scope:android.emulation.bluetooth.HCIPacket)
},
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(
DESCRIPTOR, 'emulated_bluetooth_packets_pb2', globals()
)
_sym_db.RegisterMessage(HCIPacket)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None

View File

@@ -0,0 +1,41 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class HCIPacket(_message.Message):
__slots__ = ["packet", "type"]
class PacketType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
PACKET_FIELD_NUMBER: _ClassVar[int]
PACKET_TYPE_ACL: HCIPacket.PacketType
PACKET_TYPE_EVENT: HCIPacket.PacketType
PACKET_TYPE_HCI_COMMAND: HCIPacket.PacketType
PACKET_TYPE_ISO: HCIPacket.PacketType
PACKET_TYPE_SCO: HCIPacket.PacketType
PACKET_TYPE_UNSPECIFIED: HCIPacket.PacketType
TYPE_FIELD_NUMBER: _ClassVar[int]
packet: bytes
type: HCIPacket.PacketType
def __init__(
self,
type: _Optional[_Union[HCIPacket.PacketType, str]] = ...,
packet: _Optional[bytes] = ...,
) -> None: ...

View File

@@ -0,0 +1,17 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,10 +16,9 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: emulated_bluetooth.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
@@ -34,20 +33,8 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x18\x65mulated_bluetooth.proto\x12\x1b\x61ndroid.emulation.bluetooth\x1a emulated_bluetooth_packets.proto\"\x19\n\x07RawData\x12\x0e\n\x06packet\x18\x01 \x01(\x0c\x32\xcb\x02\n\x18\x45mulatedBluetoothService\x12\x64\n\x12registerClassicPhy\x12$.android.emulation.bluetooth.RawData\x1a$.android.emulation.bluetooth.RawData(\x01\x30\x01\x12`\n\x0eregisterBlePhy\x12$.android.emulation.bluetooth.RawData\x1a$.android.emulation.bluetooth.RawData(\x01\x30\x01\x12g\n\x11registerHCIDevice\x12&.android.emulation.bluetooth.HCIPacket\x1a&.android.emulation.bluetooth.HCIPacket(\x01\x30\x01\x42\"\n\x1e\x63om.android.emulator.bluetoothP\x01\x62\x06proto3'
)
_RAWDATA = DESCRIPTOR.message_types_by_name['RawData']
RawData = _reflection.GeneratedProtocolMessageType(
'RawData',
(_message.Message,),
{
'DESCRIPTOR': _RAWDATA,
'__module__': 'emulated_bluetooth_pb2'
# @@protoc_insertion_point(class_scope:android.emulation.bluetooth.RawData)
},
)
_sym_db.RegisterMessage(RawData)
_EMULATEDBLUETOOTHSERVICE = DESCRIPTOR.services_by_name['EmulatedBluetoothService']
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'emulated_bluetooth_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None

View File

@@ -0,0 +1,26 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import emulated_bluetooth_packets_pb2 as _emulated_bluetooth_packets_pb2
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Optional as _Optional
DESCRIPTOR: _descriptor.FileDescriptor
class RawData(_message.Message):
__slots__ = ["packet"]
PACKET_FIELD_NUMBER: _ClassVar[int]
packet: bytes
def __init__(self, packet: _Optional[bytes] = ...) -> None: ...

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,10 +16,9 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: emulated_bluetooth_vhci.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
@@ -27,15 +26,17 @@ from google.protobuf import symbol_database as _symbol_database
_sym_db = _symbol_database.Default()
import emulated_bluetooth_packets_pb2 as emulated__bluetooth__packets__pb2
from . import emulated_bluetooth_packets_pb2 as emulated__bluetooth__packets__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x1d\x65mulated_bluetooth_vhci.proto\x12\x1b\x61ndroid.emulation.bluetooth\x1a emulated_bluetooth_packets.proto2y\n\x15VhciForwardingService\x12`\n\nattachVhci\x12&.android.emulation.bluetooth.HCIPacket\x1a&.android.emulation.bluetooth.HCIPacket(\x01\x30\x01\x42J\n\x1f\x63om.android.emulation.bluetoothP\x01\xf8\x01\x01\xa2\x02\x03\x41\x45\x42\xaa\x02\x1b\x41ndroid.Emulation.Bluetoothb\x06proto3'
)
_VHCIFORWARDINGSERVICE = DESCRIPTOR.services_by_name['VhciForwardingService']
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(
DESCRIPTOR, 'emulated_bluetooth_vhci_pb2', globals()
)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None

View File

@@ -0,0 +1,19 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import emulated_bluetooth_packets_pb2 as _emulated_bluetooth_packets_pb2
from google.protobuf import descriptor as _descriptor
from typing import ClassVar as _ClassVar
DESCRIPTOR: _descriptor.FileDescriptor

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.

View File

View File

@@ -204,7 +204,7 @@ async def open_pyusb_transport(spec):
await self.sink.stop()
usb.util.release_interface(self.device, 0)
usb_find = usb.core.find
usb_find = usb.core.find
try:
import libusb_package
except ImportError:
@@ -215,9 +215,7 @@ async def open_pyusb_transport(spec):
# Find the device according to the spec moniker
if ':' in spec:
vendor_id, product_id = spec.split(':')
device = usb_find(
idVendor=int(vendor_id, 16), idProduct=int(product_id, 16)
)
device = usb_find(idVendor=int(vendor_id, 16), idProduct=int(product_id, 16))
else:
device_index = int(spec)
devices = list(

View File

@@ -51,8 +51,12 @@ def load_libusb():
else:
if libusb_path := libusb_package.get_library_path():
logger.debug(f'loading libusb library at {libusb_path}')
dll_loader = ctypes.WinDLL if platform.system() == 'Windows' else ctypes.CDLL
libusb_dll = dll_loader(str(libusb_path), use_errno=True, use_last_error=True)
dll_loader = (
ctypes.WinDLL if platform.system() == 'Windows' else ctypes.CDLL
)
libusb_dll = dll_loader(
str(libusb_path), use_errno=True, use_last_error=True
)
usb1.loadLibrary(libusb_dll)