forked from auracaster/bumble_mirror
Compare commits
9 Commits
gbg/connec
...
v0.0.149
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51d3a869a4 | ||
|
|
dd930e3bde | ||
|
|
9af426db45 | ||
|
|
4286b2ab59 | ||
|
|
3442358dea | ||
|
|
bf3e05ef91 | ||
|
|
5351ab8a42 | ||
|
|
2c2f512180 | ||
|
|
859aea5a99 |
22
apps/pair.py
22
apps/pair.py
@@ -24,7 +24,7 @@ from prompt_toolkit.shortcuts import PromptSession
|
||||
from bumble.colors import color
|
||||
from bumble.device import Device, Peer
|
||||
from bumble.transport import open_transport_or_link
|
||||
from bumble.smp import PairingDelegate, PairingConfig
|
||||
from bumble.pairing import PairingDelegate, PairingConfig
|
||||
from bumble.smp import error_name as smp_error_name
|
||||
from bumble.keys import JsonKeyStore
|
||||
from bumble.core import ProtocolError
|
||||
@@ -264,6 +264,7 @@ async def pair(
|
||||
sc,
|
||||
mitm,
|
||||
bond,
|
||||
ctkd,
|
||||
io,
|
||||
prompt,
|
||||
request,
|
||||
@@ -317,6 +318,7 @@ async def pair(
|
||||
if mode == 'classic':
|
||||
device.classic_enabled = True
|
||||
device.le_enabled = False
|
||||
device.classic_smp_enabled = ctkd
|
||||
|
||||
# Get things going
|
||||
await device.power_on()
|
||||
@@ -343,8 +345,13 @@ async def pair(
|
||||
print(color(f'Pairing failed: {error}', 'red'))
|
||||
return
|
||||
else:
|
||||
# Advertise so that peers can find us and connect
|
||||
await device.start_advertising(auto_restart=True)
|
||||
if mode == 'le':
|
||||
# Advertise so that peers can find us and connect
|
||||
await device.start_advertising(auto_restart=True)
|
||||
else:
|
||||
# Become discoverable and connectable
|
||||
await device.set_discoverable(True)
|
||||
await device.set_connectable(True)
|
||||
|
||||
# Run until the user asks to exit
|
||||
await Waiter.instance.wait_until_terminated()
|
||||
@@ -379,6 +386,13 @@ class LogHandler(logging.Handler):
|
||||
@click.option(
|
||||
'--bond', type=bool, default=True, help='Enable bonding', show_default=True
|
||||
)
|
||||
@click.option(
|
||||
'--ctkd',
|
||||
type=bool,
|
||||
default=True,
|
||||
help='Enable CTKD',
|
||||
show_default=True,
|
||||
)
|
||||
@click.option(
|
||||
'--io',
|
||||
type=click.Choice(
|
||||
@@ -405,6 +419,7 @@ def main(
|
||||
sc,
|
||||
mitm,
|
||||
bond,
|
||||
ctkd,
|
||||
io,
|
||||
prompt,
|
||||
request,
|
||||
@@ -427,6 +442,7 @@ def main(
|
||||
sc,
|
||||
mitm,
|
||||
bond,
|
||||
ctkd,
|
||||
io,
|
||||
prompt,
|
||||
request,
|
||||
|
||||
201
bumble/device.py
201
bumble/device.py
@@ -29,11 +29,13 @@ from .colors import color
|
||||
from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU
|
||||
from .gatt import Characteristic, Descriptor, Service
|
||||
from .hci import (
|
||||
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
|
||||
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
|
||||
HCI_CENTRAL_ROLE,
|
||||
HCI_COMMAND_STATUS_PENDING,
|
||||
HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
|
||||
HCI_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
HCI_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
HCI_EXTENDED_INQUIRY_MODE,
|
||||
HCI_GENERAL_INQUIRY_LAP,
|
||||
HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
|
||||
@@ -141,6 +143,7 @@ from .keys import (
|
||||
KeyStore,
|
||||
PairingKeys,
|
||||
)
|
||||
from .pairing import PairingConfig
|
||||
from . import gatt_client
|
||||
from . import gatt_server
|
||||
from . import smp
|
||||
@@ -198,6 +201,7 @@ DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONN
|
||||
# Classes
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class Advertisement:
|
||||
address: Address
|
||||
@@ -530,6 +534,8 @@ class Connection(CompositeEventEmitter):
|
||||
sc: bool
|
||||
link_key_type: int
|
||||
gatt_client: gatt_client.Client
|
||||
pairing_peer_io_capability: Optional[int]
|
||||
pairing_peer_authentication_requirements: Optional[int]
|
||||
|
||||
@composite_listener
|
||||
class Listener:
|
||||
@@ -593,10 +599,12 @@ class Connection(CompositeEventEmitter):
|
||||
self.gatt_server = (
|
||||
device.gatt_server
|
||||
) # By default, use the device's shared server
|
||||
self.pairing_peer_io_capability = None
|
||||
self.pairing_peer_authentication_requirements = None
|
||||
|
||||
# [Classic only]
|
||||
@classmethod
|
||||
def incomplete(cls, device, peer_address):
|
||||
def incomplete(cls, device, peer_address, role):
|
||||
"""
|
||||
Instantiate an incomplete connection (ie. one waiting for a HCI Connection
|
||||
Complete event).
|
||||
@@ -609,28 +617,30 @@ class Connection(CompositeEventEmitter):
|
||||
device.public_address,
|
||||
peer_address,
|
||||
None,
|
||||
None,
|
||||
role,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
|
||||
# [Classic only]
|
||||
def complete(self, handle, peer_resolvable_address, role, parameters):
|
||||
def complete(self, handle, parameters):
|
||||
"""
|
||||
Finish an incomplete connection upon completion.
|
||||
"""
|
||||
assert self.handle is None
|
||||
assert self.transport == BT_BR_EDR_TRANSPORT
|
||||
self.handle = handle
|
||||
self.peer_resolvable_address = peer_resolvable_address
|
||||
# Quirk: role might be known before complete
|
||||
if self.role is None:
|
||||
self.role = role
|
||||
self.parameters = parameters
|
||||
|
||||
@property
|
||||
def role_name(self):
|
||||
return 'CENTRAL' if self.role == BT_CENTRAL_ROLE else 'PERIPHERAL'
|
||||
if self.role is None:
|
||||
return 'NOT-SET'
|
||||
if self.role == BT_CENTRAL_ROLE:
|
||||
return 'CENTRAL'
|
||||
if self.role == BT_PERIPHERAL_ROLE:
|
||||
return 'PERIPHERAL'
|
||||
return f'UNKNOWN[{self.role}]'
|
||||
|
||||
@property
|
||||
def is_encrypted(self):
|
||||
@@ -638,7 +648,7 @@ class Connection(CompositeEventEmitter):
|
||||
|
||||
@property
|
||||
def is_incomplete(self) -> bool:
|
||||
return self.handle == None
|
||||
return self.handle is None
|
||||
|
||||
def send_l2cap_pdu(self, cid, pdu):
|
||||
self.device.send_l2cap_pdu(self.handle, cid, pdu)
|
||||
@@ -751,10 +761,11 @@ class DeviceConfiguration:
|
||||
self.advertising_interval_max = DEVICE_DEFAULT_ADVERTISING_INTERVAL
|
||||
self.le_enabled = True
|
||||
# LE host enable 2nd parameter
|
||||
self.le_simultaneous_enabled = True
|
||||
self.le_simultaneous_enabled = False
|
||||
self.classic_enabled = False
|
||||
self.classic_sc_enabled = True
|
||||
self.classic_ssp_enabled = True
|
||||
self.classic_smp_enabled = True
|
||||
self.classic_accept_any = True
|
||||
self.connectable = True
|
||||
self.discoverable = True
|
||||
@@ -789,6 +800,9 @@ class DeviceConfiguration:
|
||||
self.classic_ssp_enabled = config.get(
|
||||
'classic_ssp_enabled', self.classic_ssp_enabled
|
||||
)
|
||||
self.classic_smp_enabled = config.get(
|
||||
'classic_smp_enabled', self.classic_smp_enabled
|
||||
)
|
||||
self.classic_accept_any = config.get(
|
||||
'classic_accept_any', self.classic_accept_any
|
||||
)
|
||||
@@ -998,8 +1012,9 @@ class Device(CompositeEventEmitter):
|
||||
self.le_enabled = config.le_enabled
|
||||
self.classic_enabled = config.classic_enabled
|
||||
self.le_simultaneous_enabled = config.le_simultaneous_enabled
|
||||
self.classic_ssp_enabled = config.classic_ssp_enabled
|
||||
self.classic_sc_enabled = config.classic_sc_enabled
|
||||
self.classic_ssp_enabled = config.classic_ssp_enabled
|
||||
self.classic_smp_enabled = config.classic_smp_enabled
|
||||
self.discoverable = config.discoverable
|
||||
self.connectable = config.connectable
|
||||
self.classic_accept_any = config.classic_accept_any
|
||||
@@ -1042,12 +1057,12 @@ class Device(CompositeEventEmitter):
|
||||
self.random_address = address
|
||||
|
||||
# Setup SMP
|
||||
self.smp_manager = smp.Manager(self)
|
||||
self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)
|
||||
self.l2cap_channel_manager.register_fixed_channel(
|
||||
smp.SMP_BR_CID, self.on_smp_pdu
|
||||
self.smp_manager = smp.Manager(
|
||||
self, pairing_config_factory=lambda connection: PairingConfig()
|
||||
)
|
||||
|
||||
self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)
|
||||
|
||||
# Register the SDP server with the L2CAP Channel Manager
|
||||
self.sdp_server.register(self.l2cap_channel_manager)
|
||||
|
||||
@@ -1183,6 +1198,12 @@ class Device(CompositeEventEmitter):
|
||||
if self.keystore is None:
|
||||
self.keystore = KeyStore.create_for_device(self)
|
||||
|
||||
# Finish setting up SMP based on post-init configurable options
|
||||
if self.classic_smp_enabled:
|
||||
self.l2cap_channel_manager.register_fixed_channel(
|
||||
smp.SMP_BR_CID, self.on_smp_pdu
|
||||
)
|
||||
|
||||
if self.host.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND):
|
||||
await self.send_command(
|
||||
HCI_Write_LE_Host_Support_Command(
|
||||
@@ -1230,7 +1251,7 @@ class Device(CompositeEventEmitter):
|
||||
await self.send_command(HCI_LE_Clear_Resolving_List_Command()) # type: ignore[call-arg]
|
||||
|
||||
resolving_keys = await self.keystore.get_resolving_keys()
|
||||
for (irk, address) in resolving_keys:
|
||||
for irk, address in resolving_keys:
|
||||
await self.send_command(
|
||||
HCI_LE_Add_Device_To_Resolving_List_Command(
|
||||
peer_identity_address_type=address.address_type,
|
||||
@@ -1802,7 +1823,7 @@ class Device(CompositeEventEmitter):
|
||||
else:
|
||||
# Save pending connection
|
||||
self.pending_connections[peer_address] = Connection.incomplete(
|
||||
self, peer_address
|
||||
self, peer_address, BT_CENTRAL_ROLE
|
||||
)
|
||||
|
||||
# TODO: allow passing other settings
|
||||
@@ -1939,9 +1960,12 @@ class Device(CompositeEventEmitter):
|
||||
self.on('connection', on_connection)
|
||||
self.on('connection_failure', on_connection_failure)
|
||||
|
||||
# Save pending connection
|
||||
# Save pending connection, with the Peripheral role.
|
||||
# Even if we requested a role switch in the HCI_Accept_Connection_Request
|
||||
# command, this connection is still considered Peripheral until an eventual
|
||||
# role change event.
|
||||
self.pending_connections[peer_address] = Connection.incomplete(
|
||||
self, peer_address
|
||||
self, peer_address, BT_PERIPHERAL_ROLE
|
||||
)
|
||||
|
||||
try:
|
||||
@@ -2214,6 +2238,10 @@ class Device(CompositeEventEmitter):
|
||||
keys = await self.keystore.get(str(address))
|
||||
if keys is not None:
|
||||
logger.debug('found keys in the key store')
|
||||
if keys.link_key is None:
|
||||
logger.warning('no link key')
|
||||
return None
|
||||
|
||||
return keys.link_key.value
|
||||
|
||||
# [Classic only]
|
||||
@@ -2418,8 +2446,14 @@ class Device(CompositeEventEmitter):
|
||||
def on_link_key(self, bd_addr, link_key, key_type):
|
||||
# Store the keys in the key store
|
||||
if self.keystore:
|
||||
authenticated = key_type in (
|
||||
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
|
||||
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
|
||||
)
|
||||
pairing_keys = PairingKeys()
|
||||
pairing_keys.link_key = PairingKeys.Key(value=link_key)
|
||||
pairing_keys.link_key = PairingKeys.Key(
|
||||
value=link_key, authenticated=authenticated
|
||||
)
|
||||
|
||||
async def store_keys():
|
||||
try:
|
||||
@@ -2463,25 +2497,24 @@ class Device(CompositeEventEmitter):
|
||||
connection_handle,
|
||||
transport,
|
||||
peer_address,
|
||||
peer_resolvable_address,
|
||||
role,
|
||||
connection_parameters,
|
||||
):
|
||||
logger.debug(
|
||||
f'*** Connection: [0x{connection_handle:04X}] '
|
||||
f'{peer_address} as {HCI_Constant.role_name(role)}'
|
||||
f'{peer_address} {"" if role is None else HCI_Constant.role_name(role)}'
|
||||
)
|
||||
if connection_handle in self.connections:
|
||||
logger.warning(
|
||||
'new connection reuses the same handle as a previous connection'
|
||||
)
|
||||
|
||||
peer_resolvable_address = None
|
||||
|
||||
if transport == BT_BR_EDR_TRANSPORT:
|
||||
# Create a new connection
|
||||
connection = self.pending_connections.pop(peer_address)
|
||||
connection.complete(
|
||||
connection_handle, peer_resolvable_address, role, connection_parameters
|
||||
)
|
||||
connection.complete(connection_handle, connection_parameters)
|
||||
self.connections[connection_handle] = connection
|
||||
|
||||
# Emit an event to notify listeners of the new connection
|
||||
@@ -2593,7 +2626,9 @@ class Device(CompositeEventEmitter):
|
||||
# device configuration is set to accept any incoming connection
|
||||
elif self.classic_accept_any:
|
||||
# Save pending connection
|
||||
self.pending_connections[bd_addr] = Connection.incomplete(self, bd_addr)
|
||||
self.pending_connections[bd_addr] = Connection.incomplete(
|
||||
self, bd_addr, BT_PERIPHERAL_ROLE
|
||||
)
|
||||
|
||||
self.host.send_command_sync(
|
||||
HCI_Accept_Connection_Request_Command(
|
||||
@@ -2684,7 +2719,7 @@ class Device(CompositeEventEmitter):
|
||||
# On Secure Simple Pairing complete, in case:
|
||||
# - Connection isn't already authenticated
|
||||
# - AND we are not the initiator of the authentication
|
||||
# We must trigger authentication to known if we are truly authenticated
|
||||
# We must trigger authentication to know if we are truly authenticated
|
||||
if not connection.authenticating and not connection.authenticated:
|
||||
logger.debug(
|
||||
f'*** Trigger Connection Authentication: [0x{connection.handle:04X}] '
|
||||
@@ -2699,22 +2734,6 @@ class Device(CompositeEventEmitter):
|
||||
# Ask what the pairing config should be for this connection
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
|
||||
# Map the SMP IO capability to a Classic IO capability
|
||||
# pylint: disable=line-too-long
|
||||
io_capability = {
|
||||
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
smp.SMP_DISPLAY_YES_NO_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
|
||||
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
|
||||
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
}.get(pairing_config.delegate.io_capability)
|
||||
|
||||
if io_capability is None:
|
||||
logger.warning(
|
||||
f'cannot map IO capability ({pairing_config.delegate.io_capability}'
|
||||
)
|
||||
io_capability = HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
|
||||
|
||||
# Compute the authentication requirements
|
||||
authentication_requirements = (
|
||||
# No Bonding
|
||||
@@ -2733,53 +2752,50 @@ class Device(CompositeEventEmitter):
|
||||
self.host.send_command_sync(
|
||||
HCI_IO_Capability_Request_Reply_Command(
|
||||
bd_addr=connection.peer_address,
|
||||
io_capability=io_capability,
|
||||
io_capability=pairing_config.delegate.classic_io_capability,
|
||||
oob_data_present=0x00, # Not present
|
||||
authentication_requirements=authentication_requirements,
|
||||
)
|
||||
)
|
||||
|
||||
# [Classic only]
|
||||
@host_event_handler
|
||||
@with_connection_from_address
|
||||
def on_authentication_io_capability_response(
|
||||
self, connection, io_capability, authentication_requirements
|
||||
):
|
||||
connection.peer_pairing_io_capability = io_capability
|
||||
connection.peer_pairing_authentication_requirements = (
|
||||
authentication_requirements
|
||||
)
|
||||
|
||||
# [Classic only]
|
||||
@host_event_handler
|
||||
@with_connection_from_address
|
||||
def on_authentication_user_confirmation_request(self, connection, code):
|
||||
# Ask what the pairing config should be for this connection
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
|
||||
can_compare = pairing_config.delegate.io_capability not in (
|
||||
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
|
||||
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
)
|
||||
io_capability = pairing_config.delegate.classic_io_capability
|
||||
|
||||
# Respond
|
||||
if can_compare:
|
||||
|
||||
async def compare_numbers():
|
||||
numbers_match = await connection.abort_on(
|
||||
'disconnection',
|
||||
pairing_config.delegate.compare_numbers(code, digits=6),
|
||||
)
|
||||
if numbers_match:
|
||||
await self.host.send_command(
|
||||
HCI_User_Confirmation_Request_Reply_Command(
|
||||
bd_addr=connection.peer_address
|
||||
)
|
||||
)
|
||||
else:
|
||||
await self.host.send_command(
|
||||
HCI_User_Confirmation_Request_Negative_Reply_Command(
|
||||
bd_addr=connection.peer_address
|
||||
)
|
||||
if io_capability == HCI_DISPLAY_YES_NO_IO_CAPABILITY:
|
||||
if connection.peer_pairing_io_capability in (
|
||||
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
HCI_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
):
|
||||
# Display the code and ask the user to compare
|
||||
async def prompt():
|
||||
return (
|
||||
await pairing_config.delegate.compare_numbers(code, digits=6),
|
||||
)
|
||||
|
||||
asyncio.create_task(compare_numbers())
|
||||
else:
|
||||
else:
|
||||
# Ask the user to confirm the pairing, without showing a code
|
||||
async def prompt():
|
||||
return await pairing_config.delegate.confirm()
|
||||
|
||||
async def confirm():
|
||||
confirm = await connection.abort_on(
|
||||
'disconnection', pairing_config.delegate.confirm()
|
||||
)
|
||||
if confirm:
|
||||
if await prompt():
|
||||
await self.host.send_command(
|
||||
HCI_User_Confirmation_Request_Reply_Command(
|
||||
bd_addr=connection.peer_address
|
||||
@@ -2792,7 +2808,17 @@ class Device(CompositeEventEmitter):
|
||||
)
|
||||
)
|
||||
|
||||
asyncio.create_task(confirm())
|
||||
AsyncRunner.spawn(connection.abort_on('disconnection', confirm()))
|
||||
return
|
||||
|
||||
if io_capability == HCI_DISPLAY_ONLY_IO_CAPABILITY:
|
||||
# Display the code to the user
|
||||
AsyncRunner.spawn(pairing_config.delegate.display_number(code, 6))
|
||||
|
||||
# Automatic confirmation
|
||||
self.host.send_command_sync(
|
||||
HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
|
||||
)
|
||||
|
||||
# [Classic only]
|
||||
@host_event_handler
|
||||
@@ -2800,15 +2826,11 @@ class Device(CompositeEventEmitter):
|
||||
def on_authentication_user_passkey_request(self, connection):
|
||||
# Ask what the pairing config should be for this connection
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
|
||||
can_input = pairing_config.delegate.io_capability in (
|
||||
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
|
||||
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
|
||||
)
|
||||
io_capability = pairing_config.delegate.classic_io_capability
|
||||
|
||||
# Respond
|
||||
if can_input:
|
||||
|
||||
if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY:
|
||||
# Ask the user to input a number
|
||||
async def get_number():
|
||||
number = await connection.abort_on(
|
||||
'disconnection', pairing_config.delegate.get_number()
|
||||
@@ -2838,18 +2860,14 @@ class Device(CompositeEventEmitter):
|
||||
@host_event_handler
|
||||
@with_connection_from_address
|
||||
def on_pin_code_request(self, connection):
|
||||
# classic legacy pairing
|
||||
# Classic legacy pairing
|
||||
# Ask what the pairing config should be for this connection
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
io_capability = pairing_config.delegate.classic_io_capability
|
||||
|
||||
can_input = pairing_config.delegate.io_capability in (
|
||||
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
|
||||
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
|
||||
)
|
||||
|
||||
# respond the pin code
|
||||
if can_input:
|
||||
|
||||
# Respond
|
||||
if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY:
|
||||
# Ask the user to enter a string
|
||||
async def get_pin_code():
|
||||
pin_code = await connection.abort_on(
|
||||
'disconnection', pairing_config.delegate.get_string(16)
|
||||
@@ -2889,6 +2907,7 @@ class Device(CompositeEventEmitter):
|
||||
# Ask what the pairing config should be for this connection
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
|
||||
# Show the passkey to the user
|
||||
connection.abort_on(
|
||||
'disconnection', pairing_config.delegate.display_number(passkey)
|
||||
)
|
||||
|
||||
@@ -94,10 +94,9 @@ HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS = 1
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class Connection:
|
||||
def __init__(self, host, handle, role, peer_address, transport):
|
||||
def __init__(self, host, handle, peer_address, transport):
|
||||
self.host = host
|
||||
self.handle = handle
|
||||
self.role = role
|
||||
self.peer_address = peer_address
|
||||
self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
|
||||
self.transport = transport
|
||||
@@ -396,8 +395,8 @@ class Host(AbortableEventEmitter):
|
||||
|
||||
def supports_command(self, command):
|
||||
# Find the support flag position for this command
|
||||
for (octet, flags) in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS):
|
||||
for (flag_position, value) in enumerate(flags):
|
||||
for octet, flags in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS):
|
||||
for flag_position, value in enumerate(flags):
|
||||
if value == command:
|
||||
# Check if the flag is set
|
||||
if octet < len(self.local_supported_commands) and flag_position < 8:
|
||||
@@ -410,7 +409,7 @@ class Host(AbortableEventEmitter):
|
||||
@property
|
||||
def supported_commands(self):
|
||||
commands = []
|
||||
for (octet, flags) in enumerate(self.local_supported_commands):
|
||||
for octet, flags in enumerate(self.local_supported_commands):
|
||||
if octet < len(HCI_SUPPORTED_COMMANDS_FLAGS):
|
||||
for flag in range(8):
|
||||
if flags & (1 << flag) != 0:
|
||||
@@ -534,7 +533,7 @@ class Host(AbortableEventEmitter):
|
||||
if event.status == HCI_SUCCESS:
|
||||
# Create/update the connection
|
||||
logger.debug(
|
||||
f'### CONNECTION: [0x{event.connection_handle:04X}] '
|
||||
f'### LE CONNECTION: [0x{event.connection_handle:04X}] '
|
||||
f'{event.peer_address} as {HCI_Constant.role_name(event.role)}'
|
||||
)
|
||||
|
||||
@@ -543,7 +542,6 @@ class Host(AbortableEventEmitter):
|
||||
connection = Connection(
|
||||
self,
|
||||
event.connection_handle,
|
||||
event.role,
|
||||
event.peer_address,
|
||||
BT_LE_TRANSPORT,
|
||||
)
|
||||
@@ -560,7 +558,6 @@ class Host(AbortableEventEmitter):
|
||||
event.connection_handle,
|
||||
BT_LE_TRANSPORT,
|
||||
event.peer_address,
|
||||
None,
|
||||
event.role,
|
||||
connection_parameters,
|
||||
)
|
||||
@@ -589,7 +586,6 @@ class Host(AbortableEventEmitter):
|
||||
connection = Connection(
|
||||
self,
|
||||
event.connection_handle,
|
||||
BT_CENTRAL_ROLE,
|
||||
event.bd_addr,
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
)
|
||||
@@ -602,7 +598,6 @@ class Host(AbortableEventEmitter):
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
event.bd_addr,
|
||||
None,
|
||||
BT_CENTRAL_ROLE,
|
||||
None,
|
||||
)
|
||||
else:
|
||||
@@ -622,8 +617,7 @@ class Host(AbortableEventEmitter):
|
||||
if event.status == HCI_SUCCESS:
|
||||
logger.debug(
|
||||
f'### DISCONNECTION: [0x{event.connection_handle:04X}] '
|
||||
f'{connection.peer_address} as '
|
||||
f'{HCI_Constant.role_name(connection.role)}, '
|
||||
f'{connection.peer_address} '
|
||||
f'reason={event.reason}'
|
||||
)
|
||||
del self.connections[event.connection_handle]
|
||||
@@ -739,10 +733,6 @@ class Host(AbortableEventEmitter):
|
||||
f'role change for {event.bd_addr}: '
|
||||
f'{HCI_Constant.role_name(event.new_role)}'
|
||||
)
|
||||
if connection := self.find_connection_by_bd_addr(
|
||||
event.bd_addr, BT_BR_EDR_TRANSPORT
|
||||
):
|
||||
connection.role = event.new_role
|
||||
self.emit('role_change', event.bd_addr, event.new_role)
|
||||
else:
|
||||
logger.debug(
|
||||
@@ -849,7 +839,12 @@ class Host(AbortableEventEmitter):
|
||||
self.emit('authentication_io_capability_request', event.bd_addr)
|
||||
|
||||
def on_hci_io_capability_response_event(self, event):
|
||||
pass
|
||||
self.emit(
|
||||
'authentication_io_capability_response',
|
||||
event.bd_addr,
|
||||
event.io_capability,
|
||||
event.authentication_requirements,
|
||||
)
|
||||
|
||||
def on_hci_user_confirmation_request_event(self, event):
|
||||
self.emit(
|
||||
|
||||
@@ -257,7 +257,7 @@ class JsonKeyStore(KeyStore):
|
||||
json.dump(db, output, sort_keys=True, indent=4)
|
||||
|
||||
# Atomically replace the previous file
|
||||
os.rename(temp_filename, self.filename)
|
||||
os.replace(temp_filename, self.filename)
|
||||
|
||||
async def delete(self, name: str) -> None:
|
||||
db = await self.load()
|
||||
@@ -273,7 +273,7 @@ class JsonKeyStore(KeyStore):
|
||||
db = await self.load()
|
||||
|
||||
namespace = db.setdefault(self.namespace, {})
|
||||
namespace[name] = keys.to_dict()
|
||||
namespace.setdefault(name, {}).update(keys.to_dict())
|
||||
|
||||
await self.save(db)
|
||||
|
||||
|
||||
184
bumble/pairing.py
Normal file
184
bumble/pairing.py
Normal file
@@ -0,0 +1,184 @@
|
||||
# Copyright 2021-2023 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import enum
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from .hci import (
|
||||
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
|
||||
HCI_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
HCI_KEYBOARD_ONLY_IO_CAPABILITY,
|
||||
)
|
||||
from .smp import (
|
||||
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
|
||||
SMP_KEYBOARD_ONLY_IO_CAPABILITY,
|
||||
SMP_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
SMP_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
|
||||
SMP_ENC_KEY_DISTRIBUTION_FLAG,
|
||||
SMP_ID_KEY_DISTRIBUTION_FLAG,
|
||||
SMP_SIGN_KEY_DISTRIBUTION_FLAG,
|
||||
SMP_LINK_KEY_DISTRIBUTION_FLAG,
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class PairingDelegate:
|
||||
"""Abstract base class for Pairing Delegates."""
|
||||
|
||||
# I/O Capabilities.
|
||||
# These are defined abstractly, and can be mapped to specific Classic pairing
|
||||
# and/or SMP constants.
|
||||
class IoCapability(enum.IntEnum):
|
||||
NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
|
||||
KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY
|
||||
DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY
|
||||
DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY
|
||||
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
|
||||
|
||||
# Direct names for backward compatibility.
|
||||
NO_OUTPUT_NO_INPUT = IoCapability.NO_OUTPUT_NO_INPUT
|
||||
KEYBOARD_INPUT_ONLY = IoCapability.KEYBOARD_INPUT_ONLY
|
||||
DISPLAY_OUTPUT_ONLY = IoCapability.DISPLAY_OUTPUT_ONLY
|
||||
DISPLAY_OUTPUT_AND_YES_NO_INPUT = IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT
|
||||
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
|
||||
|
||||
# Key Distribution [LE only]
|
||||
class KeyDistribution(enum.IntFlag):
|
||||
DISTRIBUTE_ENCRYPTION_KEY = SMP_ENC_KEY_DISTRIBUTION_FLAG
|
||||
DISTRIBUTE_IDENTITY_KEY = SMP_ID_KEY_DISTRIBUTION_FLAG
|
||||
DISTRIBUTE_SIGNING_KEY = SMP_SIGN_KEY_DISTRIBUTION_FLAG
|
||||
DISTRIBUTE_LINK_KEY = SMP_LINK_KEY_DISTRIBUTION_FLAG
|
||||
|
||||
DEFAULT_KEY_DISTRIBUTION: int = (
|
||||
SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG
|
||||
)
|
||||
|
||||
# Default mapping from abstract to Classic I/O capabilities.
|
||||
# Subclasses may override this if they prefer a different mapping.
|
||||
CLASSIC_IO_CAPABILITIES_MAP = {
|
||||
NO_OUTPUT_NO_INPUT: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
|
||||
KEYBOARD_INPUT_ONLY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
|
||||
DISPLAY_OUTPUT_ONLY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
DISPLAY_OUTPUT_AND_YES_NO_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
}
|
||||
|
||||
io_capability: IoCapability
|
||||
local_initiator_key_distribution: KeyDistribution
|
||||
local_responder_key_distribution: KeyDistribution
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
io_capability=NO_OUTPUT_NO_INPUT,
|
||||
local_initiator_key_distribution=DEFAULT_KEY_DISTRIBUTION,
|
||||
local_responder_key_distribution=DEFAULT_KEY_DISTRIBUTION,
|
||||
) -> None:
|
||||
self.io_capability = io_capability
|
||||
self.local_initiator_key_distribution = local_initiator_key_distribution
|
||||
self.local_responder_key_distribution = local_responder_key_distribution
|
||||
|
||||
@property
|
||||
def classic_io_capability(self) -> int:
|
||||
"""Map the abstract I/O capability to a Classic constant."""
|
||||
|
||||
# pylint: disable=line-too-long
|
||||
return self.CLASSIC_IO_CAPABILITIES_MAP.get(
|
||||
self.io_capability, HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
|
||||
)
|
||||
|
||||
@property
|
||||
def smp_io_capability(self) -> int:
|
||||
"""Map the abstract I/O capability to an SMP constant."""
|
||||
|
||||
# This is just a 1-1 direct mapping
|
||||
return self.io_capability
|
||||
|
||||
async def accept(self) -> bool:
|
||||
"""Accept or reject a Pairing request."""
|
||||
return True
|
||||
|
||||
async def confirm(self) -> bool:
|
||||
"""Respond yes or no to a Pairing confirmation question."""
|
||||
return True
|
||||
|
||||
# pylint: disable-next=unused-argument
|
||||
async def compare_numbers(self, number: int, digits: int) -> bool:
|
||||
"""Compare two numbers."""
|
||||
return True
|
||||
|
||||
async def get_number(self) -> Optional[int]:
|
||||
"""
|
||||
Return an optional number as an answer to a passkey request.
|
||||
Returning `None` will result in a negative reply.
|
||||
"""
|
||||
return 0
|
||||
|
||||
async def get_string(self, max_length) -> Optional[str]:
|
||||
"""
|
||||
Return a string whose utf-8 encoding is up to max_length bytes.
|
||||
"""
|
||||
return None
|
||||
|
||||
# pylint: disable-next=unused-argument
|
||||
async def display_number(self, number: int, digits: int) -> None:
|
||||
"""Display a number."""
|
||||
|
||||
# [LE only]
|
||||
async def key_distribution_response(
|
||||
self, peer_initiator_key_distribution: int, peer_responder_key_distribution: int
|
||||
) -> Tuple[int, int]:
|
||||
"""
|
||||
Return the key distribution response in an SMP protocol context.
|
||||
|
||||
NOTE: since it is only used by the SMP protocol, this method's input and output
|
||||
are directly as integers, using the SMP constants, rather than the abstract
|
||||
KeyDistribution enums.
|
||||
"""
|
||||
return (
|
||||
int(
|
||||
peer_initiator_key_distribution & self.local_initiator_key_distribution
|
||||
),
|
||||
int(
|
||||
peer_responder_key_distribution & self.local_responder_key_distribution
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class PairingConfig:
|
||||
"""Configuration for the Pairing protocol."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
sc: bool = True,
|
||||
mitm: bool = True,
|
||||
bonding: bool = True,
|
||||
delegate: Optional[PairingDelegate] = None,
|
||||
) -> None:
|
||||
self.sc = sc
|
||||
self.mitm = mitm
|
||||
self.bonding = bonding
|
||||
self.delegate = delegate or PairingDelegate()
|
||||
|
||||
def __str__(self) -> str:
|
||||
return (
|
||||
f'PairingConfig(sc={self.sc}, '
|
||||
f'mitm={self.mitm}, bonding={self.bonding}, '
|
||||
f'delegate[{self.delegate.io_capability}])'
|
||||
)
|
||||
@@ -439,7 +439,7 @@ class DLC(EventEmitter):
|
||||
|
||||
logger.debug(
|
||||
f'<<< Credits [{self.dlci}]: '
|
||||
f'received {credits}, total={self.tx_credits}'
|
||||
f'received {received_credits}, total={self.tx_credits}'
|
||||
)
|
||||
data = data[1:]
|
||||
|
||||
|
||||
@@ -31,7 +31,16 @@ from typing import Dict, Optional, Type
|
||||
from pyee import EventEmitter
|
||||
|
||||
from .colors import color
|
||||
from .hci import Address, HCI_LE_Enable_Encryption_Command, HCI_Object, key_with_value
|
||||
from .hci import (
|
||||
HCI_DISPLAY_ONLY_IO_CAPABILITY,
|
||||
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
||||
HCI_KEYBOARD_ONLY_IO_CAPABILITY,
|
||||
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
|
||||
Address,
|
||||
HCI_LE_Enable_Encryption_Command,
|
||||
HCI_Object,
|
||||
key_with_value,
|
||||
)
|
||||
from .core import (
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
BT_CENTRAL_ROLE,
|
||||
@@ -476,7 +485,7 @@ class AddressResolver:
|
||||
address_bytes = bytes(address)
|
||||
hash_part = address_bytes[0:3]
|
||||
prand = address_bytes[3:6]
|
||||
for (irk, resolved_address) in self.resolving_keys:
|
||||
for irk, resolved_address in self.resolving_keys:
|
||||
local_hash = crypto.ah(irk, prand)
|
||||
if local_hash == hash_part:
|
||||
# Match!
|
||||
@@ -491,86 +500,6 @@ class AddressResolver:
|
||||
return None
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class PairingDelegate:
|
||||
NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
|
||||
KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY
|
||||
DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY
|
||||
DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY
|
||||
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
|
||||
DEFAULT_KEY_DISTRIBUTION: int = (
|
||||
SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
io_capability: int = NO_OUTPUT_NO_INPUT,
|
||||
local_initiator_key_distribution: int = DEFAULT_KEY_DISTRIBUTION,
|
||||
local_responder_key_distribution: int = DEFAULT_KEY_DISTRIBUTION,
|
||||
) -> None:
|
||||
self.io_capability = io_capability
|
||||
self.local_initiator_key_distribution = local_initiator_key_distribution
|
||||
self.local_responder_key_distribution = local_responder_key_distribution
|
||||
|
||||
async def accept(self) -> bool:
|
||||
return True
|
||||
|
||||
async def confirm(self) -> bool:
|
||||
return True
|
||||
|
||||
# pylint: disable-next=unused-argument
|
||||
async def compare_numbers(self, number: int, digits: int) -> bool:
|
||||
return True
|
||||
|
||||
async def get_number(self) -> Optional[int]:
|
||||
'''
|
||||
Returns an optional number as an answer to a passkey request.
|
||||
Returning `None` will result in a negative reply.
|
||||
'''
|
||||
return 0
|
||||
|
||||
async def get_string(self, max_length) -> Optional[str]:
|
||||
'''
|
||||
Returns a string whose utf-8 encoding is up to max_length bytes.
|
||||
'''
|
||||
return None
|
||||
|
||||
# pylint: disable-next=unused-argument
|
||||
async def display_number(self, number: int, digits: int) -> None:
|
||||
pass
|
||||
|
||||
async def key_distribution_response(
|
||||
self, peer_initiator_key_distribution, peer_responder_key_distribution
|
||||
):
|
||||
return (
|
||||
(peer_initiator_key_distribution & self.local_initiator_key_distribution),
|
||||
(peer_responder_key_distribution & self.local_responder_key_distribution),
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class PairingConfig:
|
||||
def __init__(
|
||||
self,
|
||||
sc: bool = True,
|
||||
mitm: bool = True,
|
||||
bonding: bool = True,
|
||||
delegate: Optional[PairingDelegate] = None,
|
||||
) -> None:
|
||||
self.sc = sc
|
||||
self.mitm = mitm
|
||||
self.bonding = bonding
|
||||
self.delegate = delegate or PairingDelegate()
|
||||
|
||||
def __str__(self):
|
||||
io_capability_str = SMP_Command.io_capability_name(self.delegate.io_capability)
|
||||
return (
|
||||
f'PairingConfig(sc={self.sc}, '
|
||||
f'mitm={self.mitm}, bonding={self.bonding}, '
|
||||
f'delegate[{io_capability_str}])'
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class Session:
|
||||
# Pairing methods
|
||||
@@ -1662,12 +1591,12 @@ class Manager(EventEmitter):
|
||||
Implements the Initiator and Responder roles of the Security Manager Protocol
|
||||
'''
|
||||
|
||||
def __init__(self, device):
|
||||
def __init__(self, device, pairing_config_factory):
|
||||
super().__init__()
|
||||
self.device = device
|
||||
self.sessions = {}
|
||||
self._ecc_key = None
|
||||
self.pairing_config_factory = lambda connection: PairingConfig()
|
||||
self.pairing_config_factory = pairing_config_factory
|
||||
|
||||
def send_command(self, connection, command):
|
||||
logger.debug(
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
{
|
||||
"name": "Bumble Hands-Free",
|
||||
"class_of_device": 2360324
|
||||
"class_of_device": 2360324,
|
||||
"keystore": "JsonKeyStore",
|
||||
"le_enabled": false
|
||||
}
|
||||
|
||||
@@ -28,9 +28,8 @@ from bumble.device import Device, Peer
|
||||
from bumble.host import Host
|
||||
from bumble.gatt import Service, Characteristic
|
||||
from bumble.transport import AsyncPipeSink
|
||||
from bumble.pairing import PairingConfig, PairingDelegate
|
||||
from bumble.smp import (
|
||||
PairingConfig,
|
||||
PairingDelegate,
|
||||
SMP_PAIRING_NOT_SUPPORTED_ERROR,
|
||||
SMP_CONFIRM_VALUE_FAILED_ERROR,
|
||||
)
|
||||
@@ -262,7 +261,7 @@ async def test_self_gatt_long_read():
|
||||
found_service = result[0]
|
||||
found_characteristics = await found_service.discover_characteristics()
|
||||
assert len(found_characteristics) == 513
|
||||
for (i, characteristic) in enumerate(found_characteristics):
|
||||
for i, characteristic in enumerate(found_characteristics):
|
||||
value = await characteristic.read_value()
|
||||
assert value == characteristics[i].value
|
||||
|
||||
@@ -317,11 +316,11 @@ async def _test_self_smp_with_configs(pairing_config1, pairing_config2):
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
IO_CAP = [
|
||||
PairingDelegate.NO_OUTPUT_NO_INPUT,
|
||||
PairingDelegate.KEYBOARD_INPUT_ONLY,
|
||||
PairingDelegate.DISPLAY_OUTPUT_ONLY,
|
||||
PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
|
||||
PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
|
||||
PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT,
|
||||
PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY,
|
||||
PairingDelegate.IoCapability.DISPLAY_OUTPUT_ONLY,
|
||||
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
|
||||
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
|
||||
]
|
||||
SC = [False, True]
|
||||
MITM = [False, True]
|
||||
@@ -335,7 +334,10 @@ KEY_DIST = range(16)
|
||||
itertools.chain(
|
||||
itertools.product([IO_CAP], SC, MITM, [15]),
|
||||
itertools.product(
|
||||
[[PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]], SC, MITM, KEY_DIST
|
||||
[[PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]],
|
||||
SC,
|
||||
MITM,
|
||||
KEY_DIST,
|
||||
),
|
||||
),
|
||||
)
|
||||
@@ -378,7 +380,7 @@ async def test_self_smp(io_caps, sc, mitm, key_dist):
|
||||
else:
|
||||
if (
|
||||
self.peer_delegate.io_capability
|
||||
== PairingDelegate.KEYBOARD_INPUT_ONLY
|
||||
== PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY
|
||||
):
|
||||
peer_number = 6789
|
||||
else:
|
||||
@@ -421,7 +423,7 @@ async def test_self_smp(io_caps, sc, mitm, key_dist):
|
||||
async def test_self_smp_reject():
|
||||
class RejectingDelegate(PairingDelegate):
|
||||
def __init__(self):
|
||||
super().__init__(PairingDelegate.NO_OUTPUT_NO_INPUT)
|
||||
super().__init__(PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT)
|
||||
|
||||
async def accept(self):
|
||||
return False
|
||||
@@ -442,12 +444,14 @@ async def test_self_smp_reject():
|
||||
async def test_self_smp_wrong_pin():
|
||||
class WrongPinDelegate(PairingDelegate):
|
||||
def __init__(self):
|
||||
super().__init__(PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT)
|
||||
super().__init__(
|
||||
PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
|
||||
)
|
||||
|
||||
async def compare_numbers(self, number, digits):
|
||||
return False
|
||||
|
||||
wrong_pin_pairing_config = PairingConfig(delegate=WrongPinDelegate())
|
||||
wrong_pin_pairing_config = PairingConfig(mitm=True, delegate=WrongPinDelegate())
|
||||
paired = False
|
||||
try:
|
||||
await _test_self_smp_with_configs(
|
||||
|
||||
Reference in New Issue
Block a user