forked from auracaster/bumble_mirror
Compare commits
12 Commits
gbg/pairin
...
v0.0.148
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9af426db45 | ||
|
|
4286b2ab59 | ||
|
|
3442358dea | ||
|
|
bf3e05ef91 | ||
|
|
5351ab8a42 | ||
|
|
49b2c13e69 | ||
|
|
962737a97b | ||
|
|
85496aaff5 | ||
|
|
a95e601a5c | ||
|
|
df218b5370 | ||
|
|
0f737244b5 | ||
|
|
a258ba383a |
19
LICENSE
19
LICENSE
@@ -200,3 +200,22 @@
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
|
||||
---
|
||||
|
||||
|
||||
Files: bumble/colors.py
|
||||
Copyright (c) 2012 Giorgos Verigakis <verigak@gmail.com>
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
@@ -24,6 +24,7 @@ import logging
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import humanize
|
||||
from typing import Optional, Union
|
||||
from collections import OrderedDict
|
||||
|
||||
@@ -165,6 +166,7 @@ class ConsoleApp:
|
||||
'local-services': None,
|
||||
'remote-services': None,
|
||||
'local-values': None,
|
||||
'remote-values': None,
|
||||
},
|
||||
'filter': {
|
||||
'address': None,
|
||||
@@ -212,6 +214,7 @@ class ConsoleApp:
|
||||
get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1))
|
||||
)
|
||||
self.local_values_text = FormattedTextControl()
|
||||
self.remote_values_text = FormattedTextControl()
|
||||
self.log_height = Dimension(min=7, weight=4)
|
||||
self.log_max_lines = 100
|
||||
self.log_lines = []
|
||||
@@ -234,6 +237,10 @@ class ConsoleApp:
|
||||
Frame(Window(self.remote_services_text), title='Remote Services'),
|
||||
filter=Condition(lambda: self.top_tab == 'remote-services'),
|
||||
),
|
||||
ConditionalContainer(
|
||||
Frame(Window(self.remote_values_text), title='Remote Values'),
|
||||
filter=Condition(lambda: self.top_tab == 'remote-values'),
|
||||
),
|
||||
ConditionalContainer(
|
||||
Frame(Window(self.log_text, height=self.log_height), title='Log'),
|
||||
filter=Condition(lambda: self.top_tab == 'log'),
|
||||
@@ -737,6 +744,7 @@ class ConsoleApp:
|
||||
'local-services',
|
||||
'remote-services',
|
||||
'local-values',
|
||||
'remote-values',
|
||||
}:
|
||||
self.top_tab = params[0]
|
||||
self.ui.invalidate()
|
||||
@@ -745,6 +753,10 @@ class ConsoleApp:
|
||||
await self.do_show_local_values()
|
||||
await asyncio.sleep(1)
|
||||
|
||||
while self.top_tab == 'remote-values':
|
||||
await self.do_show_remote_values()
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def do_show_local_values(self):
|
||||
prettytable = PrettyTable()
|
||||
field_names = ["Service", "Characteristic", "Descriptor"]
|
||||
@@ -800,6 +812,40 @@ class ConsoleApp:
|
||||
self.local_values_text.text = prettytable.get_string()
|
||||
self.ui.invalidate()
|
||||
|
||||
async def do_show_remote_values(self):
|
||||
prettytable = PrettyTable(
|
||||
field_names=[
|
||||
"Connection",
|
||||
"Service",
|
||||
"Characteristic",
|
||||
"Descriptor",
|
||||
"Time",
|
||||
"Value",
|
||||
]
|
||||
)
|
||||
for connection in self.device.connections.values():
|
||||
for handle, (time, value) in connection.gatt_client.cached_values.items():
|
||||
row = [connection.handle]
|
||||
attribute = connection.gatt_client.get_attributes(handle)
|
||||
if not attribute:
|
||||
continue
|
||||
if len(attribute) == 3:
|
||||
row.extend(
|
||||
[attribute[0].uuid, attribute[1].uuid, attribute[2].type]
|
||||
)
|
||||
elif len(attribute) == 2:
|
||||
row.extend([attribute[0].uuid, attribute[1].uuid, ""])
|
||||
elif len(attribute) == 1:
|
||||
row.extend([attribute[0].uuid, "", ""])
|
||||
else:
|
||||
continue
|
||||
|
||||
row.extend([humanize.naturaltime(time), value])
|
||||
prettytable.add_row(row)
|
||||
|
||||
self.remote_values_text.text = prettytable.get_string()
|
||||
self.ui.invalidate()
|
||||
|
||||
async def do_get_phy(self, _):
|
||||
if not self.connected_peer:
|
||||
self.show_error('not connected')
|
||||
@@ -899,9 +945,9 @@ class ConsoleApp:
|
||||
# send data to any subscribers
|
||||
if isinstance(attribute, Characteristic):
|
||||
attribute.write_value(None, value)
|
||||
if attribute.has_properties([Characteristic.NOTIFY]):
|
||||
if attribute.has_properties(Characteristic.NOTIFY):
|
||||
await self.device.gatt_server.notify_subscribers(attribute)
|
||||
if attribute.has_properties([Characteristic.INDICATE]):
|
||||
if attribute.has_properties(Characteristic.INDICATE):
|
||||
await self.device.gatt_server.indicate_subscribers(attribute)
|
||||
|
||||
async def do_subscribe(self, params):
|
||||
|
||||
11
apps/pair.py
11
apps/pair.py
@@ -24,7 +24,7 @@ from prompt_toolkit.shortcuts import PromptSession
|
||||
from bumble.colors import color
|
||||
from bumble.device import Device, Peer
|
||||
from bumble.transport import open_transport_or_link
|
||||
from bumble.smp import PairingDelegate, PairingConfig
|
||||
from bumble.pairing import PairingDelegate, PairingConfig
|
||||
from bumble.smp import error_name as smp_error_name
|
||||
from bumble.keys import JsonKeyStore
|
||||
from bumble.core import ProtocolError
|
||||
@@ -345,8 +345,13 @@ async def pair(
|
||||
print(color(f'Pairing failed: {error}', 'red'))
|
||||
return
|
||||
else:
|
||||
# Advertise so that peers can find us and connect
|
||||
await device.start_advertising(auto_restart=True)
|
||||
if mode == 'le':
|
||||
# Advertise so that peers can find us and connect
|
||||
await device.start_advertising(auto_restart=True)
|
||||
else:
|
||||
# Become discoverable and connectable
|
||||
await device.set_discoverable(True)
|
||||
await device.set_connectable(True)
|
||||
|
||||
# Run until the user asks to exit
|
||||
await Waiter.instance.wait_until_terminated()
|
||||
|
||||
147
bumble/device.py
147
bumble/device.py
@@ -29,11 +29,13 @@ from .colors import color
|
||||
from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU
|
||||
from .gatt import Characteristic, Descriptor, Service
|
||||
from .hci import (
|
||||
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
|
||||
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
|
||||
HCI_CENTRAL_ROLE,
|
||||
HCI_COMMAND_STATUS_PENDING,
|
||||
HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
|
||||
HCI_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
HCI_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
HCI_EXTENDED_INQUIRY_MODE,
|
||||
HCI_GENERAL_INQUIRY_LAP,
|
||||
HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
|
||||
@@ -141,6 +143,7 @@ from .keys import (
|
||||
KeyStore,
|
||||
PairingKeys,
|
||||
)
|
||||
from .pairing import PairingConfig
|
||||
from . import gatt_client
|
||||
from . import gatt_server
|
||||
from . import smp
|
||||
@@ -198,6 +201,7 @@ DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONN
|
||||
# Classes
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class Advertisement:
|
||||
address: Address
|
||||
@@ -529,6 +533,9 @@ class Connection(CompositeEventEmitter):
|
||||
authenticated: bool
|
||||
sc: bool
|
||||
link_key_type: int
|
||||
gatt_client: gatt_client.Client
|
||||
pairing_peer_io_capability: Optional[int]
|
||||
pairing_peer_authentication_requirements: Optional[int]
|
||||
|
||||
@composite_listener
|
||||
class Listener:
|
||||
@@ -592,6 +599,8 @@ class Connection(CompositeEventEmitter):
|
||||
self.gatt_server = (
|
||||
device.gatt_server
|
||||
) # By default, use the device's shared server
|
||||
self.pairing_peer_io_capability = None
|
||||
self.pairing_peer_authentication_requirements = None
|
||||
|
||||
# [Classic only]
|
||||
@classmethod
|
||||
@@ -1048,7 +1057,10 @@ class Device(CompositeEventEmitter):
|
||||
self.random_address = address
|
||||
|
||||
# Setup SMP
|
||||
self.smp_manager = smp.Manager(self)
|
||||
self.smp_manager = smp.Manager(
|
||||
self, pairing_config_factory=lambda connection: PairingConfig()
|
||||
)
|
||||
|
||||
self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)
|
||||
|
||||
# Register the SDP server with the L2CAP Channel Manager
|
||||
@@ -1239,7 +1251,7 @@ class Device(CompositeEventEmitter):
|
||||
await self.send_command(HCI_LE_Clear_Resolving_List_Command()) # type: ignore[call-arg]
|
||||
|
||||
resolving_keys = await self.keystore.get_resolving_keys()
|
||||
for (irk, address) in resolving_keys:
|
||||
for irk, address in resolving_keys:
|
||||
await self.send_command(
|
||||
HCI_LE_Add_Device_To_Resolving_List_Command(
|
||||
peer_identity_address_type=address.address_type,
|
||||
@@ -1620,7 +1632,7 @@ class Device(CompositeEventEmitter):
|
||||
pending connection.
|
||||
|
||||
connection_parameters_preferences: (BLE only, ignored for BR/EDR)
|
||||
* None: use all PHYs with default parameters
|
||||
* None: use the 1M PHY with default parameters
|
||||
* map: each entry has a PHY as key and a ConnectionParametersPreferences
|
||||
object as value
|
||||
|
||||
@@ -1689,9 +1701,7 @@ class Device(CompositeEventEmitter):
|
||||
if connection_parameters_preferences is None:
|
||||
if connection_parameters_preferences is None:
|
||||
connection_parameters_preferences = {
|
||||
HCI_LE_1M_PHY: ConnectionParametersPreferences.default,
|
||||
HCI_LE_2M_PHY: ConnectionParametersPreferences.default,
|
||||
HCI_LE_CODED_PHY: ConnectionParametersPreferences.default,
|
||||
HCI_LE_1M_PHY: ConnectionParametersPreferences.default
|
||||
}
|
||||
|
||||
self.connect_own_address_type = own_address_type
|
||||
@@ -2229,8 +2239,9 @@ class Device(CompositeEventEmitter):
|
||||
if keys is not None:
|
||||
logger.debug('found keys in the key store')
|
||||
if keys.link_key is None:
|
||||
logger.debug('no link key')
|
||||
logger.warning('no link key')
|
||||
return None
|
||||
|
||||
return keys.link_key.value
|
||||
|
||||
# [Classic only]
|
||||
@@ -2435,8 +2446,14 @@ class Device(CompositeEventEmitter):
|
||||
def on_link_key(self, bd_addr, link_key, key_type):
|
||||
# Store the keys in the key store
|
||||
if self.keystore:
|
||||
authenticated = key_type in (
|
||||
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
|
||||
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
|
||||
)
|
||||
pairing_keys = PairingKeys()
|
||||
pairing_keys.link_key = PairingKeys.Key(value=link_key)
|
||||
pairing_keys.link_key = PairingKeys.Key(
|
||||
value=link_key, authenticated=authenticated
|
||||
)
|
||||
|
||||
async def store_keys():
|
||||
try:
|
||||
@@ -2702,7 +2719,7 @@ class Device(CompositeEventEmitter):
|
||||
# On Secure Simple Pairing complete, in case:
|
||||
# - Connection isn't already authenticated
|
||||
# - AND we are not the initiator of the authentication
|
||||
# We must trigger authentication to known if we are truly authenticated
|
||||
# We must trigger authentication to know if we are truly authenticated
|
||||
if not connection.authenticating and not connection.authenticated:
|
||||
logger.debug(
|
||||
f'*** Trigger Connection Authentication: [0x{connection.handle:04X}] '
|
||||
@@ -2717,22 +2734,6 @@ class Device(CompositeEventEmitter):
|
||||
# Ask what the pairing config should be for this connection
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
|
||||
# Map the SMP IO capability to a Classic IO capability
|
||||
# pylint: disable=line-too-long
|
||||
io_capability = {
|
||||
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
smp.SMP_DISPLAY_YES_NO_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
|
||||
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
|
||||
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
}.get(pairing_config.delegate.io_capability)
|
||||
|
||||
if io_capability is None:
|
||||
logger.warning(
|
||||
f'cannot map IO capability ({pairing_config.delegate.io_capability}'
|
||||
)
|
||||
io_capability = HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
|
||||
|
||||
# Compute the authentication requirements
|
||||
authentication_requirements = (
|
||||
# No Bonding
|
||||
@@ -2751,53 +2752,50 @@ class Device(CompositeEventEmitter):
|
||||
self.host.send_command_sync(
|
||||
HCI_IO_Capability_Request_Reply_Command(
|
||||
bd_addr=connection.peer_address,
|
||||
io_capability=io_capability,
|
||||
io_capability=pairing_config.delegate.classic_io_capability,
|
||||
oob_data_present=0x00, # Not present
|
||||
authentication_requirements=authentication_requirements,
|
||||
)
|
||||
)
|
||||
|
||||
# [Classic only]
|
||||
@host_event_handler
|
||||
@with_connection_from_address
|
||||
def on_authentication_io_capability_response(
|
||||
self, connection, io_capability, authentication_requirements
|
||||
):
|
||||
connection.peer_pairing_io_capability = io_capability
|
||||
connection.peer_pairing_authentication_requirements = (
|
||||
authentication_requirements
|
||||
)
|
||||
|
||||
# [Classic only]
|
||||
@host_event_handler
|
||||
@with_connection_from_address
|
||||
def on_authentication_user_confirmation_request(self, connection, code):
|
||||
# Ask what the pairing config should be for this connection
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
|
||||
can_compare = pairing_config.delegate.io_capability not in (
|
||||
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
|
||||
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
)
|
||||
io_capability = pairing_config.delegate.classic_io_capability
|
||||
|
||||
# Respond
|
||||
if can_compare:
|
||||
|
||||
async def compare_numbers():
|
||||
numbers_match = await connection.abort_on(
|
||||
'disconnection',
|
||||
pairing_config.delegate.compare_numbers(code, digits=6),
|
||||
)
|
||||
if numbers_match:
|
||||
await self.host.send_command(
|
||||
HCI_User_Confirmation_Request_Reply_Command(
|
||||
bd_addr=connection.peer_address
|
||||
)
|
||||
)
|
||||
else:
|
||||
await self.host.send_command(
|
||||
HCI_User_Confirmation_Request_Negative_Reply_Command(
|
||||
bd_addr=connection.peer_address
|
||||
)
|
||||
if io_capability == HCI_DISPLAY_YES_NO_IO_CAPABILITY:
|
||||
if connection.peer_pairing_io_capability in (
|
||||
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
HCI_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
):
|
||||
# Display the code and ask the user to compare
|
||||
async def prompt():
|
||||
return (
|
||||
await pairing_config.delegate.compare_numbers(code, digits=6),
|
||||
)
|
||||
|
||||
asyncio.create_task(compare_numbers())
|
||||
else:
|
||||
else:
|
||||
# Ask the user to confirm the pairing, without showing a code
|
||||
async def prompt():
|
||||
return await pairing_config.delegate.confirm()
|
||||
|
||||
async def confirm():
|
||||
confirm = await connection.abort_on(
|
||||
'disconnection', pairing_config.delegate.confirm()
|
||||
)
|
||||
if confirm:
|
||||
if await prompt():
|
||||
await self.host.send_command(
|
||||
HCI_User_Confirmation_Request_Reply_Command(
|
||||
bd_addr=connection.peer_address
|
||||
@@ -2810,7 +2808,17 @@ class Device(CompositeEventEmitter):
|
||||
)
|
||||
)
|
||||
|
||||
asyncio.create_task(confirm())
|
||||
AsyncRunner.spawn(connection.abort_on('disconnection', confirm()))
|
||||
return
|
||||
|
||||
if io_capability == HCI_DISPLAY_ONLY_IO_CAPABILITY:
|
||||
# Display the code to the user
|
||||
AsyncRunner.spawn(pairing_config.delegate.display_number(code, 6))
|
||||
|
||||
# Automatic confirmation
|
||||
self.host.send_command_sync(
|
||||
HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
|
||||
)
|
||||
|
||||
# [Classic only]
|
||||
@host_event_handler
|
||||
@@ -2818,15 +2826,11 @@ class Device(CompositeEventEmitter):
|
||||
def on_authentication_user_passkey_request(self, connection):
|
||||
# Ask what the pairing config should be for this connection
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
|
||||
can_input = pairing_config.delegate.io_capability in (
|
||||
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
|
||||
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
|
||||
)
|
||||
io_capability = pairing_config.delegate.classic_io_capability
|
||||
|
||||
# Respond
|
||||
if can_input:
|
||||
|
||||
if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY:
|
||||
# Ask the user to input a number
|
||||
async def get_number():
|
||||
number = await connection.abort_on(
|
||||
'disconnection', pairing_config.delegate.get_number()
|
||||
@@ -2856,18 +2860,14 @@ class Device(CompositeEventEmitter):
|
||||
@host_event_handler
|
||||
@with_connection_from_address
|
||||
def on_pin_code_request(self, connection):
|
||||
# classic legacy pairing
|
||||
# Classic legacy pairing
|
||||
# Ask what the pairing config should be for this connection
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
io_capability = pairing_config.delegate.classic_io_capability
|
||||
|
||||
can_input = pairing_config.delegate.io_capability in (
|
||||
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
|
||||
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
|
||||
)
|
||||
|
||||
# respond the pin code
|
||||
if can_input:
|
||||
|
||||
# Respond
|
||||
if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY:
|
||||
# Ask the user to enter a string
|
||||
async def get_pin_code():
|
||||
pin_code = await connection.abort_on(
|
||||
'disconnection', pairing_config.delegate.get_string(16)
|
||||
@@ -2907,6 +2907,7 @@ class Device(CompositeEventEmitter):
|
||||
# Ask what the pairing config should be for this connection
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
|
||||
# Show the passkey to the user
|
||||
connection.abort_on(
|
||||
'disconnection', pairing_config.delegate.display_number(passkey)
|
||||
)
|
||||
|
||||
@@ -27,7 +27,8 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import logging
|
||||
import struct
|
||||
from typing import List, Optional, Dict, Any, Callable
|
||||
from datetime import datetime
|
||||
from typing import List, Optional, Dict, Tuple, Callable, Union, Any
|
||||
|
||||
from pyee import EventEmitter
|
||||
|
||||
@@ -167,7 +168,9 @@ class CharacteristicProxy(AttributeProxy):
|
||||
async def discover_descriptors(self):
|
||||
return await self.client.discover_descriptors(self)
|
||||
|
||||
async def subscribe(self, subscriber=None, prefer_notify=True):
|
||||
async def subscribe(
|
||||
self, subscriber: Optional[Callable] = None, prefer_notify=True
|
||||
):
|
||||
if subscriber is not None:
|
||||
if subscriber in self.subscribers:
|
||||
# We already have a proxy subscriber
|
||||
@@ -221,6 +224,7 @@ class ProfileServiceProxy:
|
||||
# -----------------------------------------------------------------------------
|
||||
class Client:
|
||||
services: List[ServiceProxy]
|
||||
cached_values: Dict[int, Tuple[datetime, bytes]]
|
||||
|
||||
def __init__(self, connection):
|
||||
self.connection = connection
|
||||
@@ -233,6 +237,7 @@ class Client:
|
||||
) # Notification subscribers, by attribute handle
|
||||
self.indication_subscribers = {} # Indication subscribers, by attribute handle
|
||||
self.services = []
|
||||
self.cached_values = {}
|
||||
|
||||
def send_gatt_pdu(self, pdu):
|
||||
self.connection.send_l2cap_pdu(ATT_CID, pdu)
|
||||
@@ -317,6 +322,35 @@ class Client:
|
||||
if c.uuid == uuid
|
||||
]
|
||||
|
||||
def get_attribute_grouping(
|
||||
self, attribute_handle: int
|
||||
) -> Optional[
|
||||
Union[
|
||||
ServiceProxy,
|
||||
Tuple[ServiceProxy, CharacteristicProxy],
|
||||
Tuple[ServiceProxy, CharacteristicProxy, DescriptorProxy],
|
||||
]
|
||||
]:
|
||||
"""
|
||||
Get the attribute(s) associated with an attribute handle
|
||||
"""
|
||||
for service in self.services:
|
||||
if service.handle == attribute_handle:
|
||||
return service
|
||||
if service.handle <= attribute_handle <= service.end_group_handle:
|
||||
for characteristic in service.characteristics:
|
||||
if characteristic.handle == attribute_handle:
|
||||
return (service, characteristic)
|
||||
if (
|
||||
characteristic.handle
|
||||
<= attribute_handle
|
||||
<= characteristic.end_group_handle
|
||||
):
|
||||
for descriptor in characteristic.descriptors:
|
||||
if descriptor.handle == attribute_handle:
|
||||
return (service, characteristic, descriptor)
|
||||
return None
|
||||
|
||||
def on_service_discovered(self, service):
|
||||
'''Add a service to the service list if it wasn't already there'''
|
||||
already_known = False
|
||||
@@ -808,6 +842,7 @@ class Client:
|
||||
|
||||
offset += len(part)
|
||||
|
||||
self.cache_value(attribute_handle, attribute_value)
|
||||
# Return the value as bytes
|
||||
return attribute_value
|
||||
|
||||
@@ -942,6 +977,8 @@ class Client:
|
||||
)
|
||||
if not subscribers:
|
||||
logger.warning('!!! received notification with no subscriber')
|
||||
|
||||
self.cache_value(notification.attribute_handle, notification.attribute_value)
|
||||
for subscriber in subscribers:
|
||||
if callable(subscriber):
|
||||
subscriber(notification.attribute_value)
|
||||
@@ -953,6 +990,8 @@ class Client:
|
||||
subscribers = self.indication_subscribers.get(indication.attribute_handle, [])
|
||||
if not subscribers:
|
||||
logger.warning('!!! received indication with no subscriber')
|
||||
|
||||
self.cache_value(indication.attribute_handle, indication.attribute_value)
|
||||
for subscriber in subscribers:
|
||||
if callable(subscriber):
|
||||
subscriber(indication.attribute_value)
|
||||
@@ -961,3 +1000,9 @@ class Client:
|
||||
|
||||
# Confirm that we received the indication
|
||||
self.send_confirmation(ATT_Handle_Value_Confirmation())
|
||||
|
||||
def cache_value(self, attribute_handle: int, value: bytes):
|
||||
self.cached_values[attribute_handle] = (
|
||||
datetime.now(),
|
||||
value,
|
||||
)
|
||||
|
||||
@@ -395,8 +395,8 @@ class Host(AbortableEventEmitter):
|
||||
|
||||
def supports_command(self, command):
|
||||
# Find the support flag position for this command
|
||||
for (octet, flags) in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS):
|
||||
for (flag_position, value) in enumerate(flags):
|
||||
for octet, flags in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS):
|
||||
for flag_position, value in enumerate(flags):
|
||||
if value == command:
|
||||
# Check if the flag is set
|
||||
if octet < len(self.local_supported_commands) and flag_position < 8:
|
||||
@@ -409,7 +409,7 @@ class Host(AbortableEventEmitter):
|
||||
@property
|
||||
def supported_commands(self):
|
||||
commands = []
|
||||
for (octet, flags) in enumerate(self.local_supported_commands):
|
||||
for octet, flags in enumerate(self.local_supported_commands):
|
||||
if octet < len(HCI_SUPPORTED_COMMANDS_FLAGS):
|
||||
for flag in range(8):
|
||||
if flags & (1 << flag) != 0:
|
||||
@@ -839,7 +839,12 @@ class Host(AbortableEventEmitter):
|
||||
self.emit('authentication_io_capability_request', event.bd_addr)
|
||||
|
||||
def on_hci_io_capability_response_event(self, event):
|
||||
pass
|
||||
self.emit(
|
||||
'authentication_io_capability_response',
|
||||
event.bd_addr,
|
||||
event.io_capability,
|
||||
event.authentication_requirements,
|
||||
)
|
||||
|
||||
def on_hci_user_confirmation_request_event(self, event):
|
||||
self.emit(
|
||||
|
||||
184
bumble/pairing.py
Normal file
184
bumble/pairing.py
Normal file
@@ -0,0 +1,184 @@
|
||||
# Copyright 2021-2023 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import enum
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from .hci import (
|
||||
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
|
||||
HCI_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
HCI_KEYBOARD_ONLY_IO_CAPABILITY,
|
||||
)
|
||||
from .smp import (
|
||||
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
|
||||
SMP_KEYBOARD_ONLY_IO_CAPABILITY,
|
||||
SMP_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
SMP_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
|
||||
SMP_ENC_KEY_DISTRIBUTION_FLAG,
|
||||
SMP_ID_KEY_DISTRIBUTION_FLAG,
|
||||
SMP_SIGN_KEY_DISTRIBUTION_FLAG,
|
||||
SMP_LINK_KEY_DISTRIBUTION_FLAG,
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class PairingDelegate:
|
||||
"""Abstract base class for Pairing Delegates."""
|
||||
|
||||
# I/O Capabilities.
|
||||
# These are defined abstractly, and can be mapped to specific Classic pairing
|
||||
# and/or SMP constants.
|
||||
class IoCapability(enum.IntEnum):
|
||||
NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
|
||||
KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY
|
||||
DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY
|
||||
DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY
|
||||
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
|
||||
|
||||
# Direct names for backward compatibility.
|
||||
NO_OUTPUT_NO_INPUT = IoCapability.NO_OUTPUT_NO_INPUT
|
||||
KEYBOARD_INPUT_ONLY = IoCapability.KEYBOARD_INPUT_ONLY
|
||||
DISPLAY_OUTPUT_ONLY = IoCapability.DISPLAY_OUTPUT_ONLY
|
||||
DISPLAY_OUTPUT_AND_YES_NO_INPUT = IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT
|
||||
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
|
||||
|
||||
# Key Distribution [LE only]
|
||||
class KeyDistribution(enum.IntFlag):
|
||||
DISTRIBUTE_ENCRYPTION_KEY = SMP_ENC_KEY_DISTRIBUTION_FLAG
|
||||
DISTRIBUTE_IDENTITY_KEY = SMP_ID_KEY_DISTRIBUTION_FLAG
|
||||
DISTRIBUTE_SIGNING_KEY = SMP_SIGN_KEY_DISTRIBUTION_FLAG
|
||||
DISTRIBUTE_LINK_KEY = SMP_LINK_KEY_DISTRIBUTION_FLAG
|
||||
|
||||
DEFAULT_KEY_DISTRIBUTION: int = (
|
||||
SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG
|
||||
)
|
||||
|
||||
# Default mapping from abstract to Classic I/O capabilities.
|
||||
# Subclasses may override this if they prefer a different mapping.
|
||||
CLASSIC_IO_CAPABILITIES_MAP = {
|
||||
NO_OUTPUT_NO_INPUT: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
|
||||
KEYBOARD_INPUT_ONLY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
|
||||
DISPLAY_OUTPUT_ONLY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
DISPLAY_OUTPUT_AND_YES_NO_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
}
|
||||
|
||||
io_capability: IoCapability
|
||||
local_initiator_key_distribution: KeyDistribution
|
||||
local_responder_key_distribution: KeyDistribution
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
io_capability=NO_OUTPUT_NO_INPUT,
|
||||
local_initiator_key_distribution=DEFAULT_KEY_DISTRIBUTION,
|
||||
local_responder_key_distribution=DEFAULT_KEY_DISTRIBUTION,
|
||||
) -> None:
|
||||
self.io_capability = io_capability
|
||||
self.local_initiator_key_distribution = local_initiator_key_distribution
|
||||
self.local_responder_key_distribution = local_responder_key_distribution
|
||||
|
||||
@property
|
||||
def classic_io_capability(self) -> int:
|
||||
"""Map the abstract I/O capability to a Classic constant."""
|
||||
|
||||
# pylint: disable=line-too-long
|
||||
return self.CLASSIC_IO_CAPABILITIES_MAP.get(
|
||||
self.io_capability, HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
|
||||
)
|
||||
|
||||
@property
|
||||
def smp_io_capability(self) -> int:
|
||||
"""Map the abstract I/O capability to an SMP constant."""
|
||||
|
||||
# This is just a 1-1 direct mapping
|
||||
return self.io_capability
|
||||
|
||||
async def accept(self) -> bool:
|
||||
"""Accept or reject a Pairing request."""
|
||||
return True
|
||||
|
||||
async def confirm(self) -> bool:
|
||||
"""Respond yes or no to a Pairing confirmation question."""
|
||||
return True
|
||||
|
||||
# pylint: disable-next=unused-argument
|
||||
async def compare_numbers(self, number: int, digits: int) -> bool:
|
||||
"""Compare two numbers."""
|
||||
return True
|
||||
|
||||
async def get_number(self) -> Optional[int]:
|
||||
"""
|
||||
Return an optional number as an answer to a passkey request.
|
||||
Returning `None` will result in a negative reply.
|
||||
"""
|
||||
return 0
|
||||
|
||||
async def get_string(self, max_length) -> Optional[str]:
|
||||
"""
|
||||
Return a string whose utf-8 encoding is up to max_length bytes.
|
||||
"""
|
||||
return None
|
||||
|
||||
# pylint: disable-next=unused-argument
|
||||
async def display_number(self, number: int, digits: int) -> None:
|
||||
"""Display a number."""
|
||||
|
||||
# [LE only]
|
||||
async def key_distribution_response(
|
||||
self, peer_initiator_key_distribution: int, peer_responder_key_distribution: int
|
||||
) -> Tuple[int, int]:
|
||||
"""
|
||||
Return the key distribution response in an SMP protocol context.
|
||||
|
||||
NOTE: since it is only used by the SMP protocol, this method's input and output
|
||||
are directly as integers, using the SMP constants, rather than the abstract
|
||||
KeyDistribution enums.
|
||||
"""
|
||||
return (
|
||||
int(
|
||||
peer_initiator_key_distribution & self.local_initiator_key_distribution
|
||||
),
|
||||
int(
|
||||
peer_responder_key_distribution & self.local_responder_key_distribution
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class PairingConfig:
|
||||
"""Configuration for the Pairing protocol."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
sc: bool = True,
|
||||
mitm: bool = True,
|
||||
bonding: bool = True,
|
||||
delegate: Optional[PairingDelegate] = None,
|
||||
) -> None:
|
||||
self.sc = sc
|
||||
self.mitm = mitm
|
||||
self.bonding = bonding
|
||||
self.delegate = delegate or PairingDelegate()
|
||||
|
||||
def __str__(self) -> str:
|
||||
return (
|
||||
f'PairingConfig(sc={self.sc}, '
|
||||
f'mitm={self.mitm}, bonding={self.bonding}, '
|
||||
f'delegate[{self.delegate.io_capability}])'
|
||||
)
|
||||
109
bumble/smp.py
109
bumble/smp.py
@@ -31,7 +31,16 @@ from typing import Dict, Optional, Type
|
||||
from pyee import EventEmitter
|
||||
|
||||
from .colors import color
|
||||
from .hci import Address, HCI_LE_Enable_Encryption_Command, HCI_Object, key_with_value
|
||||
from .hci import (
|
||||
HCI_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
HCI_KEYBOARD_ONLY_IO_CAPABILITY,
|
||||
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
|
||||
Address,
|
||||
HCI_LE_Enable_Encryption_Command,
|
||||
HCI_Object,
|
||||
key_with_value,
|
||||
)
|
||||
from .core import (
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
BT_CENTRAL_ROLE,
|
||||
@@ -476,7 +485,7 @@ class AddressResolver:
|
||||
address_bytes = bytes(address)
|
||||
hash_part = address_bytes[0:3]
|
||||
prand = address_bytes[3:6]
|
||||
for (irk, resolved_address) in self.resolving_keys:
|
||||
for irk, resolved_address in self.resolving_keys:
|
||||
local_hash = crypto.ah(irk, prand)
|
||||
if local_hash == hash_part:
|
||||
# Match!
|
||||
@@ -491,86 +500,6 @@ class AddressResolver:
|
||||
return None
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class PairingDelegate:
|
||||
NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
|
||||
KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY
|
||||
DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY
|
||||
DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY
|
||||
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
|
||||
DEFAULT_KEY_DISTRIBUTION: int = (
|
||||
SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
io_capability: int = NO_OUTPUT_NO_INPUT,
|
||||
local_initiator_key_distribution: int = DEFAULT_KEY_DISTRIBUTION,
|
||||
local_responder_key_distribution: int = DEFAULT_KEY_DISTRIBUTION,
|
||||
) -> None:
|
||||
self.io_capability = io_capability
|
||||
self.local_initiator_key_distribution = local_initiator_key_distribution
|
||||
self.local_responder_key_distribution = local_responder_key_distribution
|
||||
|
||||
async def accept(self) -> bool:
|
||||
return True
|
||||
|
||||
async def confirm(self) -> bool:
|
||||
return True
|
||||
|
||||
# pylint: disable-next=unused-argument
|
||||
async def compare_numbers(self, number: int, digits: int) -> bool:
|
||||
return True
|
||||
|
||||
async def get_number(self) -> Optional[int]:
|
||||
'''
|
||||
Returns an optional number as an answer to a passkey request.
|
||||
Returning `None` will result in a negative reply.
|
||||
'''
|
||||
return 0
|
||||
|
||||
async def get_string(self, max_length) -> Optional[str]:
|
||||
'''
|
||||
Returns a string whose utf-8 encoding is up to max_length bytes.
|
||||
'''
|
||||
return None
|
||||
|
||||
# pylint: disable-next=unused-argument
|
||||
async def display_number(self, number: int, digits: int) -> None:
|
||||
pass
|
||||
|
||||
async def key_distribution_response(
|
||||
self, peer_initiator_key_distribution, peer_responder_key_distribution
|
||||
):
|
||||
return (
|
||||
(peer_initiator_key_distribution & self.local_initiator_key_distribution),
|
||||
(peer_responder_key_distribution & self.local_responder_key_distribution),
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class PairingConfig:
|
||||
def __init__(
|
||||
self,
|
||||
sc: bool = True,
|
||||
mitm: bool = False,
|
||||
bonding: bool = True,
|
||||
delegate: Optional[PairingDelegate] = None,
|
||||
) -> None:
|
||||
self.sc = sc
|
||||
self.mitm = mitm
|
||||
self.bonding = bonding
|
||||
self.delegate = delegate or PairingDelegate()
|
||||
|
||||
def __str__(self):
|
||||
io_capability_str = SMP_Command.io_capability_name(self.delegate.io_capability)
|
||||
return (
|
||||
f'PairingConfig(sc={self.sc}, '
|
||||
f'mitm={self.mitm}, bonding={self.bonding}, '
|
||||
f'delegate[{io_capability_str}])'
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class Session:
|
||||
# Pairing methods
|
||||
@@ -645,7 +574,7 @@ class Session:
|
||||
},
|
||||
}
|
||||
|
||||
def __init__(self, manager, connection, pairing_config):
|
||||
def __init__(self, manager, connection, pairing_config, is_initiator):
|
||||
self.manager = manager
|
||||
self.connection = connection
|
||||
self.preq = None
|
||||
@@ -684,7 +613,7 @@ class Session:
|
||||
self.ctkd_task = None
|
||||
|
||||
# Decide if we're the initiator or the responder
|
||||
self.is_initiator = connection.role == BT_CENTRAL_ROLE
|
||||
self.is_initiator = is_initiator
|
||||
self.is_responder = not self.is_initiator
|
||||
|
||||
# Listen for connection events
|
||||
@@ -1662,12 +1591,12 @@ class Manager(EventEmitter):
|
||||
Implements the Initiator and Responder roles of the Security Manager Protocol
|
||||
'''
|
||||
|
||||
def __init__(self, device):
|
||||
def __init__(self, device, pairing_config_factory):
|
||||
super().__init__()
|
||||
self.device = device
|
||||
self.sessions = {}
|
||||
self._ecc_key = None
|
||||
self.pairing_config_factory = lambda connection: PairingConfig()
|
||||
self.pairing_config_factory = pairing_config_factory
|
||||
|
||||
def send_command(self, connection, command):
|
||||
logger.debug(
|
||||
@@ -1680,6 +1609,8 @@ class Manager(EventEmitter):
|
||||
def on_smp_pdu(self, connection, pdu):
|
||||
# Look for a session with this connection, and create one if none exists
|
||||
if not (session := self.sessions.get(connection.handle)):
|
||||
if connection.role == BT_CENTRAL_ROLE:
|
||||
logger.warning('Remote starts pairing as Peripheral!')
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
if pairing_config is None:
|
||||
# Pairing disabled
|
||||
@@ -1688,7 +1619,7 @@ class Manager(EventEmitter):
|
||||
SMP_Pairing_Failed_Command(reason=SMP_PAIRING_NOT_SUPPORTED_ERROR),
|
||||
)
|
||||
return
|
||||
session = Session(self, connection, pairing_config)
|
||||
session = Session(self, connection, pairing_config, is_initiator=False)
|
||||
self.sessions[connection.handle] = session
|
||||
|
||||
# Parse the L2CAP payload into an SMP Command object
|
||||
@@ -1709,10 +1640,12 @@ class Manager(EventEmitter):
|
||||
|
||||
async def pair(self, connection):
|
||||
# TODO: check if there's already a session for this connection
|
||||
if connection.role != BT_CENTRAL_ROLE:
|
||||
logger.warning('Start pairing as Peripheral!')
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
if pairing_config is None:
|
||||
raise ValueError('pairing config must not be None when initiating')
|
||||
session = Session(self, connection, pairing_config)
|
||||
session = Session(self, connection, pairing_config, is_initiator=True)
|
||||
self.sessions[connection.handle] = session
|
||||
return await session.pair()
|
||||
|
||||
|
||||
@@ -44,6 +44,7 @@ install_requires =
|
||||
pyusb >= 1.2; platform_system!='Emscripten'
|
||||
websockets >= 8.1; platform_system!='Emscripten'
|
||||
prettytable >= 3.6.0
|
||||
humanize >= 4.6.0
|
||||
|
||||
[options.entry_points]
|
||||
console_scripts =
|
||||
|
||||
@@ -28,9 +28,8 @@ from bumble.device import Device, Peer
|
||||
from bumble.host import Host
|
||||
from bumble.gatt import Service, Characteristic
|
||||
from bumble.transport import AsyncPipeSink
|
||||
from bumble.pairing import PairingConfig, PairingDelegate
|
||||
from bumble.smp import (
|
||||
PairingConfig,
|
||||
PairingDelegate,
|
||||
SMP_PAIRING_NOT_SUPPORTED_ERROR,
|
||||
SMP_CONFIRM_VALUE_FAILED_ERROR,
|
||||
)
|
||||
@@ -262,7 +261,7 @@ async def test_self_gatt_long_read():
|
||||
found_service = result[0]
|
||||
found_characteristics = await found_service.discover_characteristics()
|
||||
assert len(found_characteristics) == 513
|
||||
for (i, characteristic) in enumerate(found_characteristics):
|
||||
for i, characteristic in enumerate(found_characteristics):
|
||||
value = await characteristic.read_value()
|
||||
assert value == characteristics[i].value
|
||||
|
||||
@@ -317,11 +316,11 @@ async def _test_self_smp_with_configs(pairing_config1, pairing_config2):
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
IO_CAP = [
|
||||
PairingDelegate.NO_OUTPUT_NO_INPUT,
|
||||
PairingDelegate.KEYBOARD_INPUT_ONLY,
|
||||
PairingDelegate.DISPLAY_OUTPUT_ONLY,
|
||||
PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
|
||||
PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
|
||||
PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT,
|
||||
PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY,
|
||||
PairingDelegate.IoCapability.DISPLAY_OUTPUT_ONLY,
|
||||
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
|
||||
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
|
||||
]
|
||||
SC = [False, True]
|
||||
MITM = [False, True]
|
||||
@@ -335,7 +334,10 @@ KEY_DIST = range(16)
|
||||
itertools.chain(
|
||||
itertools.product([IO_CAP], SC, MITM, [15]),
|
||||
itertools.product(
|
||||
[[PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]], SC, MITM, KEY_DIST
|
||||
[[PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]],
|
||||
SC,
|
||||
MITM,
|
||||
KEY_DIST,
|
||||
),
|
||||
),
|
||||
)
|
||||
@@ -378,7 +380,7 @@ async def test_self_smp(io_caps, sc, mitm, key_dist):
|
||||
else:
|
||||
if (
|
||||
self.peer_delegate.io_capability
|
||||
== PairingDelegate.KEYBOARD_INPUT_ONLY
|
||||
== PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY
|
||||
):
|
||||
peer_number = 6789
|
||||
else:
|
||||
@@ -421,7 +423,7 @@ async def test_self_smp(io_caps, sc, mitm, key_dist):
|
||||
async def test_self_smp_reject():
|
||||
class RejectingDelegate(PairingDelegate):
|
||||
def __init__(self):
|
||||
super().__init__(PairingDelegate.NO_OUTPUT_NO_INPUT)
|
||||
super().__init__(PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT)
|
||||
|
||||
async def accept(self):
|
||||
return False
|
||||
@@ -442,7 +444,9 @@ async def test_self_smp_reject():
|
||||
async def test_self_smp_wrong_pin():
|
||||
class WrongPinDelegate(PairingDelegate):
|
||||
def __init__(self):
|
||||
super().__init__(PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT)
|
||||
super().__init__(
|
||||
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
|
||||
)
|
||||
|
||||
async def compare_numbers(self, number, digits):
|
||||
return False
|
||||
|
||||
Reference in New Issue
Block a user