Compare commits

...

19 Commits

Author SHA1 Message Date
Abel Lucas 0cd93ca0c5 gatt: update read_characteristics_by_uuid to returns [(handle, value)] instead of [value] 2022-11-29 18:03:56 +00:00
Lucas Abel 0d70cbde64 Merge pull request #75 from google/uael/fixes
Pairing: device/host fixes & improvements
2022-11-28 21:42:43 -08:00
Gilles Boccon-Gibod f41d0682b2 Merge pull request #80 from AlanRosenthal/alan/gatt_server_getter
Added class CharacteristicDeclaration, gatt_server getters
2022-11-28 19:21:08 -08:00
Gilles Boccon-Gibod 062dc1e53d Merge pull request #85 from AlanRosenthal/alan/gatt_server_console2
Add `bumble-console --device-config` support for gatt services
2022-11-28 19:19:25 -08:00
Abel Lucas 662704e551 classic: complete authentication when being the .authenticate acceptor 2022-11-29 00:28:39 +00:00
Abel Lucas 02a474c44e smp: emit enough information on pairing complete to deduce security level 2022-11-29 00:28:38 +00:00
Abel Lucas a1c7aec492 device: fix .find_connection_by_bd_addr 2022-11-29 00:28:38 +00:00
Abel Lucas 6112f00049 device: introduce BR/EDR pending connections
This commit enable the BR/EDR pairing to run asynchronously to
the connection being established.

When in security mode 3, a controller shall start authentication as
part of the connection, which result in HCI events being sent on a BD
address without a completed connection (ie. no connection handle).
2022-11-29 00:28:38 +00:00
Alan Rosenthal f56ac14f2c Add bumble-console --device-config support for gatt services
This PR adds support for bumble-console to be preloaded with gatt services via `--device-config`.
This PR also adds some type annotations
2022-11-28 14:11:27 -05:00
Gilles Boccon-Gibod a739fc71ce Merge pull request #84 from google/gbg/78
use libusb auto-detach feature
2022-11-27 12:42:02 -08:00
Alan Rosenthal b89f9030a0 Added class CharacteristicDeclaration, gatt_server getters
* Converted CharacteristicDeclaration implementation to class
* Added ability to get a gatt_server attribute by service UUID, characteristics UUID, descriptor UUID
2022-11-27 19:22:25 +00:00
Gilles Boccon-Gibod b437bd8619 Merge pull request #82 from AlanRosenthal/alan/pytest_fixes
Fix test failures
2022-11-25 13:19:53 -08:00
Alan Rosenthal a3e4674819 Fix test failures
a. `DeprecationWarning: The 'warn' method is deprecated, use 'warning' instead`
Updated call in `bumble/smp.py`

b. `ModuleNotFoundError: No module named 'bumble.apps'`
Updated imports in `tests/import_test.py`

c. Added `pytest-html` for easier viewing of test results
Added package in `setup.cfg`, and hook in `tasks.py`

d. Updated workflows to use `invoke test`

This is a partial fix of #81
2022-11-23 11:31:27 -05:00
Abel Lucas 5f1d57fcb0 device: simplify and fixes remote name request 2022-11-22 21:20:56 +00:00
Gilles Boccon-Gibod ae0b739e4a Merge pull request #79 from google/gbg/fix-host-reset
fix sequencing logic broken by earlier merge
2022-11-22 09:16:26 -08:00
Michael Mogenson 0570b59796 Merge pull request #77 from mogenson/l2cap-on-event
Fix for 'Host' object has no attribute 'add_listener'
2022-11-22 09:39:04 -05:00
Gilles Boccon-Gibod 22218627f6 fix sequencing logic broken by earlier merge 2022-11-21 21:07:47 -08:00
Michael Mogenson 1c72242264 Fix for 'Host' object has no attribute 'add_listener'
Pyee's add_listener() method was not added until release 9.0.0. Bumble's
setup.cfg specifies a minimum pyee version of 8.2.2.

Remove the call to add_listener() in l2cap.py. If the add_listener() API
is prefered over the on(), another solution would be to bump the pyee
version requirement.
2022-11-21 12:31:21 -05:00
Abel Lucas 9c133706e6 keys: add a way to remove all bonds from key store 2022-11-18 18:22:15 +00:00
18 changed files with 368 additions and 106 deletions
+3 -3
View File
@@ -21,7 +21,7 @@ jobs:
- name: Get history and tags for SCM versioning to work
run: |
git fetch --prune --unshallow
git fetch --depth=1 origin +refs/tags/*:refs/tags/*
git fetch --depth=1 origin +refs/tags/*:refs/tags/*
- name: Set up Python 3.10
uses: actions/setup-python@v3
with:
@@ -30,9 +30,9 @@ jobs:
run: |
python -m pip install --upgrade pip
python -m pip install ".[build,test,development,documentation]"
- name: Test with pytest
- name: Test
run: |
pytest
invoke test
- name: Build
run: |
inv build
+1 -4
View File
@@ -3,9 +3,6 @@ build/
dist/
*.egg-info/
*~
bumble/__pycache__
docs/mkdocs/site
tests/__pycache__
test-results.xml
bumble/transport/__pycache__
bumble/profiles/__pycache__
__pycache__
+4 -1
View File
@@ -443,7 +443,10 @@ class ConsoleApp:
# Discover all services, characteristics and descriptors
self.append_to_output('discovering services...')
await self.connected_peer.discover_services()
self.append_to_output(f'found {len(self.connected_peer.services)} services, discovering charateristics...')
self.append_to_output(
f'found {len(self.connected_peer.services)} services,'
' discovering characteristics...'
)
await self.connected_peer.discover_characteristics()
self.append_to_output('found characteristics, discovering descriptors...')
for service in self.connected_peer.services:
+3 -3
View File
@@ -149,9 +149,9 @@ async def get_peer_name(peer, mode):
if not services:
return None
values = await peer.read_characteristics_by_uuid(GATT_DEVICE_NAME_CHARACTERISTIC, services[0])
if values:
return values[0].decode('utf-8')
characteristics = await peer.read_characteristics_by_uuid(GATT_DEVICE_NAME_CHARACTERISTIC, services[0])
if characteristics:
return characteristics[0][1].decode('utf-8')
# -----------------------------------------------------------------------------
+7 -1
View File
@@ -58,6 +58,12 @@ def padded_bytes(buffer, size):
return buffer + bytes(padding_size)
def get_dict_key_by_value(dictionary, value):
for key, val in dictionary.items():
if val == value:
return key
return None
# -----------------------------------------------------------------------------
# Exceptions
# -----------------------------------------------------------------------------
@@ -135,7 +141,7 @@ class UUID:
else:
uuid_str = uuid_str_or_int
if len(uuid_str) != 32 and len(uuid_str) != 8 and len(uuid_str) != 4:
raise ValueError('invalid UUID format')
raise ValueError(f"invalid UUID format: {uuid_str}")
self.uuid_bytes = bytes(reversed(bytes.fromhex(uuid_str)))
self.name = name
+105 -47
View File
@@ -413,12 +413,36 @@ class Connection(CompositeEventEmitter):
self.parameters = parameters
self.encryption = 0
self.authenticated = False
self.sc = False
self.link_key_type = None
self.authenticating = False
self.phy = phy
self.att_mtu = ATT_DEFAULT_MTU
self.data_length = DEVICE_DEFAULT_DATA_LENGTH
self.gatt_client = None # Per-connection client
self.gatt_server = device.gatt_server # By default, use the device's shared server
# [Classic only]
@classmethod
def incomplete(cls, device, peer_address):
"""
Instantiate an incomplete connection (ie. one waiting for a HCI Connection Complete event).
Once received it shall be completed using the `.complete` method.
"""
return cls(device, None, BT_BR_EDR_TRANSPORT, device.public_address, peer_address, None, None, None, None)
# [Classic only]
def complete(self, handle, peer_resolvable_address, role, parameters):
"""
Finish an incomplete connection upon completion.
"""
assert self.handle is None
assert self.transport == BT_BR_EDR_TRANSPORT
self.handle = handle
self.peer_resolvable_address = peer_resolvable_address
self.role = role
self.parameters = parameters
@property
def role_name(self):
return 'CENTRAL' if self.role == BT_CENTRAL_ROLE else 'PERIPHERAL'
@@ -540,6 +564,7 @@ class DeviceConfiguration:
)
self.irk = bytes(16) # This really must be changed for any level of security
self.keystore = None
self.gatt_services = []
def load_from_dict(self, config):
# Load simple properties
@@ -556,6 +581,7 @@ class DeviceConfiguration:
self.classic_accept_any = config.get('classic_accept_any', self.classic_accept_any)
self.connectable = config.get('connectable', self.connectable)
self.discoverable = config.get('discoverable', self.discoverable)
self.gatt_services = config.get('gatt_services', self.gatt_services)
# Load or synthesize an IRK
irk = config.get('irk')
@@ -589,7 +615,7 @@ def with_connection_from_handle(function):
@functools.wraps(function)
def wrapper(self, connection_handle, *args, **kwargs):
if (connection := self.lookup_connection(connection_handle)) is None:
raise ValueError('no connection for handle')
raise ValueError(f"no connection for handle: 0x{connection_handle:04x}")
return function(self, connection, *args, **kwargs)
return wrapper
@@ -598,6 +624,8 @@ def with_connection_from_handle(function):
def with_connection_from_address(function):
@functools.wraps(function)
def wrapper(self, address, *args, **kwargs):
if (connection := self.pending_connections.get(address, False)):
return function(self, connection, *args, **kwargs)
for connection in self.connections.values():
if connection.peer_address == address:
return function(self, connection, *args, **kwargs)
@@ -609,6 +637,8 @@ def with_connection_from_address(function):
def try_with_connection_from_address(function):
@functools.wraps(function)
def wrapper(self, address, *args, **kwargs):
if (connection := self.pending_connections.get(address, False)):
return function(self, connection, address, *args, **kwargs)
for connection in self.connections.values():
if connection.peer_address == address:
return function(self, connection, address, *args, **kwargs)
@@ -696,6 +726,7 @@ class Device(CompositeEventEmitter):
self.le_connecting = False
self.disconnecting = False
self.connections = {} # Connections, by connection handle
self.pending_connections = {} # Connections, by BD address (BR/EDR only)
self.classic_enabled = False
self.inquiry_response = None
self.address_resolver = None
@@ -726,6 +757,26 @@ class Device(CompositeEventEmitter):
self.connectable = config.connectable
self.classic_accept_any = config.classic_accept_any
for service in config.gatt_services:
characteristics = []
for characteristic in service.get("characteristics", []):
descriptors = []
for descriptor in characteristic.get("descriptors", []):
new_descriptor = Descriptor(
descriptor_type=descriptor["descriptor_type"],
permissions=descriptor["permission"],
)
descriptors.append(new_descriptor)
new_characteristic = Characteristic(
uuid=characteristic["uuid"],
properties=characteristic["properties"],
permissions=int(characteristic["permissions"], 0),
descriptors=descriptors,
)
characteristics.append(new_characteristic)
new_service = Service(uuid=service["uuid"], characteristics=characteristics)
self.gatt_server.add_service(new_service)
# If a name is passed, override the name from the config
if name:
self.name = name
@@ -796,7 +847,7 @@ class Device(CompositeEventEmitter):
def find_connection_by_bd_addr(self, bd_addr, transport=None, check_address_type=False):
for connection in self.connections.values():
if connection.peer_address.get_bytes() == bd_addr.get_bytes():
if connection.peer_address.to_bytes() == bd_addr.to_bytes():
if check_address_type and connection.peer_address.address_type != bd_addr.address_type:
continue
if transport is None or connection.transport == transport:
@@ -1323,6 +1374,9 @@ class Device(CompositeEventEmitter):
max_ce_length = int(prefs.max_ce_length / 0.625),
))
else:
# Save pending connection
self.pending_connections[peer_address] = Connection.incomplete(self, peer_address)
# TODO: allow passing other settings
result = await self.send_command(HCI_Create_Connection_Command(
bd_addr = peer_address,
@@ -1360,6 +1414,8 @@ class Device(CompositeEventEmitter):
if transport == BT_LE_TRANSPORT:
self.le_connecting = False
self.connect_own_address_type = None
else:
self.pending_connections.pop(peer_address, None)
async def accept(
self,
@@ -1373,7 +1429,7 @@ class Device(CompositeEventEmitter):
Notes:
* A `connect` to the same peer will also complete this call.
* The `timeout` parameter is only handled while waiting for the connection request,
once received and accepeted, the controller shall issue a connection complete event.
once received and accepted, the controller shall issue a connection complete event.
'''
if type(peer_address) is str:
@@ -1429,6 +1485,9 @@ class Device(CompositeEventEmitter):
self.on('connection', on_connection)
self.on('connection_failure', on_connection_failure)
# Save pending connection
self.pending_connections[peer_address] = Connection.incomplete(self, peer_address)
try:
# Accept connection request
await self.send_command(HCI_Accept_Connection_Request_Command(
@@ -1442,6 +1501,7 @@ class Device(CompositeEventEmitter):
finally:
self.remove_listener('connection', on_connection)
self.remove_listener('connection_failure', on_connection_failure)
self.pending_connections.pop(peer_address, None)
@asynccontextmanager
async def connect_as_gatt(self, peer_address):
@@ -1685,9 +1745,13 @@ class Device(CompositeEventEmitter):
logger.warn(f'HCI_Authentication_Requested_Command failed: {HCI_Constant.error_name(result.status)}')
raise HCI_StatusError(result)
# Save in connection we are trying to authenticate
connection.authenticating = True
# Wait for the authentication to complete
await pending_authentication
finally:
connection.authenticating = False
connection.remove_listener('connection_authentication', on_authentication)
connection.remove_listener('connection_authentication_failure', on_authentication_failure)
@@ -1764,28 +1828,18 @@ class Device(CompositeEventEmitter):
# Set up event handlers
pending_name = asyncio.get_running_loop().create_future()
if type(remote) == Address:
peer_address = remote
handler = self.on(
'remote_name',
lambda address, remote_name:
pending_name.set_result(remote_name) if address == remote else None
)
failure_handler = self.on(
'remote_name_failure',
lambda address, error_code:
pending_name.set_exception(HCI_Error(error_code)) if address == remote else None
)
else:
peer_address = remote.peer_address
handler = remote.on(
'remote_name',
lambda: pending_name.set_result(remote.peer_name)
)
failure_handler = remote.on(
'remote_name_failure',
lambda error_code: pending_name.set_exception(HCI_Error(error_code))
)
peer_address = remote if type(remote) == Address else remote.peer_address
handler = self.on(
'remote_name',
lambda address, remote_name:
pending_name.set_result(remote_name) if address == peer_address else None
)
failure_handler = self.on(
'remote_name_failure',
lambda address, error_code:
pending_name.set_exception(HCI_Error(error_code)) if address == peer_address else None
)
try:
result = await self.send_command(
@@ -1804,12 +1858,8 @@ class Device(CompositeEventEmitter):
# Wait for the result
return await pending_name
finally:
if type(remote) == Address:
self.remove_listener('remote_name', handler)
self.remove_listener('remote_name_failure', failure_handler)
else:
remote.remove_listener('remote_name', handler)
remote.remove_listener('remote_name_failure', failure_handler)
self.remove_listener('remote_name', handler)
self.remove_listener('remote_name_failure', failure_handler)
# [Classic only]
@host_event_handler
@@ -1827,6 +1877,9 @@ class Device(CompositeEventEmitter):
asyncio.create_task(store_keys())
if (connection := self.find_connection_by_bd_addr(bd_addr, transport=BT_BR_EDR_TRANSPORT)):
connection.link_key_type = key_type
def add_service(self, service):
self.gatt_server.add_service(service)
@@ -1853,17 +1906,8 @@ class Device(CompositeEventEmitter):
if transport == BT_BR_EDR_TRANSPORT:
# Create a new connection
connection = Connection(
self,
connection_handle,
transport,
self.public_address,
peer_address,
peer_resolvable_address,
role,
connection_parameters,
phy=None
)
connection: Connection = self.pending_connections.pop(peer_address)
connection.complete(connection_handle, peer_resolvable_address, role, connection_parameters)
self.connections[connection_handle] = connection
# We may have an accept ongoing waiting for a connection request for `peer_address`.
@@ -1969,6 +2013,9 @@ class Device(CompositeEventEmitter):
# device configuration is set to accept any incoming connection
elif self.classic_accept_any:
# Save pending connection
self.pending_connections[bd_addr] = Connection.incomplete(self, bd_addr)
self.host.send_command_sync(
HCI_Accept_Connection_Request_Command(
bd_addr = bd_addr,
@@ -2042,6 +2089,17 @@ class Device(CompositeEventEmitter):
logger.debug(f'*** Connection Authentication Failure: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}, error={error}')
connection.emit('connection_authentication_failure', error)
@host_event_handler
@with_connection_from_address
def on_ssp_complete(self, connection):
# On Secure Simple Pairing complete, in case:
# - Connection isn't already authenticated
# - AND We are not the initiator of the authentication
# We must trigger authentication to known if we are truly authenticated
if not connection.authenticating and not connection.authenticated:
logger.debug(f'*** Trigger Connection Authentication: [0x{connection.handle:04X}] {connection.peer_address}')
asyncio.create_task(connection.authenticate())
# [Classic only]
@host_event_handler
@with_connection_from_address
@@ -2178,8 +2236,7 @@ class Device(CompositeEventEmitter):
if connection:
connection.peer_name = remote_name
connection.emit('remote_name')
else:
self.emit('remote_name', address, remote_name)
self.emit('remote_name', address, remote_name)
except UnicodeDecodeError as error:
logger.warning('peer name is not valid UTF-8')
if connection:
@@ -2193,8 +2250,7 @@ class Device(CompositeEventEmitter):
def on_remote_name_failure(self, connection, address, error):
if connection:
connection.emit('remote_name_failure', error)
else:
self.emit('remote_name_failure', address, error)
self.emit('remote_name_failure', address, error)
@host_event_handler
@with_connection_from_handle
@@ -2260,7 +2316,9 @@ class Device(CompositeEventEmitter):
connection.emit('pairing_start')
@with_connection_from_handle
def on_pairing(self, connection, keys):
def on_pairing(self, connection, keys, sc):
connection.sc = sc
connection.authenticated = True
connection.emit('pairing', keys)
@with_connection_from_handle
+32 -3
View File
@@ -22,6 +22,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import enum
import types
@@ -187,7 +188,7 @@ class Service(Attribute):
See Vol 3, Part G - 3.1 SERVICE DEFINITION
'''
def __init__(self, uuid, characteristics, primary=True):
def __init__(self, uuid, characteristics: list[Characteristic], primary=True):
# Convert the uuid to a UUID object if it isn't already
if type(uuid) is str:
uuid = UUID(uuid)
@@ -256,10 +257,21 @@ class Characteristic(Attribute):
if properties & p
])
def __init__(self, uuid, properties, permissions, value = b'', descriptors = []):
@staticmethod
def string_to_properties(properties_str: str):
return functools.reduce(
lambda x, y: x | get_dict_key_by_value(Characteristic.PROPERTY_NAMES, y),
properties_str.split(","),
0,
)
def __init__(self, uuid, properties, permissions, value = b'', descriptors: list[Descriptor] = []):
super().__init__(uuid, permissions, value)
self.uuid = self.type
self.properties = properties
if type(properties) is str:
self.properties = Characteristic.string_to_properties(properties)
else:
self.properties = properties
self.descriptors = descriptors
def get_descriptor(self, descriptor_type):
@@ -271,6 +283,23 @@ class Characteristic(Attribute):
return f'Characteristic(handle=0x{self.handle:04X}, end=0x{self.end_group_handle:04X}, uuid={self.uuid}, properties={Characteristic.properties_as_string(self.properties)})'
# -----------------------------------------------------------------------------
class CharacteristicDeclaration(Attribute):
'''
See Vol 3, Part G - 3.3.1 CHARACTERISTIC DECLARATION
'''
def __init__(self, characteristic, value_handle):
declaration_bytes = struct.pack(
'<BH',
characteristic.properties,
value_handle
) + characteristic.uuid.to_pdu_bytes()
super().__init__(GATT_CHARACTERISTIC_ATTRIBUTE_TYPE, Attribute.READABLE, declaration_bytes)
self.characteristic = characteristic
def __str__(self):
return f'CharacteristicDeclaration(handle=0x{self.handle:04X}, value_handle=0x{self.value_handle:04X}, uuid={self.uuid}, properties={Characteristic.properties_as_string(self.properties)})'
# -----------------------------------------------------------------------------
class CharacteristicValue:
'''
+1 -1
View File
@@ -712,7 +712,7 @@ class Client:
logger.warning(f'bogus handle value: {attribute_handle}')
return []
characteristics_values.append(attribute_value)
characteristics_values.append((attribute_handle, attribute_value))
# Move on to the next characteristics
starting_handle = response.attributes[-1][0] + 1
+65 -11
View File
@@ -26,6 +26,7 @@
import asyncio
import logging
from collections import defaultdict
from typing import Tuple, Optional
from pyee import EventEmitter
from colors import color
@@ -60,6 +61,9 @@ class Server(EventEmitter):
self.indication_semaphores = defaultdict(lambda: asyncio.Semaphore(1))
self.pending_confirmations = defaultdict(lambda: None)
def __str__(self):
return "\n".join(map(str, self.attributes))
def send_gatt_pdu(self, connection_handle, pdu):
self.device.send_l2cap_pdu(connection_handle, ATT_CID, pdu)
@@ -79,6 +83,63 @@ class Server(EventEmitter):
return attribute
return None
def get_service_attribute(self, service_uuid: UUID) -> Optional[Service]:
return next(
(
attribute
for attribute in self.attributes
if attribute.type == GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE
and attribute.uuid == service_uuid
),
None,
)
def get_characteristic_attributes(
self, service_uuid: UUID, characteristic_uuid: UUID
) -> Optional[Tuple[CharacteristicDeclaration, Characteristic]]:
service_handle = self.get_service_attribute(service_uuid)
if not service_handle:
return None
return next(
(
(attribute, self.get_attribute(attribute.characteristic.handle))
for attribute in map(
self.get_attribute,
range(service_handle.handle, service_handle.end_group_handle + 1),
)
if attribute.type == GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
and attribute.characteristic.uuid == characteristic_uuid
),
None,
)
def get_descriptor_attribute(
self, service_uuid: UUID, characteristic_uuid: UUID, descriptor_uuid: UUID
) -> Optional[Descriptor]:
characteristics = self.get_characteristic_attributes(
service_uuid, characteristic_uuid
)
if not characteristics:
return None
(_, characteristic_value) = characteristics
return next(
(
attribute
for attribute in map(
self.get_attribute,
range(
characteristic_value.handle + 1,
characteristic_value.end_group_handle + 1,
),
)
if attribute.type == descriptor_uuid
),
None,
)
def add_attribute(self, attribute):
# Assign a handle to this attribute
attribute.handle = self.next_handle()
@@ -87,7 +148,7 @@ class Server(EventEmitter):
# Add this attribute to the list
self.attributes.append(attribute)
def add_service(self, service):
def add_service(self, service: Service):
# Add the service attribute to the DB
self.add_attribute(service)
@@ -95,16 +156,9 @@ class Server(EventEmitter):
# Add all characteristics
for characteristic in service.characteristics:
# Add a Characteristic Declaration (Vol 3, Part G - 3.3.1 Characteristic Declaration)
declaration_bytes = struct.pack(
'<BH',
characteristic.properties,
self.next_handle() + 1, # The value will be the next attribute after this declaration
) + characteristic.uuid.to_pdu_bytes()
characteristic_declaration = Attribute(
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
Attribute.READABLE,
declaration_bytes
# Add a Characteristic Declaration
characteristic_declaration = CharacteristicDeclaration(
characteristic, self.next_handle() + 1
)
self.add_attribute(characteristic_declaration)
+19 -12
View File
@@ -121,24 +121,28 @@ class Host(EventEmitter):
self.hc_acl_data_packet_length = response.return_parameters.hc_acl_data_packet_length
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_acl_data_packets
logger.debug(
f'HCI ACL flow control: hc_acl_data_packet_length={self.hc_acl_data_packet_length},'
f'hc_total_num_acl_data_packets={self.hc_total_num_acl_data_packets}'
)
if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command(), check_result=True)
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0:
# LE and Classic share the same values
self.hc_le_acl_data_packet_length = self.hc_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = self.hc_total_num_acl_data_packets
logger.debug(
f'HCI LE ACL flow control: hc_le_acl_data_packet_length={self.hc_le_acl_data_packet_length},'
f'hc_total_num_le_acl_data_packets={self.hc_total_num_le_acl_data_packets}'
)
logger.debug(
f'HCI ACL flow control: hc_acl_data_packet_length={self.hc_acl_data_packet_length},'
f'hc_total_num_acl_data_packets={self.hc_total_num_acl_data_packets}'
)
logger.debug(
f'HCI LE ACL flow control: hc_le_acl_data_packet_length={self.hc_le_acl_data_packet_length},'
f'hc_total_num_le_acl_data_packets={self.hc_total_num_le_acl_data_packets}'
)
if (
response.return_parameters.hc_le_acl_data_packet_length == 0 or
response.return_parameters.hc_total_num_le_acl_data_packets == 0
):
# LE and Classic share the same values
self.hc_le_acl_data_packet_length = self.hc_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = self.hc_total_num_acl_data_packets
if (
self.supports_command(HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND) and
@@ -595,6 +599,9 @@ class Host(EventEmitter):
def on_hci_simple_pairing_complete_event(self, event):
logger.debug(f'simple pairing complete for {event.bd_addr}: status={HCI_Constant.status_name(event.status)}')
# Notify the client
if event.status == HCI_SUCCESS:
self.emit('ssp_complete', event.bd_addr)
def on_hci_pin_code_request_event(self, event):
# For now, just refuse all requests
+12
View File
@@ -20,6 +20,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import json
@@ -143,6 +144,10 @@ class KeyStore:
async def get_all(self):
return []
async def delete_all(self):
all_keys = await self.get_all()
await asyncio.gather(*(self.delete(name) for (name, _) in all_keys))
async def get_resolving_keys(self):
all_keys = await self.get_all()
resolving_keys = []
@@ -259,6 +264,13 @@ class JsonKeyStore(KeyStore):
return [(name, PairingKeys.from_dict(keys)) for (name, keys) in namespace.items()]
async def delete_all(self):
db = await self.load()
db.pop(self.namespace, None)
await self.save(db)
async def get(self, name):
db = await self.load()
+1 -1
View File
@@ -1224,7 +1224,7 @@ class ChannelManager:
self._host.remove_listener('disconnection', self.on_disconnection)
self._host = host
if host is not None:
host.add_listener('disconnection', self.on_disconnection)
host.on('disconnection', self.on_disconnection)
def find_channel(self, connection_handle, cid):
if connection_channels := self.channels.get(connection_handle):
+2 -2
View File
@@ -1110,7 +1110,7 @@ class Session:
self.manager.on_pairing(self, peer_address, keys)
def on_pairing_failure(self, reason):
logger.warn(f'pairing failure ({error_name(reason)})')
logger.warning(f'pairing failure ({error_name(reason)})')
if self.completed:
return
@@ -1583,7 +1583,7 @@ class Manager(EventEmitter):
asyncio.create_task(store_keys())
# Notify the device
self.device.on_pairing(session.connection.handle, keys)
self.device.on_pairing(session.connection.handle, keys, session.sc)
def on_pairing_failure(self, session, reason):
self.device.on_pairing_failure(session.connection.handle, reason)
+1
View File
@@ -65,6 +65,7 @@ build =
test =
pytest >= 6.2
pytest-asyncio >= 0.17
pytest-html >= 3.2.0
coverage >= 6.4
development =
invoke >= 1.4
+9 -4
View File
@@ -52,8 +52,9 @@ build_tasks.add_task(mkdocs, name="mkdocs")
test_tasks = Collection()
ns.add_collection(test_tasks, name="test")
@task
def test(ctx, filter=None, junit=False, install=False):
@task(incrementable=["verbose"])
def test(ctx, filter=None, junit=False, install=False, html=False, verbose=0):
# Install the package before running the tests
if install:
ctx.run("python -m pip install .[test]")
@@ -62,8 +63,12 @@ def test(ctx, filter=None, junit=False, install=False):
if junit:
args += "--junit-xml test-results.xml"
if filter is not None:
args += " -k '{}'".format(filter)
ctx.run("python -m pytest {} {}".format(os.path.join(ROOT_DIR, "tests"), args))
args += f" -k '{filter}'"
if html:
args += " --html results.html"
if verbose > 0:
args += f" -{'v' * verbose}"
ctx.run(f"python -m pytest {os.path.join(ROOT_DIR, 'tests')} {args}")
test_tasks.add_task(test, default=True)
+11 -2
View File
@@ -15,8 +15,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from bumble.core import AdvertisingData
from bumble.core import AdvertisingData, get_dict_key_by_value
# -----------------------------------------------------------------------------
def test_ad_data():
@@ -39,6 +38,16 @@ def test_ad_data():
assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True, raw=True) == [bytes([123]), bytes([234])])
# -----------------------------------------------------------------------------
def test_get_dict_key_by_value():
dictionary = {
"A": 1,
"B": 2
}
assert get_dict_key_by_value(dictionary, 1) == "A"
assert get_dict_key_by_value(dictionary, 2) == "B"
assert get_dict_key_by_value(dictionary, 3) is None
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_ad_data()
+81
View File
@@ -28,6 +28,7 @@ from bumble.device import Device, Peer
from bumble.host import Host
from bumble.gatt import (
GATT_BATTERY_LEVEL_CHARACTERISTIC,
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
CharacteristicAdapter,
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter,
@@ -226,6 +227,37 @@ async def test_characteristic_encoding():
assert last_change is None
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_attribute_getters():
[client, server] = LinkedDevices().devices[:2]
characteristic_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806')
characteristic = Characteristic(
characteristic_uuid,
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123])
)
service_uuid = UUID('3A657F47-D34F-46B3-B1EC-698E29B6B829')
service = Service(service_uuid, [characteristic])
server.add_service(service)
service_attr = server.gatt_server.get_service_attribute(service_uuid)
assert service_attr
(char_decl_attr, char_value_attr) = server.gatt_server.get_characteristic_attributes(service_uuid, characteristic_uuid)
assert char_decl_attr and char_value_attr
desc_attr = server.gatt_server.get_descriptor_attribute(service_uuid, characteristic_uuid, GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR)
assert desc_attr
# assert all handles are in expected order
assert service_attr.handle < char_decl_attr.handle < char_value_attr.handle < desc_attr.handle == service_attr.end_group_handle
# assert characteristic declarations attribute is followed by characteristic value attribute
assert char_decl_attr.handle + 1 == char_value_attr.handle
# -----------------------------------------------------------------------------
def test_CharacteristicAdapter():
# Check that the CharacteristicAdapter base class is transparent
@@ -705,6 +737,55 @@ async def test_mtu_exchange():
assert d2_connection.att_mtu == 50
# -----------------------------------------------------------------------------
def test_char_property_to_string():
# single
assert Characteristic.property_name(0x01) == "BROADCAST"
assert Characteristic.property_name(Characteristic.BROADCAST) == "BROADCAST"
# double
assert Characteristic.properties_as_string(0x03) == "BROADCAST,READ"
assert Characteristic.properties_as_string(Characteristic.BROADCAST | Characteristic.READ) == "BROADCAST,READ"
# -----------------------------------------------------------------------------
def test_char_property_string_to_type():
# single
assert Characteristic.string_to_properties("BROADCAST") == Characteristic.BROADCAST
# double
assert Characteristic.string_to_properties("BROADCAST,READ") == Characteristic.BROADCAST | Characteristic.READ
assert Characteristic.string_to_properties("READ,BROADCAST") == Characteristic.BROADCAST | Characteristic.READ
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_server_string():
[_, server] = LinkedDevices().devices[:2]
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123])
)
service = Service(
'3A657F47-D34F-46B3-B1EC-698E29B6B829',
[characteristic]
)
server.add_service(service)
assert str(server.gatt_server) == """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access))
Attribute(handle=0x0002, type=UUID-16:2803 (Characteristic), permissions=1, value=020300002a)
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
Attribute(handle=0x0004, type=UUID-16:2803 (Characteristic), permissions=1, value=020500012a)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
Attribute(handle=0x0007, type=UUID-16:2803 (Characteristic), permissions=1, value=1a0800060875ac2563dbb3e3496c03db59b1fd)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
# -----------------------------------------------------------------------------
async def async_main():
await test_read_write()
+11 -11
View File
@@ -64,37 +64,37 @@ def test_import():
# -----------------------------------------------------------------------------
def test_app_imports():
from bumble.apps.console import main
from apps.console import main
assert main
from bumble.apps.controller_info import main
from apps.controller_info import main
assert main
from bumble.apps.controllers import main
from apps.controllers import main
assert main
from bumble.apps.gatt_dump import main
from apps.gatt_dump import main
assert main
from bumble.apps.gg_bridge import main
from apps.gg_bridge import main
assert main
from bumble.apps.hci_bridge import main
from apps.hci_bridge import main
assert main
from bumble.apps.pair import main
from apps.pair import main
assert main
from bumble.apps.scan import main
from apps.scan import main
assert main
from bumble.apps.show import main
from apps.show import main
assert main
from bumble.apps.unbond import main
from apps.unbond import main
assert main
from bumble.apps.usb_probe import main
from apps.usb_probe import main
assert main