add oob data on command line

This commit is contained in:
Gilles Boccon-Gibod
2023-11-07 20:38:35 -08:00
parent 4ae612090b
commit 5a307c19b8
2 changed files with 58 additions and 15 deletions

View File

@@ -24,11 +24,16 @@ from prompt_toolkit.shortcuts import PromptSession
from bumble.colors import color from bumble.colors import color
from bumble.device import Device, Peer from bumble.device import Device, Peer
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.pairing import PairingDelegate, PairingConfig from bumble.pairing import OobData, PairingDelegate, PairingConfig
from bumble.smp import OobContext from bumble.smp import OobContext, OobLegacyContext
from bumble.smp import error_name as smp_error_name from bumble.smp import error_name as smp_error_name
from bumble.keys import JsonKeyStore from bumble.keys import JsonKeyStore
from bumble.core import ProtocolError from bumble.core import (
AdvertisingData,
ProtocolError,
BT_LE_TRANSPORT,
BT_BR_EDR_TRANSPORT,
)
from bumble.gatt import ( from bumble.gatt import (
GATT_DEVICE_NAME_CHARACTERISTIC, GATT_DEVICE_NAME_CHARACTERISTIC,
GATT_GENERIC_ACCESS_SERVICE, GATT_GENERIC_ACCESS_SERVICE,
@@ -348,11 +353,27 @@ async def pair(
# Create an OOB context if needed # Create an OOB context if needed
if oob: if oob:
our_oob_context = OobContext() our_oob_context = OobContext()
peer_oob_context = None # TODO: parse from command line param shared_data = (
oob_contexts = PairingConfig.OobContexts(our_oob_context, peer_oob_context) None
if oob == '-'
else OobData.from_ad(AdvertisingData.from_bytes(bytes.fromhex(oob)))
)
legacy_context = OobLegacyContext()
oob_contexts = PairingConfig.OobConfig(
our_context=our_oob_context,
peer_data=shared_data,
legacy_context=legacy_context,
)
oob_data = OobData(
address=device.random_address,
shared_data=shared_data,
legacy_context=legacy_context,
)
print(color('@@@-----------------------------------', 'yellow')) print(color('@@@-----------------------------------', 'yellow'))
print(color('@@@ OOB Data:', 'yellow')) print(color('@@@ OOB Data:', 'yellow'))
print(color(f'@@@ {our_oob_context.share()}', 'yellow')) print(color(f'@@@ {our_oob_context.share()}', 'yellow'))
print(color(f'@@@ TK={legacy_context.tk.hex()}', 'yellow'))
print(color(f'@@@ HEX: ({bytes(oob_data.to_ad()).hex()})', 'yellow'))
print(color('@@@-----------------------------------', 'yellow')) print(color('@@@-----------------------------------', 'yellow'))
else: else:
oob_contexts = None oob_contexts = None
@@ -370,7 +391,10 @@ async def pair(
device.on('connection', lambda connection: on_connection(connection, request)) device.on('connection', lambda connection: on_connection(connection, request))
if address_or_name is not None: if address_or_name is not None:
print(color(f'=== Connecting to {address_or_name}...', 'green')) print(color(f'=== Connecting to {address_or_name}...', 'green'))
connection = await device.connect(address_or_name) connection = await device.connect(
address_or_name,
transport=BT_LE_TRANSPORT if mode == 'le' else BT_BR_EDR_TRANSPORT,
)
if not request: if not request:
try: try:
@@ -439,7 +463,11 @@ class LogHandler(logging.Handler):
default='display+keyboard', default='display+keyboard',
show_default=True, show_default=True,
) )
@click.option('--oob', help='Use OOB pairing with this data from the peer') @click.option(
'--oob',
metavar='<oob-data-hex>',
help='Use OOB pairing with this data from the peer',
)
@click.option('--prompt', is_flag=True, help='Prompt to accept/reject pairing request') @click.option('--prompt', is_flag=True, help='Prompt to accept/reject pairing request')
@click.option( @click.option(
'--request', is_flag=True, help='Request that the connecting peer initiate pairing' '--request', is_flag=True, help='Request that the connecting peer initiate pairing'

View File

@@ -48,25 +48,33 @@ from .core import AdvertisingData, LeRole
@dataclass @dataclass
class OobData: class OobData:
"""OOB data that can be sent from one device to another.""" """OOB data that can be sent from one device to another."""
address: Optional[Address] = None address: Optional[Address] = None
role: Optional[LeRole] = None role: Optional[LeRole] = None
shared_data: Optional[OobSharedData] = None shared_data: Optional[OobSharedData] = None
legacy_context: Optional[OobLegacyContext] = None
@classmethod @classmethod
def from_ad(cls, ad: AdvertisingData) -> OobData: def from_ad(cls, ad: AdvertisingData) -> OobData:
instance = cls() instance = cls()
shared_data_c = None shared_data_c: Optional[bytes] = None
shared_data_r = None shared_data_r: Optional[bytes] = None
for (ad_type, ad_data) in ad.ad_structures: for ad_type, ad_data in ad.ad_structures:
if ad_type == AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS: if ad_type == AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS:
instance.address = Address(ad_data) instance.address = Address(ad_data)
elif ad_type == AdvertisingData.LE_ROLE: elif ad_type == AdvertisingData.LE_ROLE:
instance.role = LeRole(ad_data[0]) instance.role = LeRole(ad_data[0])
elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE: elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE:
shared_data_c: bytes = AdvertisingData.ad_data_to_object(ad_type, ad_data) shared_data_c: Optional[bytes] = AdvertisingData.ad_data_to_object(
ad_type, ad_data
)
elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE: elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE:
shared_data_r: bytes = AdvertisingData.ad_data_to_object(ad_type, ad_data) shared_data_r: bytes = AdvertisingData.ad_data_to_object(
if shared_data_c or shared_data_r: ad_type, ad_data
)
elif ad_type == AdvertisingData.SECURITY_MANAGER_TK_VALUE:
instance.legacy_context = OobLegacyContext(tk=ad_data)
if shared_data_c and shared_data_r:
instance.shared_data = OobSharedData(c=shared_data_c, r=shared_data_r) instance.shared_data = OobSharedData(c=shared_data_c, r=shared_data_r)
return instance return instance
@@ -74,11 +82,17 @@ class OobData:
def to_ad(self): def to_ad(self):
ad_structures = [] ad_structures = []
if self.address is not None: if self.address is not None:
ad_structures.append((AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(self.address))) ad_structures.append(
(AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(self.address))
)
if self.role is not None: if self.role is not None:
ad_structures.append((AdvertisingData.LE_ROLE, bytes([self.role]))) ad_structures.append((AdvertisingData.LE_ROLE, bytes([self.role])))
if self.shared_data is not None: if self.shared_data is not None:
ad_structures.extend(self.shared_data.to_ad().ad_structures) ad_structures.extend(self.shared_data.to_ad().ad_structures)
if self.legacy_context is not None:
ad_structures.append(
(AdvertisingData.SECURITY_MANAGER_TK_VALUE, self.legacy_context.tk)
)
return AdvertisingData(ad_structures) return AdvertisingData(ad_structures)
@@ -221,6 +235,7 @@ class PairingConfig:
@dataclass @dataclass
class OobConfig: class OobConfig:
"""Config for OOB pairing.""" """Config for OOB pairing."""
our_context: Optional[OobContext] our_context: Optional[OobContext]
peer_data: Optional[OobSharedData] peer_data: Optional[OobSharedData]
legacy_context: Optional[OobLegacyContext] legacy_context: Optional[OobLegacyContext]