From a9e726545e934c5dfcc4454c70a3aec58e351183 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Tue, 20 Sep 2022 14:58:20 -0700 Subject: [PATCH] wip --- bumble/device.py | 129 +++++++++++++++++++++++++++++++------ bumble/hci.py | 8 ++- bumble/smp.py | 26 ++++---- examples/battery_client.py | 2 +- 4 files changed, 128 insertions(+), 37 deletions(-) diff --git a/bumble/device.py b/bumble/device.py index 230a1cf..028c53f 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -18,7 +18,8 @@ import json import asyncio import logging -from contextlib import asynccontextmanager, AsyncExitStack +import secrets +from contextlib import asynccontextmanager, AsyncExitStack from .hci import * from .host import Host @@ -32,6 +33,8 @@ from . import smp from . import sdp from . import l2cap from . import keys +from . import crypto + # ----------------------------------------------------------------------------- # Logging @@ -51,6 +54,7 @@ DEVICE_DEFAULT_SCAN_RESPONSE_DATA = b'' DEVICE_DEFAULT_DATA_LENGTH = (27, 328, 27, 328) DEVICE_DEFAULT_SCAN_INTERVAL = 60 # ms DEVICE_DEFAULT_SCAN_WINDOW = 60 # ms +DEVICE_DEFAULT_LE_RPA_TIMEOUT = 15 * 60 # 15 minutes (in seconds) DEVICE_MIN_SCAN_INTERVAL = 25 DEVICE_MAX_SCAN_INTERVAL = 10240 DEVICE_MIN_SCAN_WINDOW = 25 @@ -169,7 +173,6 @@ class Peer: async def __aexit__(self, exc_type, exc_value, traceback): pass - def __str__(self): return f'{self.connection.peer_address} as {self.connection.role_name}' @@ -202,11 +205,22 @@ class Connection(CompositeEventEmitter): def on_connection_encryption_key_refresh(self): pass - def __init__(self, device, handle, transport, peer_address, peer_resolvable_address, role, parameters): + def __init__( + self, + device, + handle, + transport, + local_address, + peer_address, + peer_resolvable_address, + role, + parameters + ): super().__init__() self.device = device self.handle = handle self.transport = transport + self.local_address = local_address self.peer_address = peer_address self.peer_resolvable_address = peer_resolvable_address self.peer_name = None # Classic only @@ -297,7 +311,12 @@ class Connection(CompositeEventEmitter): raise def __str__(self): - return f'Connection(handle=0x{self.handle:04X}, role={self.role_name}, address={self.peer_address})' + return ( + f'Connection(handle=0x{self.handle:04X}, ' + f'role={self.role_name}, ' + f'local_address={self.local_address}, ' + f'peer_address={self.peer_address})' + ) # ----------------------------------------------------------------------------- @@ -311,8 +330,10 @@ class DeviceConfiguration: self.advertising_interval_min = DEVICE_DEFAULT_ADVERTISING_INTERVAL self.advertising_interval_max = DEVICE_DEFAULT_ADVERTISING_INTERVAL self.le_enabled = True - # LE host enable 2nd parameter self.le_simultaneous_enabled = True + self.le_privacy_enabled = False + self.le_rpa_timeout = DEVICE_DEFAULT_LE_RPA_TIMEOUT + self.classic_enabled = False self.classic_sc_enabled = True self.classic_ssp_enabled = True self.connectable = True @@ -320,19 +341,22 @@ class DeviceConfiguration: self.advertising_data = bytes( AdvertisingData([(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))]) ) - self.irk = bytes(16) # This really must be changed for any level of security + self.irk = bytes([0xFF] * 16) # This really must be changed for any level of security self.keystore = None def load_from_dict(self, config): # Load simple properties - self.name = config.get('name', self.name) - self.address = Address(config.get('address', self.address)) - self.class_of_device = config.get('class_of_device', self.class_of_device) + self.name = config.get('name', self.name) + self.address = Address(config.get('address', self.address)) + self.class_of_device = config.get('class_of_device', self.class_of_device) self.advertising_interval_min = config.get('advertising_interval', self.advertising_interval_min) self.advertising_interval_max = self.advertising_interval_min self.keystore = config.get('keystore') self.le_enabled = config.get('le_enabled', self.le_enabled) self.le_simultaneous_enabled = config.get('le_simultaneous_enabled', self.le_simultaneous_enabled) + self.le_privacy_enabled = config.get('le_privacy_enabled', self.le_privacy_enabled) + self.le_rpa_timeout = config.get('le_rpa_timeout', self.le_rpa_timeout) + self.classic_enabled = config.get('classic_enabled', self.classic_enabled) self.classic_sc_enabled = config.get('classic_sc_enabled', self.classic_sc_enabled) self.classic_ssp_enabled = config.get('classic_ssp_enabled', self.classic_ssp_enabled) self.connectable = config.get('connectable', self.connectable) @@ -352,6 +376,10 @@ class DeviceConfiguration: advertising_data = config.get('advertising_data') if advertising_data: self.advertising_data = bytes.fromhex(advertising_data) + else: + self.advertising_data = bytes( + AdvertisingData([(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))]) + ) def load_from_file(self, filename): with open(filename, 'r') as file: @@ -458,9 +486,9 @@ class Device(CompositeEventEmitter): self.connecting = False self.disconnecting = False self.connections = {} # Connections, by connection handle - self.classic_enabled = False self.inquiry_response = None self.address_resolver = None + self.le_rpa_task = None # Use the initial config or a default self.public_address = Address('00:00:00:00:00:00') @@ -468,6 +496,7 @@ class Device(CompositeEventEmitter): config = DeviceConfiguration() self.name = config.name self.random_address = config.address + self.identity_address = config.address self.class_of_device = config.class_of_device self.scan_response_data = config.scan_response_data self.advertising_data = config.advertising_data @@ -477,6 +506,9 @@ class Device(CompositeEventEmitter): self.irk = config.irk self.le_enabled = config.le_enabled self.le_simultaneous_enabled = config.le_simultaneous_enabled + self.le_privacy_enabled = config.le_privacy_enabled + self.le_rpa_timeout = config.le_rpa_timeout + self.classic_enabled = config.classic_enabled self.classic_ssp_enabled = config.classic_ssp_enabled self.classic_sc_enabled = config.classic_sc_enabled self.discoverable = config.discoverable @@ -490,11 +522,12 @@ class Device(CompositeEventEmitter): if address: if type(address) is str: address = Address(address) - self.random_address = address + self.random_address = address + self.identity_address = address # Setup SMP # TODO: allow using a public address - self.smp_manager = smp.Manager(self, self.random_address) + self.smp_manager = smp.Manager(self, self.random_address, self.identity_address) self.l2cap_channel_manager.register_fixed_channel( smp.SMP_CID, self.on_smp_pdu) self.l2cap_channel_manager.register_fixed_channel( @@ -591,6 +624,14 @@ class Device(CompositeEventEmitter): )) if self.le_enabled: + # If LE Privacy is enabled, generate an RPA + if self.le_privacy_enabled: + self.random_address = self.generate_le_rpa() + logger.info(f'Initial RPA: {self.random_address}') + if self.le_rpa_timeout > 0: + # Start a task to periodically generate a new RPA + self.le_rpa_task = asyncio.create_task(self.run_le_rpa_generation()) + # Set the controller address await self.send_command(HCI_LE_Set_Random_Address_Command( random_address = self.random_address @@ -637,13 +678,48 @@ class Device(CompositeEventEmitter): await self.set_connectable(self.connectable) await self.set_discoverable(self.discoverable) - # Let the SMP manager know about the address - # TODO: allow using a public address - self.smp_manager.address = self.random_address - # Done self.powered_on = True + async def run_le_rpa_generation(self): + while self.le_rpa_timeout != 0: + await asyncio.sleep(self.le_rpa_timeout) + + # Check if this is a good time to rotate the address + if self.advertising or self.scanning or self.connecting: + logger.debug('skipping RPA rotation') + continue + + random_address = self.generate_le_rpa() + response = await self.send_command(HCI_LE_Set_Random_Address_Command( + random_address = self.random_address + )) + if response.return_parameters == HCI_SUCCESS: + logger.info(f'New RPA: {random_address}') + self.random_address = random_address + else: + logger.warning(f'failed to set RPA: {response.return_parameters}') + + def generate_le_rpa(self): + # See 1.3.2.2 Private device address generation + + # Generate `prand` + while True: + # Generate a 22-bit random number for the random part of `prand` + prand_random = secrets.randbelow(0x400000) + + # As least on bit shall be 0 and one bit shall be 1 + if prand_random != 0 and prand_random != 0x3FFFFF: + break + + prand = prand_random | 0x400000 # The two MSBs are |1|0| + + # Generate `hash` + hash = crypto.ah(self.irk, struct.pack('