Compare commits

..

2 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
9c7089c8ff terminate when unplugged 2023-11-19 11:36:38 -08:00
Gilles Boccon-Gibod
a8ec1b0949 minor cleanup of the internals of the usb transport implementation 2023-11-15 17:26:21 -08:00
7 changed files with 97 additions and 450 deletions

View File

@@ -24,16 +24,10 @@ from prompt_toolkit.shortcuts import PromptSession
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.transport import open_transport_or_link
from bumble.pairing import OobData, PairingDelegate, PairingConfig
from bumble.smp import OobContext, OobLegacyContext
from bumble.pairing import PairingDelegate, PairingConfig
from bumble.smp import error_name as smp_error_name
from bumble.keys import JsonKeyStore
from bumble.core import (
AdvertisingData,
ProtocolError,
BT_LE_TRANSPORT,
BT_BR_EDR_TRANSPORT,
)
from bumble.core import ProtocolError
from bumble.gatt import (
GATT_DEVICE_NAME_CHARACTERISTIC,
GATT_GENERIC_ACCESS_SERVICE,
@@ -66,7 +60,7 @@ class Waiter:
class Delegate(PairingDelegate):
def __init__(self, mode, connection, capability_string, do_prompt):
super().__init__(
io_capability={
{
'keyboard': PairingDelegate.KEYBOARD_INPUT_ONLY,
'display': PairingDelegate.DISPLAY_OUTPUT_ONLY,
'display+keyboard': PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
@@ -292,7 +286,6 @@ async def pair(
bond,
ctkd,
io,
oob,
prompt,
request,
print_keys,
@@ -350,51 +343,16 @@ async def pair(
await device.keystore.print(prefix=color('@@@ ', 'blue'))
print(color('@@@-----------------------------------', 'blue'))
# Create an OOB context if needed
if oob:
our_oob_context = OobContext()
shared_data = (
None
if oob == '-'
else OobData.from_ad(AdvertisingData.from_bytes(bytes.fromhex(oob)))
)
legacy_context = OobLegacyContext()
oob_contexts = PairingConfig.OobConfig(
our_context=our_oob_context,
peer_data=shared_data,
legacy_context=legacy_context,
)
oob_data = OobData(
address=device.random_address,
shared_data=shared_data,
legacy_context=legacy_context,
)
print(color('@@@-----------------------------------', 'yellow'))
print(color('@@@ OOB Data:', 'yellow'))
print(color(f'@@@ {our_oob_context.share()}', 'yellow'))
print(color(f'@@@ TK={legacy_context.tk.hex()}', 'yellow'))
print(color(f'@@@ HEX: ({bytes(oob_data.to_ad()).hex()})', 'yellow'))
print(color('@@@-----------------------------------', 'yellow'))
else:
oob_contexts = None
# Set up a pairing config factory
device.pairing_config_factory = lambda connection: PairingConfig(
sc=sc,
mitm=mitm,
bonding=bond,
oob=oob_contexts,
delegate=Delegate(mode, connection, io, prompt),
sc, mitm, bond, Delegate(mode, connection, io, prompt)
)
# Connect to a peer or wait for a connection
device.on('connection', lambda connection: on_connection(connection, request))
if address_or_name is not None:
print(color(f'=== Connecting to {address_or_name}...', 'green'))
connection = await device.connect(
address_or_name,
transport=BT_LE_TRANSPORT if mode == 'le' else BT_BR_EDR_TRANSPORT,
)
connection = await device.connect(address_or_name)
if not request:
try:
@@ -463,14 +421,6 @@ class LogHandler(logging.Handler):
default='display+keyboard',
show_default=True,
)
@click.option(
'--oob',
metavar='<oob-data-hex>',
help=(
'Use OOB pairing with this data from the peer '
'(use "-" to enable OOB without peer data)'
),
)
@click.option('--prompt', is_flag=True, help='Prompt to accept/reject pairing request')
@click.option(
'--request', is_flag=True, help='Request that the connecting peer initiate pairing'
@@ -491,7 +441,6 @@ def main(
bond,
ctkd,
io,
oob,
prompt,
request,
print_keys,
@@ -515,7 +464,6 @@ def main(
bond,
ctkd,
io,
oob,
prompt,
request,
print_keys,

View File

@@ -16,7 +16,6 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum
import struct
from typing import List, Optional, Tuple, Union, cast, Dict
@@ -1052,13 +1051,3 @@ class ConnectionPHY:
def __str__(self):
return f'ConnectionPHY(tx_phy={self.tx_phy}, rx_phy={self.rx_phy})'
# -----------------------------------------------------------------------------
# LE Role
# -----------------------------------------------------------------------------
class LeRole(enum.IntEnum):
PERIPHERAL_ONLY = 0x00
CENTRAL_ONLY = 0x01
BOTH_PERIPHERAL_PREFERRED = 0x02
BOTH_CENTRAL_PREFERRED = 0x03

View File

@@ -15,9 +15,7 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum
from dataclasses import dataclass
from typing import Optional, Tuple
from .hci import (
@@ -37,60 +35,7 @@ from .smp import (
SMP_ID_KEY_DISTRIBUTION_FLAG,
SMP_SIGN_KEY_DISTRIBUTION_FLAG,
SMP_LINK_KEY_DISTRIBUTION_FLAG,
OobContext,
OobLegacyContext,
OobSharedData,
)
from .core import AdvertisingData, LeRole
# -----------------------------------------------------------------------------
@dataclass
class OobData:
"""OOB data that can be sent from one device to another."""
address: Optional[Address] = None
role: Optional[LeRole] = None
shared_data: Optional[OobSharedData] = None
legacy_context: Optional[OobLegacyContext] = None
@classmethod
def from_ad(cls, ad: AdvertisingData) -> OobData:
instance = cls()
shared_data_c: Optional[bytes] = None
shared_data_r: Optional[bytes] = None
for ad_type, ad_data in ad.ad_structures:
if ad_type == AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS:
instance.address = Address(ad_data)
elif ad_type == AdvertisingData.LE_ROLE:
instance.role = LeRole(ad_data[0])
elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE:
shared_data_c = ad_data
elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE:
shared_data_r = ad_data
elif ad_type == AdvertisingData.SECURITY_MANAGER_TK_VALUE:
instance.legacy_context = OobLegacyContext(tk=ad_data)
if shared_data_c and shared_data_r:
instance.shared_data = OobSharedData(c=shared_data_c, r=shared_data_r)
return instance
def to_ad(self) -> AdvertisingData:
ad_structures = []
if self.address is not None:
ad_structures.append(
(AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(self.address))
)
if self.role is not None:
ad_structures.append((AdvertisingData.LE_ROLE, bytes([self.role])))
if self.shared_data is not None:
ad_structures.extend(self.shared_data.to_ad().ad_structures)
if self.legacy_context is not None:
ad_structures.append(
(AdvertisingData.SECURITY_MANAGER_TK_VALUE, self.legacy_context.tk)
)
return AdvertisingData(ad_structures)
# -----------------------------------------------------------------------------
@@ -228,14 +173,6 @@ class PairingConfig:
PUBLIC = Address.PUBLIC_DEVICE_ADDRESS
RANDOM = Address.RANDOM_DEVICE_ADDRESS
@dataclass
class OobConfig:
"""Config for OOB pairing."""
our_context: Optional[OobContext]
peer_data: Optional[OobSharedData]
legacy_context: Optional[OobLegacyContext]
def __init__(
self,
sc: bool = True,
@@ -243,20 +180,17 @@ class PairingConfig:
bonding: bool = True,
delegate: Optional[PairingDelegate] = None,
identity_address_type: Optional[AddressType] = None,
oob: Optional[OobConfig] = None,
) -> None:
self.sc = sc
self.mitm = mitm
self.bonding = bonding
self.delegate = delegate or PairingDelegate()
self.identity_address_type = identity_address_type
self.oob = oob
def __str__(self) -> str:
return (
f'PairingConfig(sc={self.sc}, '
f'mitm={self.mitm}, bonding={self.bonding}, '
f'identity_address_type={self.identity_address_type}, '
f'delegate[{self.delegate.io_capability}]), '
f'oob[{self.oob}])'
f'delegate[{self.delegate.io_capability}])'
)

View File

@@ -27,7 +27,6 @@ import logging
import asyncio
import enum
import secrets
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
Any,
@@ -54,7 +53,6 @@ from .core import (
BT_BR_EDR_TRANSPORT,
BT_CENTRAL_ROLE,
BT_LE_TRANSPORT,
AdvertisingData,
ProtocolError,
name_or_number,
)
@@ -565,54 +563,6 @@ class PairingMethod(enum.IntEnum):
CTKD_OVER_CLASSIC = 4
# -----------------------------------------------------------------------------
class OobContext:
"""Cryptographic context for LE SC OOB pairing."""
ecc_key: crypto.EccKey
r: bytes
def __init__(
self, ecc_key: Optional[crypto.EccKey] = None, r: Optional[bytes] = None
) -> None:
self.ecc_key = crypto.EccKey.generate() if ecc_key is None else ecc_key
self.r = crypto.r() if r is None else r
def share(self) -> OobSharedData:
pkx = bytes(reversed(self.ecc_key.x))
return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r)
# -----------------------------------------------------------------------------
class OobLegacyContext:
"""Cryptographic context for LE Legacy OOB pairing."""
tk: bytes
def __init__(self, tk: Optional[bytes] = None) -> None:
self.tk = crypto.r() if tk is None else tk
# -----------------------------------------------------------------------------
@dataclass
class OobSharedData:
"""Shareable data for LE SC OOB pairing."""
c: bytes
r: bytes
def to_ad(self) -> AdvertisingData:
return AdvertisingData(
[
(AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE, self.c),
(AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE, self.r),
]
)
def __str__(self) -> str:
return f'OOB(C={self.c.hex()}, R={self.r.hex()})'
# -----------------------------------------------------------------------------
class Session:
# I/O Capability to pairing method decision matrix
@@ -690,6 +640,8 @@ class Session:
self.pres: Optional[bytes] = None
self.ea = None
self.eb = None
self.tk = bytes(16)
self.r = bytes(16)
self.stk = None
self.ltk = None
self.ltk_ediv = 0
@@ -707,7 +659,7 @@ class Session:
self.peer_bd_addr: Optional[Address] = None
self.peer_signature_key = None
self.peer_expected_distributions: List[Type[SMP_Command]] = []
self.dh_key = b''
self.dh_key = None
self.confirm_value = None
self.passkey: Optional[int] = None
self.passkey_ready = asyncio.Event()
@@ -760,8 +712,8 @@ class Session:
self.io_capability = pairing_config.delegate.io_capability
self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
# OOB
self.oob_data_flag = 0 if pairing_config.oob is None else 1
# OOB (not supported yet)
self.oob = False
# Set up addresses
self_address = connection.self_address
@@ -777,37 +729,9 @@ class Session:
self.ia = bytes(peer_address)
self.iat = 1 if peer_address.is_random else 0
# Select the ECC key, TK and r initial value
if pairing_config.oob:
self.peer_oob_data = pairing_config.oob.peer_data
if pairing_config.sc:
if pairing_config.oob.our_context is None:
raise ValueError(
"oob pairing config requires a context when sc is True"
)
self.r = pairing_config.oob.our_context.r
self.ecc_key = pairing_config.oob.our_context.ecc_key
if pairing_config.oob.legacy_context is None:
self.tk = None
else:
self.tk = pairing_config.oob.legacy_context.tk
else:
if pairing_config.oob.legacy_context is None:
raise ValueError(
"oob pairing config requires a legacy context when sc is False"
)
self.r = bytes(16)
self.ecc_key = manager.ecc_key
self.tk = pairing_config.oob.legacy_context.tk
else:
self.peer_oob_data = None
self.r = bytes(16)
self.ecc_key = manager.ecc_key
self.tk = bytes(16)
@property
def pkx(self) -> Tuple[bytes, bytes]:
return (bytes(reversed(self.ecc_key.x)), self.peer_public_key_x)
return (bytes(reversed(self.manager.ecc_key.x)), self.peer_public_key_x)
@property
def pka(self) -> bytes:
@@ -844,10 +768,7 @@ class Session:
return None
def decide_pairing_method(
self,
auth_req: int,
initiator_io_capability: int,
responder_io_capability: int,
self, auth_req: int, initiator_io_capability: int, responder_io_capability: int
) -> None:
if self.connection.transport == BT_BR_EDR_TRANSPORT:
self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC
@@ -988,7 +909,7 @@ class Session:
command = SMP_Pairing_Request_Command(
io_capability=self.io_capability,
oob_data_flag=self.oob_data_flag,
oob_data_flag=0,
auth_req=self.auth_req,
maximum_encryption_key_size=16,
initiator_key_distribution=self.initiator_key_distribution,
@@ -1000,7 +921,7 @@ class Session:
def send_pairing_response_command(self) -> None:
response = SMP_Pairing_Response_Command(
io_capability=self.io_capability,
oob_data_flag=self.oob_data_flag,
oob_data_flag=0,
auth_req=self.auth_req,
maximum_encryption_key_size=16,
initiator_key_distribution=self.initiator_key_distribution,
@@ -1061,8 +982,8 @@ class Session:
def send_public_key_command(self) -> None:
self.send_command(
SMP_Pairing_Public_Key_Command(
public_key_x=bytes(reversed(self.ecc_key.x)),
public_key_y=bytes(reversed(self.ecc_key.y)),
public_key_x=bytes(reversed(self.manager.ecc_key.x)),
public_key_y=bytes(reversed(self.manager.ecc_key.y)),
)
)
@@ -1109,6 +1030,7 @@ class Session:
self.ltk = crypto.h6(ilk, b'brle')
def distribute_keys(self) -> None:
# Distribute the keys as required
if self.is_initiator:
# CTKD: Derive LTK from LinkKey
@@ -1374,7 +1296,7 @@ class Session:
try:
handler(command)
except Exception as error:
logger.exception(f'{color("!!! Exception in handler:", "red")} {error}')
logger.warning(f'{color("!!! Exception in handler:", "red")} {error}')
response = SMP_Pairing_Failed_Command(
reason=SMP_UNSPECIFIED_REASON_ERROR
)
@@ -1411,28 +1333,15 @@ class Session:
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0)
# Infer the pairing method
if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0)
):
# Use OOB
self.pairing_method = PairingMethod.OOB
if not self.sc and self.tk is None:
# For legacy OOB, TK is required.
logger.warning("legacy OOB without TK")
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
return
if command.oob_data_flag == 0:
# The peer doesn't have OOB data, use r=0
self.r = bytes(16)
else:
# Decide which pairing method to use from the IO capability
self.decide_pairing_method(
command.auth_req,
command.io_capability,
self.io_capability,
)
# Check for OOB
if command.oob_data_flag != 0:
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
return
# Decide which pairing method to use
self.decide_pairing_method(
command.auth_req, command.io_capability, self.io_capability
)
logger.debug(f'pairing method: {self.pairing_method.name}')
# Key distribution
@@ -1481,26 +1390,15 @@ class Session:
self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
# Infer the pairing method
if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0)
):
# Use OOB
self.pairing_method = PairingMethod.OOB
if not self.sc and self.tk is None:
# For legacy OOB, TK is required.
logger.warning("legacy OOB without TK")
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
return
if command.oob_data_flag == 0:
# The peer doesn't have OOB data, use r=0
self.r = bytes(16)
else:
# Decide which pairing method to use from the IO capability
self.decide_pairing_method(
command.auth_req, self.io_capability, command.io_capability
)
# Check for OOB
if self.sc and command.oob_data_flag:
self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
return
# Decide which pairing method to use
self.decide_pairing_method(
command.auth_req, self.io_capability, command.io_capability
)
logger.debug(f'pairing method: {self.pairing_method.name}')
# Key distribution
@@ -1651,13 +1549,12 @@ class Session:
if self.passkey_step < 20:
self.send_pairing_confirm_command()
return
elif self.pairing_method != PairingMethod.OOB:
else:
return
else:
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
PairingMethod.OOB,
):
self.send_pairing_random_command()
elif self.pairing_method == PairingMethod.PASSKEY:
@@ -1694,7 +1591,6 @@ class Session:
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
PairingMethod.OOB,
):
ra = bytes(16)
rb = ra
@@ -1703,6 +1599,7 @@ class Session:
ra = self.passkey.to_bytes(16, byteorder='little')
rb = ra
else:
# OOB not implemented yet
return
assert self.preq and self.pres
@@ -1756,7 +1653,7 @@ class Session:
# Compute the DH key
self.dh_key = bytes(
reversed(
self.ecc_key.dh(
self.manager.ecc_key.dh(
bytes(reversed(command.public_key_x)),
bytes(reversed(command.public_key_y)),
)
@@ -1764,27 +1661,8 @@ class Session:
)
logger.debug(f'DH key: {self.dh_key.hex()}')
if self.pairing_method == PairingMethod.OOB:
# Check against shared OOB data
if self.peer_oob_data:
confirm_verifier = crypto.f4(
self.peer_public_key_x,
self.peer_public_key_x,
self.peer_oob_data.r,
bytes(1),
)
if not self.check_expected_value(
self.peer_oob_data.c,
confirm_verifier,
SMP_CONFIRM_VALUE_FAILED_ERROR,
):
return
if self.is_initiator:
if self.pairing_method == PairingMethod.OOB:
self.send_pairing_random_command()
else:
self.send_pairing_confirm_command()
self.send_pairing_confirm_command()
else:
if self.pairing_method == PairingMethod.PASSKEY:
self.display_or_input_passkey()
@@ -1795,7 +1673,6 @@ class Session:
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
PairingMethod.OOB,
):
# We can now send the confirmation value
self.send_pairing_confirm_command()

View File

@@ -24,9 +24,10 @@ import platform
import usb1
from .common import Transport, ParserSource
from .. import hci
from ..colors import color
from bumble.transport.common import Transport, ParserSource
from bumble import hci
from bumble.colors import color
from bumble.utils import AsyncRunner
# -----------------------------------------------------------------------------
@@ -113,7 +114,7 @@ async def open_usb_transport(spec: str) -> Transport:
def __init__(self, device, acl_out):
self.device = device
self.acl_out = acl_out
self.transfer = device.getTransfer()
self.acl_out_transfer = device.getTransfer()
self.packets = collections.deque() # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop()
self.cancel_done = self.loop.create_future()
@@ -137,21 +138,20 @@ async def open_usb_transport(spec: str) -> Transport:
# The queue was previously empty, re-prime the pump
self.process_queue()
def on_packet_sent(self, transfer):
def transfer_callback(self, transfer):
status = transfer.getStatus()
# logger.debug(f'<<< USB out transfer callback: status={status}')
# pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED:
self.loop.call_soon_threadsafe(self.on_packet_sent_)
self.loop.call_soon_threadsafe(self.on_packet_sent)
elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(self.cancel_done.set_result, None)
else:
logger.warning(
color(f'!!! out transfer not completed: status={status}', 'red')
color(f'!!! OUT transfer not completed: status={status}', 'red')
)
def on_packet_sent_(self):
def on_packet_sent(self):
if self.packets:
self.packets.popleft()
self.process_queue()
@@ -163,22 +163,20 @@ async def open_usb_transport(spec: str) -> Transport:
packet = self.packets[0]
packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET:
self.transfer.setBulk(
self.acl_out, packet[1:], callback=self.on_packet_sent
self.acl_out_transfer.setBulk(
self.acl_out, packet[1:], callback=self.transfer_callback
)
logger.debug('submit ACL')
self.transfer.submit()
self.acl_out_transfer.submit()
elif packet_type == hci.HCI_COMMAND_PACKET:
self.transfer.setControl(
self.acl_out_transfer.setControl(
USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
0,
0,
0,
packet[1:],
callback=self.on_packet_sent,
callback=self.transfer_callback,
)
logger.debug('submit COMMAND')
self.transfer.submit()
self.acl_out_transfer.submit()
else:
logger.warning(color(f'unsupported packet type {packet_type}', 'red'))
@@ -193,11 +191,11 @@ async def open_usb_transport(spec: str) -> Transport:
self.packets.clear()
# If we have a transfer in flight, cancel it
if self.transfer.isSubmitted():
if self.acl_out_transfer.isSubmitted():
# Try to cancel the transfer, but that may fail because it may have
# already completed
try:
self.transfer.cancel()
self.acl_out_transfer.cancel()
logger.debug('waiting for OUT transfer cancellation to be done...')
await self.cancel_done
@@ -206,27 +204,22 @@ async def open_usb_transport(spec: str) -> Transport:
logger.debug('OUT transfer likely already completed')
class UsbPacketSource(asyncio.Protocol, ParserSource):
def __init__(self, context, device, metadata, acl_in, events_in):
def __init__(self, device, metadata, acl_in, events_in):
super().__init__()
self.context = context
self.device = device
self.metadata = metadata
self.acl_in = acl_in
self.acl_in_transfer = None
self.events_in = events_in
self.events_in_transfer = None
self.loop = asyncio.get_running_loop()
self.queue = asyncio.Queue()
self.dequeue_task = None
self.closed = False
self.event_loop_done = self.loop.create_future()
self.cancel_done = {
hci.HCI_EVENT_PACKET: self.loop.create_future(),
hci.HCI_ACL_DATA_PACKET: self.loop.create_future(),
}
self.events_in_transfer = None
self.acl_in_transfer = None
# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
self.closed = False
def start(self):
# Set up transfer objects for input
@@ -234,7 +227,7 @@ async def open_usb_transport(spec: str) -> Transport:
self.events_in_transfer.setInterrupt(
self.events_in,
READ_SIZE,
callback=self.on_packet_received,
callback=self.transfer_callback,
user_data=hci.HCI_EVENT_PACKET,
)
self.events_in_transfer.submit()
@@ -243,22 +236,23 @@ async def open_usb_transport(spec: str) -> Transport:
self.acl_in_transfer.setBulk(
self.acl_in,
READ_SIZE,
callback=self.on_packet_received,
callback=self.transfer_callback,
user_data=hci.HCI_ACL_DATA_PACKET,
)
self.acl_in_transfer.submit()
self.dequeue_task = self.loop.create_task(self.dequeue())
self.event_thread.start()
def on_packet_received(self, transfer):
@property
def usb_transfer_submitted(self):
return (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
)
def transfer_callback(self, transfer):
packet_type = transfer.getUserData()
status = transfer.getStatus()
# logger.debug(
# f'<<< USB IN transfer callback: status={status} '
# f'packet_type={packet_type} '
# f'length={transfer.getActualLength()}'
# )
# pylint: disable=no-member
if status == usb1.TRANSFER_COMPLETED:
@@ -267,18 +261,18 @@ async def open_usb_transport(spec: str) -> Transport:
+ transfer.getBuffer()[: transfer.getActualLength()]
)
self.loop.call_soon_threadsafe(self.queue.put_nowait, packet)
# Re-submit the transfer so we can receive more data
transfer.submit()
elif status == usb1.TRANSFER_CANCELLED:
self.loop.call_soon_threadsafe(
self.cancel_done[packet_type].set_result, None
)
return
else:
logger.warning(
color(f'!!! transfer not completed: status={status}', 'red')
color(f'!!! IN transfer not completed: status={status}', 'red')
)
# Re-submit the transfer so we can receive more data
transfer.submit()
self.loop.call_soon_threadsafe(self.on_transport_lost)
async def dequeue(self):
while not self.closed:
@@ -288,21 +282,6 @@ async def open_usb_transport(spec: str) -> Transport:
return
self.parser.feed_data(packet)
def run(self):
logger.debug('starting USB event loop')
while (
self.events_in_transfer.isSubmitted()
or self.acl_in_transfer.isSubmitted()
):
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass
logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
def close(self):
self.closed = True
@@ -331,15 +310,14 @@ async def open_usb_transport(spec: str) -> Transport:
f'IN[{packet_type}] transfer likely already completed'
)
# Wait for the thread to terminate
await self.event_loop_done
class UsbTransport(Transport):
def __init__(self, context, device, interface, setting, source, sink):
super().__init__(source, sink)
self.context = context
self.device = device
self.interface = interface
self.loop = asyncio.get_running_loop()
self.event_loop_done = self.loop.create_future()
# Get exclusive access
device.claimInterface(interface)
@@ -352,6 +330,22 @@ async def open_usb_transport(spec: str) -> Transport:
source.start()
sink.start()
# Create a thread to process events
self.event_thread = threading.Thread(target=self.run)
self.event_thread.start()
def run(self):
logger.debug('starting USB event loop')
while self.source.usb_transfer_submitted:
# pylint: disable=no-member
try:
self.context.handleEvents()
except usb1.USBErrorInterrupted:
pass
logger.debug('USB event loop done')
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
async def close(self):
self.source.close()
self.sink.close()
@@ -361,6 +355,9 @@ async def open_usb_transport(spec: str) -> Transport:
self.device.close()
self.context.close()
# Wait for the thread to terminate
await self.event_loop_done
# Find the device according to the spec moniker
load_libusb()
context = usb1.USBContext()
@@ -540,7 +537,7 @@ async def open_usb_transport(spec: str) -> Transport:
except usb1.USBError:
logger.warning('failed to set configuration')
source = UsbPacketSource(context, device, device_metadata, acl_in, events_in)
source = UsbPacketSource(device, device_metadata, acl_in, events_in)
sink = UsbPacketSink(device, acl_out)
return UsbTransport(context, device, interface, setting, source, sink)
except usb1.USBError as error:

View File

@@ -34,8 +34,6 @@ from bumble.pairing import PairingConfig, PairingDelegate
from bumble.smp import (
SMP_PAIRING_NOT_SUPPORTED_ERROR,
SMP_CONFIRM_VALUE_FAILED_ERROR,
OobContext,
OobLegacyContext,
)
from bumble.core import ProtocolError
from bumble.hci import HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE
@@ -577,77 +575,6 @@ async def test_self_smp_public_address():
await _test_self_smp_with_configs(pairing_config, pairing_config)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_smp_oob_sc():
oob_context_1 = OobContext()
oob_context_2 = OobContext()
pairing_config_1 = PairingConfig(
mitm=True,
sc=True,
bonding=True,
oob=PairingConfig.OobConfig(oob_context_1, oob_context_2.share(), None),
)
pairing_config_2 = PairingConfig(
mitm=True,
sc=True,
bonding=True,
oob=PairingConfig.OobConfig(oob_context_2, oob_context_1.share(), None),
)
await _test_self_smp_with_configs(pairing_config_1, pairing_config_2)
pairing_config_3 = PairingConfig(
mitm=True,
sc=True,
bonding=True,
oob=PairingConfig.OobConfig(oob_context_2, None, None),
)
await _test_self_smp_with_configs(pairing_config_1, pairing_config_3)
await _test_self_smp_with_configs(pairing_config_3, pairing_config_1)
pairing_config_4 = PairingConfig(
mitm=True,
sc=True,
bonding=True,
oob=PairingConfig.OobConfig(oob_context_2, oob_context_2.share(), None),
)
with pytest.raises(ProtocolError) as error:
await _test_self_smp_with_configs(pairing_config_1, pairing_config_4)
assert error.value.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR
with pytest.raises(ProtocolError):
await _test_self_smp_with_configs(pairing_config_4, pairing_config_1)
assert error.value.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_smp_oob_legacy():
legacy_context = OobLegacyContext()
pairing_config_1 = PairingConfig(
mitm=True,
sc=False,
bonding=True,
oob=PairingConfig.OobConfig(None, None, legacy_context),
)
pairing_config_2 = PairingConfig(
mitm=True,
sc=True,
bonding=True,
oob=PairingConfig.OobConfig(OobContext(), None, legacy_context),
)
await _test_self_smp_with_configs(pairing_config_1, pairing_config_2)
await _test_self_smp_with_configs(pairing_config_2, pairing_config_1)
# -----------------------------------------------------------------------------
async def run_test_self():
await test_self_connection()
@@ -658,8 +585,6 @@ async def run_test_self():
await test_self_smp_wrong_pin()
await test_self_smp_over_classic()
await test_self_smp_public_address()
await test_self_smp_oob_sc()
await test_self_smp_oob_legacy()
# -----------------------------------------------------------------------------

View File

@@ -17,16 +17,11 @@
# -----------------------------------------------------------------------------
from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1
from bumble.pairing import OobData, OobSharedData, LeRole
from bumble.hci import Address
from bumble.core import AdvertisingData
# -----------------------------------------------------------------------------
# pylint: disable=invalid-name
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
def reversed_hex(hex_str):
return bytes(reversed(bytes.fromhex(hex_str)))
@@ -238,23 +233,6 @@ def test_ah():
assert value == expected
# -----------------------------------------------------------------------------
def test_oob_data():
oob_data = OobData(
address=Address("F0:F1:F2:F3:F4:F5"),
role=LeRole.BOTH_PERIPHERAL_PREFERRED,
shared_data=OobSharedData(c=bytes([1, 2]), r=bytes([3, 4])),
)
oob_data_ad = oob_data.to_ad()
oob_data_bytes = bytes(oob_data_ad)
oob_data_ad_parsed = AdvertisingData.from_bytes(oob_data_bytes)
oob_data_parsed = OobData.from_ad(oob_data_ad_parsed)
assert oob_data_parsed.address == oob_data.address
assert oob_data_parsed.role == oob_data.role
assert oob_data_parsed.shared_data.c == oob_data.shared_data.c
assert oob_data_parsed.shared_data.r == oob_data.shared_data.r
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_ecc()
@@ -268,4 +246,3 @@ if __name__ == '__main__':
test_h6()
test_h7()
test_ah()
test_oob_data()