Compare commits

...

20 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
24a863983d Merge branch 'gbg/replace-bitstruct' of https://github.com/google/bumble into gbg/replace-bitstruct
# Conflicts:
#	bumble/a2dp.py
#	pyproject.toml
2023-02-04 09:31:18 -08:00
Gilles Boccon-Gibod
b7ef09d4a3 fix format 2023-02-04 09:26:31 -08:00
Gilles Boccon-Gibod
b5b6cd13b8 replace bitstruct with construct 2023-02-04 09:23:13 -08:00
Gilles Boccon-Gibod
ef781bc374 replace bitstruct with construct 2023-02-03 19:41:07 -08:00
Lucas Abel
00978c1d63 Merge pull request #118 from google/uael/type-hints
overall: add types hints to the small subset used by avatar
2023-02-02 12:48:40 -08:00
uael
b731f6f556 overall: add types hints to the small subset used by avatar 2023-02-02 19:37:55 +00:00
Lucas Abel
ed261886e1 Merge pull request #119 from google/uael/fix-ci-packages-version
build: fix version of packages running checks in CI
2023-02-02 11:03:34 -08:00
uael
5e18094c31 build: fix version of packages running checks in CI 2023-02-02 17:23:15 +00:00
Lucas Abel
9a9b4e5bf1 Merge pull request #117 from google/uael/host-fixes
host: fixed `.latency` attribute error
2023-01-27 17:38:11 -08:00
Abel Lucas
895f1618d8 host: fixed .latency attribute error 2023-01-27 23:05:43 +00:00
Gilles Boccon-Gibod
52746e0c68 Merge pull request #116 from google/barbibulle-patch-1
fix libusb-package dependency
2023-01-25 15:59:42 -08:00
Gilles Boccon-Gibod
f9b7072423 Update setup.cfg 2023-01-25 15:37:33 -08:00
Gilles Boccon-Gibod
fa4be1958f Merge pull request #114 from google/gbg/fix-constant-typo
fix typo in constant name
2023-01-23 08:50:07 -08:00
Gilles Boccon-Gibod
f1686d8a9a fix typo in constant name 2023-01-22 19:10:13 -08:00
Gilles Boccon-Gibod
5c6a7f2036 Merge pull request #113 from google/gbg/mypy
add basic support for mypy type checking
2023-01-20 08:08:19 -08:00
Gilles Boccon-Gibod
99758e4b7d add basic support for mypy type checking 2023-01-20 00:20:50 -08:00
Alan Rosenthal
7385de6a69 Merge pull request #95 from AlanRosenthal/alan/fix_show_attributes
Fix `show attributes`
2023-01-19 14:57:22 -05:00
Alan Rosenthal
bb297e7516 Fix show attributes
`show attributes` wasn't being populated since `show_attributes()` was never called.

Also updated `show attributes` to match the color and indentation of `show services`
2023-01-19 12:21:37 -05:00
Lucas Abel
8a91c614c7 Merge pull request #109 from qiaoccolato/main
transport: make libusb_package optional
2023-01-18 14:48:05 -08:00
Qiao Yang
70a50a74b7 transport: make libusb_package optional 2023-01-17 15:17:11 -08:00
40 changed files with 595 additions and 281 deletions

View File

@@ -57,7 +57,7 @@ from bumble.core import UUID, AdvertisingData, BT_LE_TRANSPORT
from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic
from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor
from bumble.hci import (
HCI_Constant,
HCI_LE_1M_PHY,
@@ -154,10 +154,10 @@ class ConsoleApp:
'rssi': {'on': None, 'off': None},
'show': {
'scan': None,
'services': None,
'attributes': None,
'log': None,
'device': None,
'local-services': None,
'remote-services': None,
},
'filter': {
'address': None,
@@ -197,8 +197,8 @@ class ConsoleApp:
)
self.output_max_lines = 20
self.scan_results_text = FormattedTextControl()
self.services_text = FormattedTextControl()
self.attributes_text = FormattedTextControl()
self.local_services_text = FormattedTextControl()
self.remote_services_text = FormattedTextControl()
self.device_text = FormattedTextControl()
self.log_text = FormattedTextControl(
get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1))
@@ -214,12 +214,12 @@ class ConsoleApp:
filter=Condition(lambda: self.top_tab == 'scan'),
),
ConditionalContainer(
Frame(Window(self.services_text), title='Services'),
filter=Condition(lambda: self.top_tab == 'services'),
Frame(Window(self.local_services_text), title='Local Services'),
filter=Condition(lambda: self.top_tab == 'local-services'),
),
ConditionalContainer(
Frame(Window(self.attributes_text), title='Attributes'),
filter=Condition(lambda: self.top_tab == 'attributes'),
Frame(Window(self.remote_services_text), title='Remove Services'),
filter=Condition(lambda: self.top_tab == 'remote-services'),
),
ConditionalContainer(
Frame(Window(self.log_text, height=self.log_height), title='Log'),
@@ -281,6 +281,7 @@ class ConsoleApp:
self.device.listener = DeviceListener(self)
await self.device.power_on()
self.show_device(self.device)
self.show_local_services(self.device.gatt_server.attributes)
# Run the UI
await self.ui.run_async()
@@ -359,32 +360,38 @@ class ConsoleApp:
self.scan_results_text.text = ANSI('\n'.join(lines))
self.ui.invalidate()
def show_services(self, services):
def show_remote_services(self, services):
lines = []
del self.known_attributes[:]
for service in services:
lines.append(('ansicyan', str(service) + '\n'))
lines.append(("ansicyan", f"{service}\n"))
for characteristic in service.characteristics:
lines.append(('ansimagenta', ' ' + str(characteristic) + '\n'))
lines.append(('ansimagenta', f' {characteristic} + \n'))
self.known_attributes.append(
f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}'
)
self.known_attributes.append(f'*.{characteristic.uuid.to_hex_str()}')
self.known_attributes.append(f'#{characteristic.handle:X}')
for descriptor in characteristic.descriptors:
lines.append(('ansigreen', ' ' + str(descriptor) + '\n'))
lines.append(("ansigreen", f" {descriptor}\n"))
self.services_text.text = lines
self.remote_services_text.text = lines
self.ui.invalidate()
def show_attributes(self, attributes):
def show_local_services(self, attributes):
lines = []
for attribute in attributes:
lines.append(('ansicyan', f'{attribute}\n'))
if isinstance(attribute, Service):
lines.append(("ansicyan", f"{attribute}\n"))
elif isinstance(attribute, (Characteristic, CharacteristicDeclaration)):
lines.append(("ansimagenta", f" {attribute}\n"))
elif isinstance(attribute, Descriptor):
lines.append(("ansigreen", f" {attribute}\n"))
else:
lines.append(("ansiyellow", f"{attribute}\n"))
self.attributes_text.text = lines
self.local_services_text.text = lines
self.ui.invalidate()
def show_device(self, device):
@@ -469,7 +476,7 @@ class ConsoleApp:
await self.connected_peer.discover_descriptors(characteristic)
self.append_to_output('discovery completed')
self.show_services(self.connected_peer.services)
self.show_remote_services(self.connected_peer.services)
async def discover_attributes(self):
if not self.connected_peer:
@@ -655,7 +662,13 @@ class ConsoleApp:
async def do_show(self, params):
if params:
if params[0] in {'scan', 'services', 'attributes', 'log', 'device'}:
if params[0] in {
'scan',
'log',
'device',
'local-services',
'remote-services',
}:
self.top_tab = params[0]
self.ui.invalidate()

View File

@@ -18,7 +18,7 @@
import struct
import logging
from collections import namedtuple
import bitstruct
import construct
from .company_ids import COMPANY_IDENTIFIERS
from .sdp import (
@@ -258,7 +258,17 @@ class SbcMediaCodecInformation(
A2DP spec - 4.3.2 Codec Specific Information Elements
'''
BIT_FIELDS = 'u4u4u4u2u2u8u8'
BIT_FIELDS = construct.Bitwise(
construct.Sequence(
construct.BitsInteger(4),
construct.BitsInteger(4),
construct.BitsInteger(4),
construct.BitsInteger(2),
construct.BitsInteger(2),
construct.BitsInteger(8),
construct.BitsInteger(8),
)
)
SAMPLING_FREQUENCY_BITS = {16000: 1 << 3, 32000: 1 << 2, 44100: 1 << 1, 48000: 1}
CHANNEL_MODE_BITS = {
SBC_MONO_CHANNEL_MODE: 1 << 3,
@@ -276,7 +286,7 @@ class SbcMediaCodecInformation(
@staticmethod
def from_bytes(data):
return SbcMediaCodecInformation(
*bitstruct.unpack(SbcMediaCodecInformation.BIT_FIELDS, data)
*SbcMediaCodecInformation.BIT_FIELDS.parse(data)
)
@classmethod
@@ -326,7 +336,7 @@ class SbcMediaCodecInformation(
)
def __bytes__(self):
return bitstruct.pack(self.BIT_FIELDS, *self)
return self.BIT_FIELDS.build(self)
def __str__(self):
channel_modes = ['MONO', 'DUAL_CHANNEL', 'STEREO', 'JOINT_STEREO']
@@ -350,14 +360,23 @@ class SbcMediaCodecInformation(
class AacMediaCodecInformation(
namedtuple(
'AacMediaCodecInformation',
['object_type', 'sampling_frequency', 'channels', 'vbr', 'bitrate'],
['object_type', 'sampling_frequency', 'channels', 'rfa', 'vbr', 'bitrate'],
)
):
'''
A2DP spec - 4.5.2 Codec Specific Information Elements
'''
BIT_FIELDS = 'u8u12u2p2u1u23'
BIT_FIELDS = construct.Bitwise(
construct.Sequence(
construct.BitsInteger(8),
construct.BitsInteger(12),
construct.BitsInteger(2),
construct.BitsInteger(2),
construct.BitsInteger(1),
construct.BitsInteger(23),
)
)
OBJECT_TYPE_BITS = {
MPEG_2_AAC_LC_OBJECT_TYPE: 1 << 7,
MPEG_4_AAC_LC_OBJECT_TYPE: 1 << 6,
@@ -383,7 +402,7 @@ class AacMediaCodecInformation(
@staticmethod
def from_bytes(data):
return AacMediaCodecInformation(
*bitstruct.unpack(AacMediaCodecInformation.BIT_FIELDS, data)
*AacMediaCodecInformation.BIT_FIELDS.parse(data)
)
@classmethod
@@ -394,6 +413,7 @@ class AacMediaCodecInformation(
object_type=cls.OBJECT_TYPE_BITS[object_type],
sampling_frequency=cls.SAMPLING_FREQUENCY_BITS[sampling_frequency],
channels=cls.CHANNELS_BITS[channels],
rfa=0,
vbr=vbr,
bitrate=bitrate,
)
@@ -411,7 +431,7 @@ class AacMediaCodecInformation(
)
def __bytes__(self):
return bitstruct.pack(self.BIT_FIELDS, *self)
return self.BIT_FIELDS.build(self)
def __str__(self):
object_types = [

View File

@@ -22,9 +22,11 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import struct
from colors import color
from pyee import EventEmitter
from typing import Dict, Type
from bumble.core import UUID, name_or_number
from bumble.hci import HCI_Object, key_with_value
@@ -197,7 +199,7 @@ class ATT_PDU:
See Bluetooth spec @ Vol 3, Part F - 3.3 ATTRIBUTE PDU
'''
pdu_classes = {}
pdu_classes: Dict[int, Type[ATT_PDU]] = {}
op_code = 0
name = None

View File

@@ -15,12 +15,14 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import struct
import time
import logging
from colors import color
from pyee import EventEmitter
from typing import Dict, Type
from .core import (
BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE,
@@ -627,7 +629,8 @@ class Message: # pylint:disable=attribute-defined-outside-init
RESPONSE_REJECT: 'RESPONSE_REJECT',
}
subclasses = {} # Subclasses, by signal identifier and message type
# Subclasses, by signal identifier and message type
subclasses: Dict[int, Dict[int, Type[Message]]] = {}
@staticmethod
def message_type_name(message_type):

View File

@@ -46,7 +46,6 @@ from bumble.hci import (
HCI_LE_Connection_Complete_Event,
HCI_LE_Read_Remote_Features_Complete_Event,
HCI_Number_Of_Completed_Packets_Event,
HCI_Object,
HCI_Packet,
)
@@ -1029,7 +1028,7 @@ class Controller:
}
return bytes([HCI_SUCCESS])
def on_hci_le_read_transmit_power_command(self, command):
def on_hci_le_read_transmit_power_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.74 LE Read Transmit Power Command
'''

View File

@@ -15,7 +15,9 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import struct
from typing import List, Optional, Tuple, Union, cast
from .company_ids import COMPANY_IDENTIFIERS
@@ -145,7 +147,7 @@ class UUID:
'''
BASE_UUID = bytes.fromhex('00001000800000805F9B34FB')
UUIDS = [] # Registry of all instances created
UUIDS: List[UUID] = [] # Registry of all instances created
def __init__(self, uuid_str_or_int, name=None):
if isinstance(uuid_str_or_int, int):
@@ -180,7 +182,7 @@ class UUID:
return self
@classmethod
def from_bytes(cls, uuid_bytes, name=None):
def from_bytes(cls, uuid_bytes: bytes, name: Optional[str] = None) -> UUID:
if len(uuid_bytes) in (2, 4, 16):
self = cls.__new__(cls)
self.uuid_bytes = uuid_bytes
@@ -224,7 +226,7 @@ class UUID:
'''
return self.to_bytes(force_128=(len(self.uuid_bytes) == 4))
def to_hex_str(self):
def to_hex_str(self) -> str:
if len(self.uuid_bytes) == 2 or len(self.uuid_bytes) == 4:
return bytes(reversed(self.uuid_bytes)).hex().upper()
@@ -606,6 +608,11 @@ class DeviceClass:
# -----------------------------------------------------------------------------
# Advertising Data
# -----------------------------------------------------------------------------
AdvertisingObject = Union[
List[UUID], Tuple[UUID, bytes], bytes, str, int, Tuple[int, int], Tuple[int, bytes]
]
class AdvertisingData:
# fmt: off
# pylint: disable=line-too-long
@@ -721,10 +728,12 @@ class AdvertisingData:
BR_EDR_CONTROLLER_FLAG = 0x08
BR_EDR_HOST_FLAG = 0x10
ad_structures: List[Tuple[int, bytes]]
# fmt: on
# pylint: enable=line-too-long
def __init__(self, ad_structures=None):
def __init__(self, ad_structures: Optional[List[Tuple[int, bytes]]] = None) -> None:
if ad_structures is None:
ad_structures = []
self.ad_structures = ad_structures[:]
@@ -751,7 +760,7 @@ class AdvertisingData:
return ','.join(bit_flags_to_strings(flags, flag_names))
@staticmethod
def uuid_list_to_objects(ad_data, uuid_size):
def uuid_list_to_objects(ad_data: bytes, uuid_size: int) -> List[UUID]:
uuids = []
offset = 0
while (uuid_size * (offset + 1)) <= len(ad_data):
@@ -828,7 +837,7 @@ class AdvertisingData:
# pylint: disable=too-many-return-statements
@staticmethod
def ad_data_to_object(ad_type, ad_data):
def ad_data_to_object(ad_type: int, ad_data: bytes) -> AdvertisingObject:
if ad_type in (
AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
@@ -867,22 +876,22 @@ class AdvertisingData:
return ad_data.decode("utf-8")
if ad_type in (AdvertisingData.TX_POWER_LEVEL, AdvertisingData.FLAGS):
return ad_data[0]
return cast(int, struct.unpack('B', ad_data)[0])
if ad_type in (
AdvertisingData.APPEARANCE,
AdvertisingData.ADVERTISING_INTERVAL,
):
return struct.unpack('<H', ad_data)[0]
return cast(int, struct.unpack('<H', ad_data)[0])
if ad_type == AdvertisingData.CLASS_OF_DEVICE:
return struct.unpack('<I', bytes([*ad_data, 0]))[0]
return cast(int, struct.unpack('<I', bytes([*ad_data, 0]))[0])
if ad_type == AdvertisingData.PERIPHERAL_CONNECTION_INTERVAL_RANGE:
return struct.unpack('<HH', ad_data)
return cast(Tuple[int, int], struct.unpack('<HH', ad_data))
if ad_type == AdvertisingData.MANUFACTURER_SPECIFIC_DATA:
return (struct.unpack_from('<H', ad_data, 0)[0], ad_data[2:])
return (cast(int, struct.unpack_from('<H', ad_data, 0)[0]), ad_data[2:])
return ad_data
@@ -897,26 +906,27 @@ class AdvertisingData:
self.ad_structures.append((ad_type, ad_data))
offset += length
def get(self, type_id, return_all=False, raw=False):
def get_all(self, type_id: int, raw: bool = False) -> List[AdvertisingObject]:
'''
Get Advertising Data Structure(s) with a given type
If return_all is True, returns a (possibly empty) list of matches,
else returns the first entry, or None if no structure matches.
Returns a (possibly empty) list of matches.
'''
def process_ad_data(ad_data):
def process_ad_data(ad_data: bytes) -> AdvertisingObject:
return ad_data if raw else self.ad_data_to_object(type_id, ad_data)
if return_all:
return [
process_ad_data(ad[1]) for ad in self.ad_structures if ad[0] == type_id
]
return [process_ad_data(ad[1]) for ad in self.ad_structures if ad[0] == type_id]
return next(
(process_ad_data(ad[1]) for ad in self.ad_structures if ad[0] == type_id),
None,
)
def get(self, type_id: int, raw: bool = False) -> Optional[AdvertisingObject]:
'''
Get Advertising Data Structure(s) with a given type
Returns the first entry, or None if no structure matches.
'''
all = self.get_all(type_id, raw=raw)
return all[0] if all else None
def __bytes__(self):
return b''.join(

View File

@@ -15,6 +15,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
from enum import IntEnum
import functools
import json
@@ -22,6 +23,8 @@ import asyncio
import logging
from contextlib import asynccontextmanager, AsyncExitStack
from dataclasses import dataclass
from typing import Any, ClassVar, Dict, List, Optional, Tuple, Union
from colors import color
from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU
@@ -194,6 +197,8 @@ DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONN
# -----------------------------------------------------------------------------
class Advertisement:
address: Address
TX_POWER_NOT_AVAILABLE = (
HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE
)
@@ -494,6 +499,7 @@ class Peer:
# -----------------------------------------------------------------------------
@dataclass
class ConnectionParametersPreferences:
default: ClassVar[ConnectionParametersPreferences]
connection_interval_min: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN
connection_interval_max: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX
max_latency: int = DEVICE_DEFAULT_CONNECTION_MAX_LATENCY
@@ -507,6 +513,17 @@ ConnectionParametersPreferences.default = ConnectionParametersPreferences()
# -----------------------------------------------------------------------------
class Connection(CompositeEventEmitter):
device: Device
handle: int
transport: int
self_address: Address
peer_address: Address
role: int
encryption: int
authenticated: bool
sc: bool
link_key_type: int
@composite_listener
class Listener:
def on_disconnection(self, reason):
@@ -607,6 +624,10 @@ class Connection(CompositeEventEmitter):
def is_encrypted(self):
return self.encryption != 0
@property
def is_incomplete(self) -> bool:
return self.handle == None
def send_l2cap_pdu(self, cid, pdu):
self.device.send_l2cap_pdu(self.handle, cid, pdu)
@@ -622,20 +643,22 @@ class Connection(CompositeEventEmitter):
):
return await self.device.open_l2cap_channel(self, psm, max_credits, mtu, mps)
async def disconnect(self, reason=HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR):
return await self.device.disconnect(self, reason)
async def disconnect(
self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
) -> None:
await self.device.disconnect(self, reason)
async def pair(self):
async def pair(self) -> None:
return await self.device.pair(self)
def request_pairing(self):
def request_pairing(self) -> None:
return self.device.request_pairing(self)
# [Classic only]
async def authenticate(self):
async def authenticate(self) -> None:
return await self.device.authenticate(self)
async def encrypt(self, enable=True):
async def encrypt(self, enable: bool = True) -> None:
return await self.device.encrypt(self, enable)
async def sustain(self, timeout=None):
@@ -703,10 +726,10 @@ class Connection(CompositeEventEmitter):
# -----------------------------------------------------------------------------
class DeviceConfiguration:
def __init__(self):
def __init__(self) -> None:
# Setup defaults
self.name = DEVICE_DEFAULT_NAME
self.address = DEVICE_DEFAULT_ADDRESS
self.address = Address(DEVICE_DEFAULT_ADDRESS)
self.class_of_device = DEVICE_DEFAULT_CLASS_OF_DEVICE
self.scan_response_data = DEVICE_DEFAULT_SCAN_RESPONSE_DATA
self.advertising_interval_min = DEVICE_DEFAULT_ADVERTISING_INTERVAL
@@ -726,12 +749,13 @@ class DeviceConfiguration:
)
self.irk = bytes(16) # This really must be changed for any level of security
self.keystore = None
self.gatt_services = []
self.gatt_services: List[Dict[str, Any]] = []
def load_from_dict(self, config):
def load_from_dict(self, config: Dict[str, Any]) -> None:
# Load simple properties
self.name = config.get('name', self.name)
self.address = Address(config.get('address', self.address))
if address := config.get('address', None):
self.address = Address(address)
self.class_of_device = config.get('class_of_device', self.class_of_device)
self.advertising_interval_min = config.get(
'advertising_interval', self.advertising_interval_min
@@ -833,11 +857,27 @@ def host_event_handler(function):
# List of host event handlers for the Device class.
# (we define this list outside the class, because referencing a class in method
# decorators is not straightforward)
device_host_event_handlers = []
device_host_event_handlers: list[str] = []
# -----------------------------------------------------------------------------
class Device(CompositeEventEmitter):
# incomplete list of fields.
random_address: Address
public_address: Address
classic_enabled: bool
name: str
class_of_device: int
gatt_server: gatt_server.Server
advertising_data: bytes
scan_response_data: bytes
connections: Dict[int, Connection]
pending_connections: Dict[Address, Connection]
classic_pending_accepts: Dict[
Address, List[asyncio.Future[Union[Connection, Tuple[Address, int, int]]]]
]
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
@composite_listener
class Listener:
def on_advertisement(self, advertisement):
@@ -884,12 +924,12 @@ class Device(CompositeEventEmitter):
def __init__(
self,
name=None,
address=None,
config=None,
host=None,
generic_access_service=True,
):
name: Optional[str] = None,
address: Optional[Address] = None,
config: Optional[DeviceConfiguration] = None,
host: Optional[Host] = None,
generic_access_service: bool = True,
) -> None:
super().__init__()
self._host = None
@@ -991,10 +1031,12 @@ class Device(CompositeEventEmitter):
setup_event_forwarding(self.gatt_server, self, 'characteristic_subscription')
# Set the initial host
self.host = host
if host:
self.host = host
@property
def host(self):
def host(self) -> Host:
assert self._host
return self._host
@host.setter
@@ -1028,15 +1070,18 @@ class Device(CompositeEventEmitter):
def sdp_service_records(self, service_records):
self.sdp_server.service_records = service_records
def lookup_connection(self, connection_handle):
def lookup_connection(self, connection_handle: int) -> Optional[Connection]:
if connection := self.connections.get(connection_handle):
return connection
return None
def find_connection_by_bd_addr(
self, bd_addr, transport=None, check_address_type=False
):
self,
bd_addr: Address,
transport: Optional[int] = None,
check_address_type: bool = False,
) -> Optional[Connection]:
for connection in self.connections.values():
if connection.peer_address.to_bytes() == bd_addr.to_bytes():
if (
@@ -1094,11 +1139,11 @@ class Device(CompositeEventEmitter):
logger.warning('!!! Command timed out')
raise CommandTimeoutError() from error
async def power_on(self):
async def power_on(self) -> None:
# Reset the controller
await self.host.reset()
response = await self.send_command(HCI_Read_BD_ADDR_Command())
response = await self.send_command(HCI_Read_BD_ADDR_Command()) # type: ignore[call-arg]
if response.return_parameters.status == HCI_SUCCESS:
logger.debug(
color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow')
@@ -1110,7 +1155,7 @@ class Device(CompositeEventEmitter):
HCI_Write_LE_Host_Support_Command(
le_supported_host=int(self.le_enabled),
simultaneous_le_host=int(self.le_simultaneous_enabled),
)
) # type: ignore[call-arg]
)
if self.le_enabled:
@@ -1120,7 +1165,7 @@ class Device(CompositeEventEmitter):
if self.host.supports_command(HCI_LE_RAND_COMMAND):
# Get 8 random bytes
response = await self.send_command(
HCI_LE_Rand_Command(), check_result=True
HCI_LE_Rand_Command(), check_result=True # type: ignore[call-arg]
)
# Ensure the address bytes can be a static random address
@@ -1141,7 +1186,7 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_LE_Set_Random_Address_Command(
random_address=self.random_address
),
), # type: ignore[call-arg]
check_result=True,
)
@@ -1149,7 +1194,7 @@ class Device(CompositeEventEmitter):
if self.keystore and self.host.supports_command(
HCI_LE_CLEAR_RESOLVING_LIST_COMMAND
):
await self.send_command(HCI_LE_Clear_Resolving_List_Command())
await self.send_command(HCI_LE_Clear_Resolving_List_Command()) # type: ignore[call-arg]
resolving_keys = await self.keystore.get_resolving_keys()
for (irk, address) in resolving_keys:
@@ -1159,7 +1204,7 @@ class Device(CompositeEventEmitter):
peer_identity_address=address,
peer_irk=irk,
local_irk=self.irk,
)
) # type: ignore[call-arg]
)
# Enable address resolution
@@ -1174,28 +1219,24 @@ class Device(CompositeEventEmitter):
if self.classic_enabled:
await self.send_command(
HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8'))
HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) # type: ignore[call-arg]
)
await self.send_command(
HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device)
HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device) # type: ignore[call-arg]
)
await self.send_command(
HCI_Write_Simple_Pairing_Mode_Command(
simple_pairing_mode=int(self.classic_ssp_enabled)
)
) # type: ignore[call-arg]
)
await self.send_command(
HCI_Write_Secure_Connections_Host_Support_Command(
secure_connections_host_support=int(self.classic_sc_enabled)
)
) # type: ignore[call-arg]
)
await self.set_connectable(self.connectable)
await self.set_discoverable(self.discoverable)
# Let the SMP manager know about the address
# TODO: allow using a public address
self.smp_manager.address = self.random_address
# Done
self.powered_on = True
@@ -1217,11 +1258,11 @@ class Device(CompositeEventEmitter):
async def start_advertising(
self,
advertising_type=AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
target=None,
own_address_type=OwnAddressType.RANDOM,
auto_restart=False,
):
advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
target: Optional[Address] = None,
own_address_type: int = OwnAddressType.RANDOM,
auto_restart: bool = False,
) -> None:
# If we're advertising, stop first
if self.advertising:
await self.stop_advertising()
@@ -1231,7 +1272,7 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_LE_Set_Advertising_Data_Command(
advertising_data=self.advertising_data
),
), # type: ignore[call-arg]
check_result=True,
)
@@ -1240,7 +1281,7 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_LE_Set_Scan_Response_Data_Command(
scan_response_data=self.scan_response_data
),
), # type: ignore[call-arg]
check_result=True,
)
@@ -1266,13 +1307,13 @@ class Device(CompositeEventEmitter):
peer_address=peer_address,
advertising_channel_map=7,
advertising_filter_policy=0,
),
), # type: ignore[call-arg]
check_result=True,
)
# Enable advertising
await self.send_command(
HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1),
HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1), # type: ignore[call-arg]
check_result=True,
)
@@ -1281,11 +1322,11 @@ class Device(CompositeEventEmitter):
self.advertising_type = advertising_type
self.advertising = True
async def stop_advertising(self):
async def stop_advertising(self) -> None:
# Disable advertising
if self.advertising:
await self.send_command(
HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0),
HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), # type: ignore[call-arg]
check_result=True,
)
@@ -1300,14 +1341,14 @@ class Device(CompositeEventEmitter):
async def start_scanning(
self,
legacy=False,
active=True,
scan_interval=DEVICE_DEFAULT_SCAN_INTERVAL, # Scan interval in ms
scan_window=DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms
own_address_type=OwnAddressType.RANDOM,
filter_duplicates=False,
scanning_phys=(HCI_LE_1M_PHY, HCI_LE_CODED_PHY),
):
legacy: bool = False,
active: bool = True,
scan_interval: int = DEVICE_DEFAULT_SCAN_INTERVAL, # Scan interval in ms
scan_window: int = DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms
own_address_type: int = OwnAddressType.RANDOM,
filter_duplicates: bool = False,
scanning_phys: Tuple[int, int] = (HCI_LE_1M_PHY, HCI_LE_CODED_PHY),
) -> None:
# Check that the arguments are legal
if scan_interval < scan_window:
raise ValueError('scan_interval must be >= scan_window')
@@ -1357,7 +1398,7 @@ class Device(CompositeEventEmitter):
scan_types=[scan_type] * scanning_phy_count,
scan_intervals=[int(scan_window / 0.625)] * scanning_phy_count,
scan_windows=[int(scan_window / 0.625)] * scanning_phy_count,
),
), # type: ignore[call-arg]
check_result=True,
)
@@ -1368,7 +1409,7 @@ class Device(CompositeEventEmitter):
filter_duplicates=1 if filter_duplicates else 0,
duration=0, # TODO allow other values
period=0, # TODO allow other values
),
), # type: ignore[call-arg]
check_result=True,
)
else:
@@ -1386,7 +1427,7 @@ class Device(CompositeEventEmitter):
le_scan_window=int(scan_window / 0.625),
own_address_type=own_address_type,
scanning_filter_policy=HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY,
),
), # type: ignore[call-arg]
check_result=True,
)
@@ -1394,25 +1435,25 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_LE_Set_Scan_Enable_Command(
le_scan_enable=1, filter_duplicates=1 if filter_duplicates else 0
),
), # type: ignore[call-arg]
check_result=True,
)
self.scanning_is_passive = not active
self.scanning = True
async def stop_scanning(self):
async def stop_scanning(self) -> None:
# Disable scanning
if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE):
await self.send_command(
HCI_LE_Set_Extended_Scan_Enable_Command(
enable=0, filter_duplicates=0, duration=0, period=0
),
), # type: ignore[call-arg]
check_result=True,
)
else:
await self.send_command(
HCI_LE_Set_Scan_Enable_Command(le_scan_enable=0, filter_duplicates=0),
HCI_LE_Set_Scan_Enable_Command(le_scan_enable=0, filter_duplicates=0), # type: ignore[call-arg]
check_result=True,
)
@@ -1430,9 +1471,9 @@ class Device(CompositeEventEmitter):
if advertisement := accumulator.update(report):
self.emit('advertisement', advertisement)
async def start_discovery(self, auto_restart=True):
async def start_discovery(self, auto_restart: bool = True) -> None:
await self.send_command(
HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE),
HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE), # type: ignore[call-arg]
check_result=True,
)
@@ -1441,7 +1482,7 @@ class Device(CompositeEventEmitter):
lap=HCI_GENERAL_INQUIRY_LAP,
inquiry_length=DEVICE_DEFAULT_INQUIRY_LENGTH,
num_responses=0, # Unlimited number of responses.
)
) # type: ignore[call-arg]
)
if response.status != HCI_Command_Status_Event.PENDING:
self.discovering = False
@@ -1450,9 +1491,9 @@ class Device(CompositeEventEmitter):
self.auto_restart_inquiry = auto_restart
self.discovering = True
async def stop_discovery(self):
async def stop_discovery(self) -> None:
if self.discovering:
await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True)
await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True) # type: ignore[call-arg]
self.auto_restart_inquiry = True
self.discovering = False
@@ -1480,7 +1521,7 @@ class Device(CompositeEventEmitter):
HCI_Write_Scan_Enable_Command(scan_enable=scan_enable)
)
async def set_discoverable(self, discoverable=True):
async def set_discoverable(self, discoverable: bool = True) -> None:
self.discoverable = discoverable
if self.classic_enabled:
# Synthesize an inquiry response if none is set already
@@ -1500,7 +1541,7 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_Write_Extended_Inquiry_Response_Command(
fec_required=0, extended_inquiry_response=self.inquiry_response
),
), # type: ignore[call-arg]
check_result=True,
)
await self.set_scan_enable(
@@ -1508,7 +1549,7 @@ class Device(CompositeEventEmitter):
page_scan_enabled=self.connectable,
)
async def set_connectable(self, connectable=True):
async def set_connectable(self, connectable: bool = True) -> None:
self.connectable = connectable
if self.classic_enabled:
await self.set_scan_enable(
@@ -1518,12 +1559,14 @@ class Device(CompositeEventEmitter):
async def connect(
self,
peer_address,
transport=BT_LE_TRANSPORT,
connection_parameters_preferences=None,
own_address_type=OwnAddressType.RANDOM,
timeout=DEVICE_DEFAULT_CONNECT_TIMEOUT,
):
peer_address: Union[Address, str],
transport: int = BT_LE_TRANSPORT,
connection_parameters_preferences: Optional[
Dict[int, ConnectionParametersPreferences]
] = None,
own_address_type: int = OwnAddressType.RANDOM,
timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
) -> Connection:
'''
Request a connection to a peer.
When transport is BLE, this method cannot be called if there is already a
@@ -1570,6 +1613,8 @@ class Device(CompositeEventEmitter):
):
raise ValueError('BR/EDR addresses must be PUBLIC')
assert isinstance(peer_address, Address)
def on_connection(connection):
if transport == BT_LE_TRANSPORT or (
# match BR/EDR connection event against peer address
@@ -1687,7 +1732,7 @@ class Device(CompositeEventEmitter):
supervision_timeouts=supervision_timeouts,
min_ce_lengths=min_ce_lengths,
max_ce_lengths=max_ce_lengths,
)
) # type: ignore[call-arg]
)
else:
if HCI_LE_1M_PHY not in connection_parameters_preferences:
@@ -1716,7 +1761,7 @@ class Device(CompositeEventEmitter):
supervision_timeout=int(prefs.supervision_timeout / 10),
min_ce_length=int(prefs.min_ce_length / 0.625),
max_ce_length=int(prefs.max_ce_length / 0.625),
)
) # type: ignore[call-arg]
)
else:
# Save pending connection
@@ -1733,7 +1778,7 @@ class Device(CompositeEventEmitter):
clock_offset=0x0000,
allow_role_switch=0x01,
reserved=0,
)
) # type: ignore[call-arg]
)
if result.status != HCI_Command_Status_Event.PENDING:
@@ -1752,10 +1797,10 @@ class Device(CompositeEventEmitter):
)
except asyncio.TimeoutError:
if transport == BT_LE_TRANSPORT:
await self.send_command(HCI_LE_Create_Connection_Cancel_Command())
await self.send_command(HCI_LE_Create_Connection_Cancel_Command()) # type: ignore[call-arg]
else:
await self.send_command(
HCI_Create_Connection_Cancel_Command(bd_addr=peer_address)
HCI_Create_Connection_Cancel_Command(bd_addr=peer_address) # type: ignore[call-arg]
)
try:
@@ -1773,10 +1818,10 @@ class Device(CompositeEventEmitter):
async def accept(
self,
peer_address=Address.ANY,
role=BT_PERIPHERAL_ROLE,
timeout=DEVICE_DEFAULT_CONNECT_TIMEOUT,
):
peer_address: Union[Address, str] = Address.ANY,
role: int = BT_PERIPHERAL_ROLE,
timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
) -> Connection:
'''
Wait and accept any incoming connection or a connection from `peer_address` when
set.
@@ -1798,22 +1843,24 @@ class Device(CompositeEventEmitter):
peer_address, BT_BR_EDR_TRANSPORT
) # TODO: timeout
assert isinstance(peer_address, Address)
if peer_address == Address.NIL:
raise ValueError('accept on nil address')
# Create a future so that we can wait for the request
pending_request = asyncio.get_running_loop().create_future()
pending_request_fut = asyncio.get_running_loop().create_future()
if peer_address == Address.ANY:
self.classic_pending_accepts[Address.ANY].append(pending_request)
self.classic_pending_accepts[Address.ANY].append(pending_request_fut)
elif peer_address in self.classic_pending_accepts:
raise InvalidStateError('accept connection already pending')
else:
self.classic_pending_accepts[peer_address] = pending_request
self.classic_pending_accepts[peer_address] = [pending_request_fut]
try:
# Wait for a request or a completed connection
pending_request = self.abort_on('flush', pending_request)
pending_request = self.abort_on('flush', pending_request_fut)
result = await (
asyncio.wait_for(pending_request, timeout)
if timeout
@@ -1822,7 +1869,7 @@ class Device(CompositeEventEmitter):
except Exception:
# Remove future from device context
if peer_address == Address.ANY:
self.classic_pending_accepts[Address.ANY].remove(pending_request)
self.classic_pending_accepts[Address.ANY].remove(pending_request_fut)
else:
self.classic_pending_accepts.pop(peer_address)
raise
@@ -1834,6 +1881,7 @@ class Device(CompositeEventEmitter):
# Otherwise, result came from `on_connection_request`
peer_address, _class_of_device, _link_type = result
assert isinstance(peer_address, Address)
# Create a future so that we can wait for the connection's result
pending_connection = asyncio.get_running_loop().create_future()
@@ -1863,7 +1911,7 @@ class Device(CompositeEventEmitter):
try:
# Accept connection request
await self.send_command(
HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role)
HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role) # type: ignore[call-arg]
)
# Wait for connection complete
@@ -2239,7 +2287,7 @@ class Device(CompositeEventEmitter):
)
# [Classic only]
async def request_remote_name(self, remote): # remote: Connection | Address
async def request_remote_name(self, remote: Union[Address, Connection]) -> str:
# Set up event handlers
pending_name = asyncio.get_running_loop().create_future()
@@ -2267,7 +2315,7 @@ class Device(CompositeEventEmitter):
page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R2,
reserved=0,
clock_offset=0, # TODO investigate non-0 values
)
) # type: ignore[call-arg]
)
if result.status != HCI_COMMAND_STATUS_PENDING:
@@ -2355,7 +2403,7 @@ class Device(CompositeEventEmitter):
if transport == BT_BR_EDR_TRANSPORT:
# Create a new connection
connection: Connection = self.pending_connections.pop(peer_address)
connection = self.pending_connections.pop(peer_address)
connection.complete(
connection_handle, peer_resolvable_address, role, connection_parameters
)
@@ -2368,7 +2416,7 @@ class Device(CompositeEventEmitter):
# In this case, set the completed `connection` to the `accept` future
# result.
if peer_address in self.classic_pending_accepts:
future = self.classic_pending_accepts.pop(peer_address)
future, *_ = self.classic_pending_accepts.pop(peer_address)
future.set_result(connection)
# Emit an event to notify listeners of the new connection
@@ -2469,7 +2517,7 @@ class Device(CompositeEventEmitter):
# match a pending future using `bd_addr`
if bd_addr in self.classic_pending_accepts:
future = self.classic_pending_accepts.pop(bd_addr)
future, *_ = self.classic_pending_accepts.pop(bd_addr)
future.set_result((bd_addr, class_of_device, link_type))
# match first pending future for ANY address

View File

@@ -28,7 +28,7 @@ import enum
import functools
import logging
import struct
from typing import Sequence
from typing import Optional, Sequence
from colors import color
from .core import UUID, get_dict_key_by_value
@@ -204,6 +204,8 @@ class Service(Attribute):
See Vol 3, Part G - 3.1 SERVICE DEFINITION
'''
uuid: UUID
def __init__(self, uuid, characteristics: list[Characteristic], primary=True):
# Convert the uuid to a UUID object if it isn't already
if isinstance(uuid, str):
@@ -217,11 +219,11 @@ class Service(Attribute):
uuid.to_pdu_bytes(),
)
self.uuid = uuid
self.included_services = []
# self.included_services = []
self.characteristics = characteristics[:]
self.primary = primary
def get_advertising_data(self):
def get_advertising_data(self) -> Optional[bytes]:
"""
Get Service specific advertising data
Defined by each Service, default value is empty

View File

@@ -27,7 +27,7 @@ import asyncio
import logging
from collections import defaultdict
import struct
from typing import Tuple, Optional
from typing import List, Tuple, Optional
from pyee import EventEmitter
from colors import color
@@ -90,6 +90,8 @@ GATT_SERVER_DEFAULT_MAX_MTU = 517
# GATT Server
# -----------------------------------------------------------------------------
class Server(EventEmitter):
attributes: List[Attribute]
def __init__(self, device):
super().__init__()
self.device = device
@@ -140,6 +142,7 @@ class Server(EventEmitter):
attribute
for attribute in self.attributes
if attribute.type == GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE
and isinstance(attribute, Service)
and attribute.uuid == service_uuid
),
None,

View File

@@ -15,11 +15,13 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import struct
import collections
import logging
import functools
from colors import color
from typing import Dict, Type, Union
from .core import (
BT_BR_EDR_TRANSPORT,
@@ -1690,6 +1692,11 @@ class Address:
RANDOM_IDENTITY_ADDRESS: 'RANDOM_IDENTITY_ADDRESS',
}
# Type declarations
NIL: Address
ANY: Address
ANY_RANDOM: Address
# pylint: disable-next=unnecessary-lambda
ADDRESS_TYPE_SPEC = {'size': 1, 'mapper': lambda x: Address.address_type_name(x)}
@@ -1722,7 +1729,9 @@ class Address:
address_type = data[offset - 1]
return Address.parse_address_with_type(data, offset, address_type)
def __init__(self, address, address_type=RANDOM_DEVICE_ADDRESS):
def __init__(
self, address: Union[bytes, str], address_type: int = RANDOM_DEVICE_ADDRESS
):
'''
Initialize an instance. `address` may be a byte array in little-endian
format, or a hex string in big-endian format (with optional ':'
@@ -1874,7 +1883,7 @@ class HCI_Command(HCI_Packet):
'''
hci_packet_type = HCI_COMMAND_PACKET
command_classes = {}
command_classes: Dict[int, Type[HCI_Command]] = {}
@staticmethod
def command(fields=(), return_parameters_fields=()):
@@ -4020,8 +4029,8 @@ class HCI_Event(HCI_Packet):
'''
hci_packet_type = HCI_EVENT_PACKET
event_classes = {}
meta_event_classes = {}
event_classes: Dict[int, Type[HCI_Event]] = {}
meta_event_classes: Dict[int, Type[HCI_LE_Meta_Event]] = {}
@staticmethod
def event(fields=()):

View File

@@ -141,7 +141,7 @@ class Host(AbortableEventEmitter):
if controller_sink:
self.set_packet_sink(controller_sink)
async def flush(self):
async def flush(self) -> None:
# Make sure no command is pending
await self.command_semaphore.acquire()
@@ -660,7 +660,7 @@ class Host(AbortableEventEmitter):
connection_handle=event.connection_handle,
interval_min=event.interval_min,
interval_max=event.interval_max,
latency=event.latency,
max_latency=event.max_latency,
timeout=event.timeout,
min_ce_length=0,
max_ce_length=0,

View File

@@ -24,6 +24,7 @@ import asyncio
import logging
import os
import json
from typing import Optional
from colors import color
from .hci import Address
@@ -242,7 +243,7 @@ class JsonKeyStore(KeyStore):
# Atomically replace the previous file
os.rename(temp_filename, self.filename)
async def delete(self, name):
async def delete(self, name: str) -> None:
db = await self.load()
namespace = db.get(self.namespace)
@@ -278,7 +279,7 @@ class JsonKeyStore(KeyStore):
await self.save(db)
async def get(self, name):
async def get(self, name: str) -> Optional[PairingKeys]:
db = await self.load()
namespace = db.get(self.namespace)

View File

@@ -15,6 +15,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import struct
@@ -22,6 +23,7 @@ import struct
from collections import deque
from colors import color
from pyee import EventEmitter
from typing import Dict, Type
from .core import BT_CENTRAL_ROLE, InvalidStateError, ProtocolError
from .hci import (
@@ -184,7 +186,7 @@ class L2CAP_Control_Frame:
See Bluetooth spec @ Vol 3, Part A - 4 SIGNALING PACKET FORMATS
'''
classes = {}
classes: Dict[int, Type[L2CAP_Control_Frame]] = {}
code = 0
name = None
@@ -383,7 +385,7 @@ class L2CAP_Connection_Response(L2CAP_Control_Frame):
CONNECTION_SUCCESSFUL = 0x0000
CONNECTION_PENDING = 0x0001
CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED = 0x0002
CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002
CONNECTION_REFUSED_SECURITY_BLOCK = 0x0003
CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004
CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0006
@@ -394,7 +396,7 @@ class L2CAP_Connection_Response(L2CAP_Control_Frame):
RESULT_NAMES = {
CONNECTION_SUCCESSFUL: 'CONNECTION_SUCCESSFUL',
CONNECTION_PENDING: 'CONNECTION_PENDING',
CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED: 'CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED',
CONNECTION_REFUSED_PSM_NOT_SUPPORTED: 'CONNECTION_REFUSED_PSM_NOT_SUPPORTED',
CONNECTION_REFUSED_SECURITY_BLOCK: 'CONNECTION_REFUSED_SECURITY_BLOCK',
CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE: 'CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE',
CONNECTION_REFUSED_INVALID_SOURCE_CID: 'CONNECTION_REFUSED_INVALID_SOURCE_CID',
@@ -1619,7 +1621,7 @@ class ChannelManager:
destination_cid=request.source_cid,
source_cid=0,
# pylint: disable=line-too-long
result=L2CAP_Connection_Response.CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED,
result=L2CAP_Connection_Response.CONNECTION_REFUSED_PSM_NOT_SUPPORTED,
status=0x0000,
),
)

View File

@@ -17,7 +17,7 @@
# Imports
# -----------------------------------------------------------------------------
import struct
from typing import Tuple
from typing import Optional, Tuple
from ..gatt_client import ProfileServiceProxy
from ..gatt import (
@@ -52,14 +52,14 @@ class DeviceInformationService(TemplateService):
def __init__(
self,
manufacturer_name: str = None,
model_number: str = None,
serial_number: str = None,
hardware_revision: str = None,
firmware_revision: str = None,
software_revision: str = None,
system_id: Tuple[int, int] = None, # (OUI, Manufacturer ID)
ieee_regulatory_certification_data_list: bytes = None
manufacturer_name: Optional[str] = None,
model_number: Optional[str] = None,
serial_number: Optional[str] = None,
hardware_revision: Optional[str] = None,
firmware_revision: Optional[str] = None,
software_revision: Optional[str] = None,
system_id: Optional[Tuple[int, int]] = None, # (OUI, Manufacturer ID)
ieee_regulatory_certification_data_list: Optional[bytes] = None
# TODO: pnp_id
):
characteristics = [

0
bumble/profiles/py.typed Normal file
View File

0
bumble/py.typed Normal file
View File

View File

@@ -15,10 +15,12 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import logging
import struct
from colors import color
import colors
from typing import Dict, List, Type
from . import core
from .core import InvalidStateError
@@ -181,63 +183,63 @@ class DataElement:
raise ValueError('integer types must have a value size specified')
@staticmethod
def nil():
def nil() -> DataElement:
return DataElement(DataElement.NIL, None)
@staticmethod
def unsigned_integer(value, value_size):
def unsigned_integer(value: int, value_size: int) -> DataElement:
return DataElement(DataElement.UNSIGNED_INTEGER, value, value_size)
@staticmethod
def unsigned_integer_8(value):
def unsigned_integer_8(value: int) -> DataElement:
return DataElement(DataElement.UNSIGNED_INTEGER, value, value_size=1)
@staticmethod
def unsigned_integer_16(value):
def unsigned_integer_16(value: int) -> DataElement:
return DataElement(DataElement.UNSIGNED_INTEGER, value, value_size=2)
@staticmethod
def unsigned_integer_32(value):
def unsigned_integer_32(value: int) -> DataElement:
return DataElement(DataElement.UNSIGNED_INTEGER, value, value_size=4)
@staticmethod
def signed_integer(value, value_size):
def signed_integer(value: int, value_size: int) -> DataElement:
return DataElement(DataElement.SIGNED_INTEGER, value, value_size)
@staticmethod
def signed_integer_8(value):
def signed_integer_8(value: int) -> DataElement:
return DataElement(DataElement.SIGNED_INTEGER, value, value_size=1)
@staticmethod
def signed_integer_16(value):
def signed_integer_16(value: int) -> DataElement:
return DataElement(DataElement.SIGNED_INTEGER, value, value_size=2)
@staticmethod
def signed_integer_32(value):
def signed_integer_32(value: int) -> DataElement:
return DataElement(DataElement.SIGNED_INTEGER, value, value_size=4)
@staticmethod
def uuid(value):
def uuid(value: core.UUID) -> DataElement:
return DataElement(DataElement.UUID, value)
@staticmethod
def text_string(value):
def text_string(value: str) -> DataElement:
return DataElement(DataElement.TEXT_STRING, value)
@staticmethod
def boolean(value):
def boolean(value: bool) -> DataElement:
return DataElement(DataElement.BOOLEAN, value)
@staticmethod
def sequence(value):
def sequence(value: List[DataElement]) -> DataElement:
return DataElement(DataElement.SEQUENCE, value)
@staticmethod
def alternative(value):
def alternative(value: List[DataElement]) -> DataElement:
return DataElement(DataElement.ALTERNATIVE, value)
@staticmethod
def url(value):
def url(value: str) -> DataElement:
return DataElement(DataElement.URL, value)
@staticmethod
@@ -456,7 +458,7 @@ class DataElement:
# -----------------------------------------------------------------------------
class ServiceAttribute:
def __init__(self, attribute_id, value):
def __init__(self, attribute_id: int, value: DataElement) -> None:
self.id = attribute_id
self.value = value
@@ -520,7 +522,7 @@ class SDP_PDU:
See Bluetooth spec @ Vol 3, Part B - 4.2 PROTOCOL DATA UNIT FORMAT
'''
sdp_pdu_classes = {}
sdp_pdu_classes: Dict[int, Type[SDP_PDU]] = {}
name = None
pdu_id = 0

View File

@@ -22,9 +22,12 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import logging
import asyncio
import secrets
from typing import Dict, Optional, Type
from pyee import EventEmitter
from colors import color
@@ -184,7 +187,7 @@ class SMP_Command:
See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL
'''
smp_classes = {}
smp_classes: Dict[int, Type[SMP_Command]] = {}
code = 0
name = ''
@@ -501,27 +504,27 @@ class PairingDelegate:
def __init__(
self,
io_capability=NO_OUTPUT_NO_INPUT,
local_initiator_key_distribution=DEFAULT_KEY_DISTRIBUTION,
local_responder_key_distribution=DEFAULT_KEY_DISTRIBUTION,
):
io_capability: int = NO_OUTPUT_NO_INPUT,
local_initiator_key_distribution: int = DEFAULT_KEY_DISTRIBUTION,
local_responder_key_distribution: int = DEFAULT_KEY_DISTRIBUTION,
) -> None:
self.io_capability = io_capability
self.local_initiator_key_distribution = local_initiator_key_distribution
self.local_responder_key_distribution = local_responder_key_distribution
async def accept(self):
async def accept(self) -> bool:
return True
async def confirm(self):
async def confirm(self) -> bool:
return True
async def compare_numbers(self, _number, _digits=6):
async def compare_numbers(self, _number: int, _digits: int = 6) -> bool:
return True
async def get_number(self):
async def get_number(self) -> int:
return 0
async def display_number(self, _number, _digits=6):
async def display_number(self, _number: int, _digits: int = 6) -> None:
pass
async def key_distribution_response(
@@ -535,7 +538,13 @@ class PairingDelegate:
# -----------------------------------------------------------------------------
class PairingConfig:
def __init__(self, sc=True, mitm=True, bonding=True, delegate=None):
def __init__(
self,
sc: bool = True,
mitm: bool = True,
bonding: bool = True,
delegate: Optional[PairingDelegate] = None,
) -> None:
self.sc = sc
self.mitm = mitm
self.bonding = bonding

View File

@@ -28,7 +28,7 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
async def open_transport(name):
async def open_transport(name: str) -> Transport:
'''
Open a transport by name.
The name must be <type>:<parameters>

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -20,9 +20,11 @@ import grpc
from .common import PumpedTransport, PumpedPacketSource, PumpedPacketSink
from .emulated_bluetooth_pb2_grpc import EmulatedBluetoothServiceStub
from .emulated_bluetooth_packets_pb2 import HCIPacket
from .emulated_bluetooth_vhci_pb2_grpc import VhciForwardingServiceStub
# pylint: disable-next=no-name-in-module
from .emulated_bluetooth_packets_pb2 import HCIPacket
# -----------------------------------------------------------------------------
# Logging

View File

@@ -259,7 +259,7 @@ class Transport:
def __iter__(self):
return iter((self.source, self.sink))
async def close(self):
async def close(self) -> None:
self.source.close()
self.sink.close()

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,10 +16,9 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: emulated_bluetooth_packets.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
@@ -31,20 +30,10 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n emulated_bluetooth_packets.proto\x12\x1b\x61ndroid.emulation.bluetooth\"\xfb\x01\n\tHCIPacket\x12?\n\x04type\x18\x01 \x01(\x0e\x32\x31.android.emulation.bluetooth.HCIPacket.PacketType\x12\x0e\n\x06packet\x18\x02 \x01(\x0c\"\x9c\x01\n\nPacketType\x12\x1b\n\x17PACKET_TYPE_UNSPECIFIED\x10\x00\x12\x1b\n\x17PACKET_TYPE_HCI_COMMAND\x10\x01\x12\x13\n\x0fPACKET_TYPE_ACL\x10\x02\x12\x13\n\x0fPACKET_TYPE_SCO\x10\x03\x12\x15\n\x11PACKET_TYPE_EVENT\x10\x04\x12\x13\n\x0fPACKET_TYPE_ISO\x10\x05\x42J\n\x1f\x63om.android.emulation.bluetoothP\x01\xf8\x01\x01\xa2\x02\x03\x41\x45\x42\xaa\x02\x1b\x41ndroid.Emulation.Bluetoothb\x06proto3'
)
_HCIPACKET = DESCRIPTOR.message_types_by_name['HCIPacket']
_HCIPACKET_PACKETTYPE = _HCIPACKET.enum_types_by_name['PacketType']
HCIPacket = _reflection.GeneratedProtocolMessageType(
'HCIPacket',
(_message.Message,),
{
'DESCRIPTOR': _HCIPACKET,
'__module__': 'emulated_bluetooth_packets_pb2'
# @@protoc_insertion_point(class_scope:android.emulation.bluetooth.HCIPacket)
},
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(
DESCRIPTOR, 'emulated_bluetooth_packets_pb2', globals()
)
_sym_db.RegisterMessage(HCIPacket)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None

View File

@@ -0,0 +1,41 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union
DESCRIPTOR: _descriptor.FileDescriptor
class HCIPacket(_message.Message):
__slots__ = ["packet", "type"]
class PacketType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
PACKET_FIELD_NUMBER: _ClassVar[int]
PACKET_TYPE_ACL: HCIPacket.PacketType
PACKET_TYPE_EVENT: HCIPacket.PacketType
PACKET_TYPE_HCI_COMMAND: HCIPacket.PacketType
PACKET_TYPE_ISO: HCIPacket.PacketType
PACKET_TYPE_SCO: HCIPacket.PacketType
PACKET_TYPE_UNSPECIFIED: HCIPacket.PacketType
TYPE_FIELD_NUMBER: _ClassVar[int]
packet: bytes
type: HCIPacket.PacketType
def __init__(
self,
type: _Optional[_Union[HCIPacket.PacketType, str]] = ...,
packet: _Optional[bytes] = ...,
) -> None: ...

View File

@@ -0,0 +1,17 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,10 +16,9 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: emulated_bluetooth.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
@@ -34,20 +33,8 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x18\x65mulated_bluetooth.proto\x12\x1b\x61ndroid.emulation.bluetooth\x1a emulated_bluetooth_packets.proto\"\x19\n\x07RawData\x12\x0e\n\x06packet\x18\x01 \x01(\x0c\x32\xcb\x02\n\x18\x45mulatedBluetoothService\x12\x64\n\x12registerClassicPhy\x12$.android.emulation.bluetooth.RawData\x1a$.android.emulation.bluetooth.RawData(\x01\x30\x01\x12`\n\x0eregisterBlePhy\x12$.android.emulation.bluetooth.RawData\x1a$.android.emulation.bluetooth.RawData(\x01\x30\x01\x12g\n\x11registerHCIDevice\x12&.android.emulation.bluetooth.HCIPacket\x1a&.android.emulation.bluetooth.HCIPacket(\x01\x30\x01\x42\"\n\x1e\x63om.android.emulator.bluetoothP\x01\x62\x06proto3'
)
_RAWDATA = DESCRIPTOR.message_types_by_name['RawData']
RawData = _reflection.GeneratedProtocolMessageType(
'RawData',
(_message.Message,),
{
'DESCRIPTOR': _RAWDATA,
'__module__': 'emulated_bluetooth_pb2'
# @@protoc_insertion_point(class_scope:android.emulation.bluetooth.RawData)
},
)
_sym_db.RegisterMessage(RawData)
_EMULATEDBLUETOOTHSERVICE = DESCRIPTOR.services_by_name['EmulatedBluetoothService']
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'emulated_bluetooth_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None

View File

@@ -0,0 +1,26 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import emulated_bluetooth_packets_pb2 as _emulated_bluetooth_packets_pb2
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Optional as _Optional
DESCRIPTOR: _descriptor.FileDescriptor
class RawData(_message.Message):
__slots__ = ["packet"]
PACKET_FIELD_NUMBER: _ClassVar[int]
packet: bytes
def __init__(self, packet: _Optional[bytes] = ...) -> None: ...

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,10 +16,9 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: emulated_bluetooth_vhci.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
@@ -27,15 +26,17 @@ from google.protobuf import symbol_database as _symbol_database
_sym_db = _symbol_database.Default()
import emulated_bluetooth_packets_pb2 as emulated__bluetooth__packets__pb2
from . import emulated_bluetooth_packets_pb2 as emulated__bluetooth__packets__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x1d\x65mulated_bluetooth_vhci.proto\x12\x1b\x61ndroid.emulation.bluetooth\x1a emulated_bluetooth_packets.proto2y\n\x15VhciForwardingService\x12`\n\nattachVhci\x12&.android.emulation.bluetooth.HCIPacket\x1a&.android.emulation.bluetooth.HCIPacket(\x01\x30\x01\x42J\n\x1f\x63om.android.emulation.bluetoothP\x01\xf8\x01\x01\xa2\x02\x03\x41\x45\x42\xaa\x02\x1b\x41ndroid.Emulation.Bluetoothb\x06proto3'
)
_VHCIFORWARDINGSERVICE = DESCRIPTOR.services_by_name['VhciForwardingService']
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(
DESCRIPTOR, 'emulated_bluetooth_vhci_pb2', globals()
)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None

View File

@@ -0,0 +1,19 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import emulated_bluetooth_packets_pb2 as _emulated_bluetooth_packets_pb2
from google.protobuf import descriptor as _descriptor
from typing import ClassVar as _ClassVar
DESCRIPTOR: _descriptor.FileDescriptor

View File

@@ -1,4 +1,4 @@
# Copyright 2021-2022 Google LLC
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.

View File

View File

@@ -20,7 +20,6 @@ import logging
import threading
import time
import libusb_package
import usb.core
import usb.util
from colors import color
@@ -205,16 +204,22 @@ async def open_pyusb_transport(spec):
await self.sink.stop()
usb.util.release_interface(self.device, 0)
usb_find = usb.core.find
try:
import libusb_package
except ImportError:
logger.debug('libusb_package is not available')
else:
usb_find = libusb_package.find
# Find the device according to the spec moniker
if ':' in spec:
vendor_id, product_id = spec.split(':')
device = libusb_package.find(
idVendor=int(vendor_id, 16), idProduct=int(product_id, 16)
)
device = usb_find(idVendor=int(vendor_id, 16), idProduct=int(product_id, 16))
else:
device_index = int(spec)
devices = list(
libusb_package.find(
usb_find(
find_all=1,
bDeviceClass=USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
bDeviceSubClass=USB_DEVICE_SUBCLASS_RF_CONTROLLER,

View File

@@ -22,7 +22,6 @@ import collections
import ctypes
import platform
import libusb_package
import usb1
from colors import color
@@ -45,11 +44,20 @@ def load_libusb():
If the library does not exists, do nothing and usb1 will search default system paths
when usb1.USBContext is created.
'''
if libusb_path := libusb_package.get_library_path():
logger.debug(f'loading libusb library at {libusb_path}')
dll_loader = ctypes.WinDLL if platform.system() == 'Windows' else ctypes.CDLL
libusb_dll = dll_loader(str(libusb_path), use_errno=True, use_last_error=True)
usb1.loadLibrary(libusb_dll)
try:
import libusb_package
except ImportError:
logger.debug('libusb_package is not available')
else:
if libusb_path := libusb_package.get_library_path():
logger.debug(f'loading libusb library at {libusb_path}')
dll_loader = (
ctypes.WinDLL if platform.system() == 'Windows' else ctypes.CDLL
)
libusb_dll = dll_loader(
str(libusb_path), use_errno=True, use_last_error=True
)
usb1.loadLibrary(libusb_dll)
async def open_usb_transport(spec):

View File

@@ -20,7 +20,7 @@ import logging
import traceback
import collections
import sys
from typing import Awaitable
from typing import Awaitable, TypeVar
from functools import wraps
from colors import color
from pyee import EventEmitter
@@ -65,8 +65,11 @@ def composite_listener(cls):
# -----------------------------------------------------------------------------
_T = TypeVar('_T')
class AbortableEventEmitter(EventEmitter):
def abort_on(self, event: str, awaitable: Awaitable):
def abort_on(self, event: str, awaitable: Awaitable[_T]) -> Awaitable[_T]:
"""
Set a coroutine or future to abort when an event occur.
"""

View File

@@ -50,3 +50,44 @@ signature-mutators="AsyncRunner.run_in_task"
[tool.black]
skip-string-normalization = true
[[tool.mypy.overrides]]
module = "bumble.transport.emulated_bluetooth_pb2_grpc"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "bumble.transport.emulated_bluetooth_packets_pb2"
ignore_errors = true
[[tool.mypy.overrides]]
module = "aioconsole.*"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "colors.*"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "construct.*"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "emulated_bluetooth_packets_pb2.*"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "grpc.*"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "serial_asyncio.*"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "usb.*"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "usb1.*"
ignore_missing_imports = true

View File

@@ -0,0 +1,27 @@
# Invoke this script with an argument pointing to where the Android emulator .proto files are.
# The .proto files should be slightly modified from their original version (as distributed with
# the Android emulator):
# --> Remove unused types/methods from emulated_bluetooth.proto
PROTOC_OUT=bumble/transport
LICENSE_FILE_INPUT=bumble/transport/android_emulator.py
proto_files=(emulated_bluetooth.proto emulated_bluetooth_vhci.proto emulated_bluetooth_packets.proto)
for proto_file in "${proto_files[@]}"
do
python -m grpc_tools.protoc -I$1 --proto_path=bumble/transport --python_out=$PROTOC_OUT --pyi_out=$PROTOC_OUT --grpc_python_out=$PROTOC_OUT $1/$proto_file
done
python_files=(emulated_bluetooth_pb2.py emulated_bluetooth_pb2_grpc.py emulated_bluetooth_packets_pb2.py emulated_bluetooth_packets_pb2_grpc.py emulated_bluetooth_vhci_pb2_grpc.py emulated_bluetooth_vhci_pb2.py)
for python_file in "${python_files[@]}"
do
sed -i '' 's/^import .*_pb2 as/from . &/' $PROTOC_OUT/$python_file
done
stub_files=(emulated_bluetooth_pb2.pyi emulated_bluetooth_packets_pb2.pyi emulated_bluetooth_vhci_pb2.pyi)
for source_file in "${python_files[@]}" "${stub_files[@]}"
do
head -14 $LICENSE_FILE_INPUT > $PROTOC_OUT/${source_file}.lic
cat $PROTOC_OUT/$source_file >> $PROTOC_OUT/${source_file}.lic
mv $PROTOC_OUT/${source_file}.lic $PROTOC_OUT/$source_file
done

View File

@@ -28,16 +28,17 @@ packages = bumble, bumble.transport, bumble.profiles, bumble.apps, bumble.apps.l
package_dir =
bumble = bumble
bumble.apps = apps
include-package-data = True
install_requires =
aioconsole >= 0.4.1
ansicolors >= 1.1
appdirs >= 1.4
bitstruct >= 8.12
click >= 7.1.2; platform_system!='Emscripten'
construct >= 2.10
cryptography == 35; platform_system!='Emscripten'
grpcio >= 1.46; platform_system!='Emscripten'
libusb1 >= 2.0.1; platform_system!='Emscripten'
libusb-package == 1.0.26.0; platform_system!='Emscripten'
libusb-package == 1.0.26.1; platform_system!='Emscripten'
prompt_toolkit >= 3.0.16; platform_system!='Emscripten'
protobuf >= 3.12.4
pyee >= 8.2.2
@@ -60,6 +61,9 @@ console_scripts =
bumble-usb-probe = bumble.apps.usb_probe:main
bumble-link-relay = bumble.apps.link_relay.link_relay:main
[options.package_data]
* = py.typed, *.pyi
[options.extras_require]
build =
build >= 0.7
@@ -69,10 +73,14 @@ test =
pytest-html >= 3.2.0
coverage >= 6.4
development =
black >= 22.10
black == 22.10
invoke >= 1.7.3
mypy == 0.991
nox >= 2022
pylint >= 2.15.8
pylint == 2.15.8
types-appdirs >= 1.4.3
types-invoke >= 1.7.3
types-protobuf >= 4.21.0
documentation =
mkdocs >= 1.4.0
mkdocs-material >= 8.5.6

View File

@@ -22,7 +22,7 @@ Invoke tasks
import os
from invoke import task, call, Collection
from invoke.exceptions import UnexpectedExit
from invoke.exceptions import Exit, UnexpectedExit
# -----------------------------------------------------------------------------
@@ -126,9 +126,9 @@ def lint(ctx, disable='C,R', errors_only=False):
try:
ctx.run(f"pylint {' '.join(options)} bumble apps examples tasks.py")
print("The linter is happy. ✅ 😊 🐝'")
except UnexpectedExit:
except UnexpectedExit as exc:
print("Please check your code against the linter messages. ❌")
print(">>> Linter done.")
raise Exit(code=1) from exc
# -----------------------------------------------------------------------------
@@ -143,13 +143,31 @@ def format_code(ctx, check=False, diff=False):
print(">>> Running the formatter...")
try:
ctx.run(f"black -S {' '.join(options)} .")
except UnexpectedExit:
except UnexpectedExit as exc:
print("Please run 'invoke project.format' or 'black .' to format the code. ❌")
print(">>> formatter done.")
raise Exit(code=1) from exc
# -----------------------------------------------------------------------------
@task(pre=[call(format_code, check=True), call(lint, errors_only=True), test])
@task
def check_types(ctx):
checklist = ["apps", "bumble", "examples", "tests", "tasks.py"]
try:
ctx.run(f"mypy {' '.join(checklist)}")
except UnexpectedExit as exc:
print("Please check your code against the mypy messages.")
raise Exit(code=1) from exc
# -----------------------------------------------------------------------------
@task(
pre=[
call(format_code, check=True),
call(lint, errors_only=True),
call(check_types),
test,
]
)
def pre_commit(_ctx):
print("All good!")
@@ -157,4 +175,5 @@ def pre_commit(_ctx):
# -----------------------------------------------------------------------------
project_tasks.add_task(lint)
project_tasks.add_task(format_code, name="format")
project_tasks.add_task(check_types, name="check-types")
project_tasks.add_task(pre_commit)

View File

@@ -25,10 +25,8 @@ def test_ad_data():
assert data == ad_bytes
assert ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) is None
assert ad.get(AdvertisingData.TX_POWER_LEVEL, raw=True) == bytes([123])
assert ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, return_all=True, raw=True) == []
assert ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True, raw=True) == [
bytes([123])
]
assert ad.get_all(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) == []
assert ad.get_all(AdvertisingData.TX_POWER_LEVEL, raw=True) == [bytes([123])]
data2 = bytes([2, AdvertisingData.TX_POWER_LEVEL, 234])
ad.append(data2)
@@ -36,8 +34,8 @@ def test_ad_data():
assert ad_bytes == data + data2
assert ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) is None
assert ad.get(AdvertisingData.TX_POWER_LEVEL, raw=True) == bytes([123])
assert ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, return_all=True, raw=True) == []
assert ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True, raw=True) == [
assert ad.get_all(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) == []
assert ad.get_all(AdvertisingData.TX_POWER_LEVEL, raw=True) == [
bytes([123]),
bytes([234]),
]

View File

@@ -16,7 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
from bumble.device import Device
from bumble.transport import PacketParser
from bumble.transport.common import PacketParser
# -----------------------------------------------------------------------------