Compare commits

...

1 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
a9e726545e wip 2022-09-20 14:58:20 -07:00
4 changed files with 128 additions and 37 deletions

View File

@@ -18,7 +18,8 @@
import json import json
import asyncio import asyncio
import logging import logging
from contextlib import asynccontextmanager, AsyncExitStack import secrets
from contextlib import asynccontextmanager, AsyncExitStack
from .hci import * from .hci import *
from .host import Host from .host import Host
@@ -32,6 +33,8 @@ from . import smp
from . import sdp from . import sdp
from . import l2cap from . import l2cap
from . import keys from . import keys
from . import crypto
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Logging # Logging
@@ -51,6 +54,7 @@ DEVICE_DEFAULT_SCAN_RESPONSE_DATA = b''
DEVICE_DEFAULT_DATA_LENGTH = (27, 328, 27, 328) DEVICE_DEFAULT_DATA_LENGTH = (27, 328, 27, 328)
DEVICE_DEFAULT_SCAN_INTERVAL = 60 # ms DEVICE_DEFAULT_SCAN_INTERVAL = 60 # ms
DEVICE_DEFAULT_SCAN_WINDOW = 60 # ms DEVICE_DEFAULT_SCAN_WINDOW = 60 # ms
DEVICE_DEFAULT_LE_RPA_TIMEOUT = 15 * 60 # 15 minutes (in seconds)
DEVICE_MIN_SCAN_INTERVAL = 25 DEVICE_MIN_SCAN_INTERVAL = 25
DEVICE_MAX_SCAN_INTERVAL = 10240 DEVICE_MAX_SCAN_INTERVAL = 10240
DEVICE_MIN_SCAN_WINDOW = 25 DEVICE_MIN_SCAN_WINDOW = 25
@@ -169,7 +173,6 @@ class Peer:
async def __aexit__(self, exc_type, exc_value, traceback): async def __aexit__(self, exc_type, exc_value, traceback):
pass pass
def __str__(self): def __str__(self):
return f'{self.connection.peer_address} as {self.connection.role_name}' return f'{self.connection.peer_address} as {self.connection.role_name}'
@@ -202,11 +205,22 @@ class Connection(CompositeEventEmitter):
def on_connection_encryption_key_refresh(self): def on_connection_encryption_key_refresh(self):
pass pass
def __init__(self, device, handle, transport, peer_address, peer_resolvable_address, role, parameters): def __init__(
self,
device,
handle,
transport,
local_address,
peer_address,
peer_resolvable_address,
role,
parameters
):
super().__init__() super().__init__()
self.device = device self.device = device
self.handle = handle self.handle = handle
self.transport = transport self.transport = transport
self.local_address = local_address
self.peer_address = peer_address self.peer_address = peer_address
self.peer_resolvable_address = peer_resolvable_address self.peer_resolvable_address = peer_resolvable_address
self.peer_name = None # Classic only self.peer_name = None # Classic only
@@ -297,7 +311,12 @@ class Connection(CompositeEventEmitter):
raise raise
def __str__(self): def __str__(self):
return f'Connection(handle=0x{self.handle:04X}, role={self.role_name}, address={self.peer_address})' return (
f'Connection(handle=0x{self.handle:04X}, '
f'role={self.role_name}, '
f'local_address={self.local_address}, '
f'peer_address={self.peer_address})'
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -311,8 +330,10 @@ class DeviceConfiguration:
self.advertising_interval_min = DEVICE_DEFAULT_ADVERTISING_INTERVAL self.advertising_interval_min = DEVICE_DEFAULT_ADVERTISING_INTERVAL
self.advertising_interval_max = DEVICE_DEFAULT_ADVERTISING_INTERVAL self.advertising_interval_max = DEVICE_DEFAULT_ADVERTISING_INTERVAL
self.le_enabled = True self.le_enabled = True
# LE host enable 2nd parameter
self.le_simultaneous_enabled = True self.le_simultaneous_enabled = True
self.le_privacy_enabled = False
self.le_rpa_timeout = DEVICE_DEFAULT_LE_RPA_TIMEOUT
self.classic_enabled = False
self.classic_sc_enabled = True self.classic_sc_enabled = True
self.classic_ssp_enabled = True self.classic_ssp_enabled = True
self.connectable = True self.connectable = True
@@ -320,19 +341,22 @@ class DeviceConfiguration:
self.advertising_data = bytes( self.advertising_data = bytes(
AdvertisingData([(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))]) AdvertisingData([(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))])
) )
self.irk = bytes(16) # This really must be changed for any level of security self.irk = bytes([0xFF] * 16) # This really must be changed for any level of security
self.keystore = None self.keystore = None
def load_from_dict(self, config): def load_from_dict(self, config):
# Load simple properties # Load simple properties
self.name = config.get('name', self.name) self.name = config.get('name', self.name)
self.address = Address(config.get('address', self.address)) self.address = Address(config.get('address', self.address))
self.class_of_device = config.get('class_of_device', self.class_of_device) self.class_of_device = config.get('class_of_device', self.class_of_device)
self.advertising_interval_min = config.get('advertising_interval', self.advertising_interval_min) self.advertising_interval_min = config.get('advertising_interval', self.advertising_interval_min)
self.advertising_interval_max = self.advertising_interval_min self.advertising_interval_max = self.advertising_interval_min
self.keystore = config.get('keystore') self.keystore = config.get('keystore')
self.le_enabled = config.get('le_enabled', self.le_enabled) self.le_enabled = config.get('le_enabled', self.le_enabled)
self.le_simultaneous_enabled = config.get('le_simultaneous_enabled', self.le_simultaneous_enabled) self.le_simultaneous_enabled = config.get('le_simultaneous_enabled', self.le_simultaneous_enabled)
self.le_privacy_enabled = config.get('le_privacy_enabled', self.le_privacy_enabled)
self.le_rpa_timeout = config.get('le_rpa_timeout', self.le_rpa_timeout)
self.classic_enabled = config.get('classic_enabled', self.classic_enabled)
self.classic_sc_enabled = config.get('classic_sc_enabled', self.classic_sc_enabled) self.classic_sc_enabled = config.get('classic_sc_enabled', self.classic_sc_enabled)
self.classic_ssp_enabled = config.get('classic_ssp_enabled', self.classic_ssp_enabled) self.classic_ssp_enabled = config.get('classic_ssp_enabled', self.classic_ssp_enabled)
self.connectable = config.get('connectable', self.connectable) self.connectable = config.get('connectable', self.connectable)
@@ -352,6 +376,10 @@ class DeviceConfiguration:
advertising_data = config.get('advertising_data') advertising_data = config.get('advertising_data')
if advertising_data: if advertising_data:
self.advertising_data = bytes.fromhex(advertising_data) self.advertising_data = bytes.fromhex(advertising_data)
else:
self.advertising_data = bytes(
AdvertisingData([(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))])
)
def load_from_file(self, filename): def load_from_file(self, filename):
with open(filename, 'r') as file: with open(filename, 'r') as file:
@@ -458,9 +486,9 @@ class Device(CompositeEventEmitter):
self.connecting = False self.connecting = False
self.disconnecting = False self.disconnecting = False
self.connections = {} # Connections, by connection handle self.connections = {} # Connections, by connection handle
self.classic_enabled = False
self.inquiry_response = None self.inquiry_response = None
self.address_resolver = None self.address_resolver = None
self.le_rpa_task = None
# Use the initial config or a default # Use the initial config or a default
self.public_address = Address('00:00:00:00:00:00') self.public_address = Address('00:00:00:00:00:00')
@@ -468,6 +496,7 @@ class Device(CompositeEventEmitter):
config = DeviceConfiguration() config = DeviceConfiguration()
self.name = config.name self.name = config.name
self.random_address = config.address self.random_address = config.address
self.identity_address = config.address
self.class_of_device = config.class_of_device self.class_of_device = config.class_of_device
self.scan_response_data = config.scan_response_data self.scan_response_data = config.scan_response_data
self.advertising_data = config.advertising_data self.advertising_data = config.advertising_data
@@ -477,6 +506,9 @@ class Device(CompositeEventEmitter):
self.irk = config.irk self.irk = config.irk
self.le_enabled = config.le_enabled self.le_enabled = config.le_enabled
self.le_simultaneous_enabled = config.le_simultaneous_enabled self.le_simultaneous_enabled = config.le_simultaneous_enabled
self.le_privacy_enabled = config.le_privacy_enabled
self.le_rpa_timeout = config.le_rpa_timeout
self.classic_enabled = config.classic_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_sc_enabled = config.classic_sc_enabled self.classic_sc_enabled = config.classic_sc_enabled
self.discoverable = config.discoverable self.discoverable = config.discoverable
@@ -490,11 +522,12 @@ class Device(CompositeEventEmitter):
if address: if address:
if type(address) is str: if type(address) is str:
address = Address(address) address = Address(address)
self.random_address = address self.random_address = address
self.identity_address = address
# Setup SMP # Setup SMP
# TODO: allow using a public address # TODO: allow using a public address
self.smp_manager = smp.Manager(self, self.random_address) self.smp_manager = smp.Manager(self, self.random_address, self.identity_address)
self.l2cap_channel_manager.register_fixed_channel( self.l2cap_channel_manager.register_fixed_channel(
smp.SMP_CID, self.on_smp_pdu) smp.SMP_CID, self.on_smp_pdu)
self.l2cap_channel_manager.register_fixed_channel( self.l2cap_channel_manager.register_fixed_channel(
@@ -591,6 +624,14 @@ class Device(CompositeEventEmitter):
)) ))
if self.le_enabled: if self.le_enabled:
# If LE Privacy is enabled, generate an RPA
if self.le_privacy_enabled:
self.random_address = self.generate_le_rpa()
logger.info(f'Initial RPA: {self.random_address}')
if self.le_rpa_timeout > 0:
# Start a task to periodically generate a new RPA
self.le_rpa_task = asyncio.create_task(self.run_le_rpa_generation())
# Set the controller address # Set the controller address
await self.send_command(HCI_LE_Set_Random_Address_Command( await self.send_command(HCI_LE_Set_Random_Address_Command(
random_address = self.random_address random_address = self.random_address
@@ -637,13 +678,48 @@ class Device(CompositeEventEmitter):
await self.set_connectable(self.connectable) await self.set_connectable(self.connectable)
await self.set_discoverable(self.discoverable) await self.set_discoverable(self.discoverable)
# Let the SMP manager know about the address
# TODO: allow using a public address
self.smp_manager.address = self.random_address
# Done # Done
self.powered_on = True self.powered_on = True
async def run_le_rpa_generation(self):
while self.le_rpa_timeout != 0:
await asyncio.sleep(self.le_rpa_timeout)
# Check if this is a good time to rotate the address
if self.advertising or self.scanning or self.connecting:
logger.debug('skipping RPA rotation')
continue
random_address = self.generate_le_rpa()
response = await self.send_command(HCI_LE_Set_Random_Address_Command(
random_address = self.random_address
))
if response.return_parameters == HCI_SUCCESS:
logger.info(f'New RPA: {random_address}')
self.random_address = random_address
else:
logger.warning(f'failed to set RPA: {response.return_parameters}')
def generate_le_rpa(self):
# See 1.3.2.2 Private device address generation
# Generate `prand`
while True:
# Generate a 22-bit random number for the random part of `prand`
prand_random = secrets.randbelow(0x400000)
# As least on bit shall be 0 and one bit shall be 1
if prand_random != 0 and prand_random != 0x3FFFFF:
break
prand = prand_random | 0x400000 # The two MSBs are |1|0|
# Generate `hash`
hash = crypto.ah(self.irk, struct.pack('<I', prand)[:3])
# Generate the address from `prand` and `hash`
return Address(hash + struct.pack('<I', prand)[:3], Address.RANDOM_IDENTITY_ADDRESS)
async def start_advertising(self, auto_restart=False): async def start_advertising(self, auto_restart=False):
self.auto_restart_advertising = auto_restart self.auto_restart_advertising = auto_restart
@@ -675,18 +751,24 @@ class Device(CompositeEventEmitter):
)) ))
# Enable advertising # Enable advertising
await self.send_command(HCI_LE_Set_Advertising_Enable_Command( response = await self.send_command(HCI_LE_Set_Advertising_Enable_Command(
advertising_enable = 1 advertising_enable = 1
)) ))
if response.return_parameters != HCI_SUCCESS:
logger.warning(f'HCI_LE_Set_Advertising_Enable_Command failed ({response.return_parameters})')
raise HCI_Error(response.return_parameters)
self.advertising = True self.advertising = True
async def stop_advertising(self): async def stop_advertising(self):
# Disable advertising # Disable advertising
if self.advertising: if self.advertising:
await self.send_command(HCI_LE_Set_Advertising_Enable_Command( response = await self.send_command(HCI_LE_Set_Advertising_Enable_Command(
advertising_enable = 0 advertising_enable = 0
)) ))
if response.return_parameters != HCI_SUCCESS:
logger.warning(f'HCI_LE_Set_Advertising_Enable_Command failed ({response.return_parameters})')
raise HCI_Error(response.return_parameters)
self.advertising = False self.advertising = False
@@ -721,17 +803,23 @@ class Device(CompositeEventEmitter):
)) ))
# Enable scanning # Enable scanning
await self.send_command(HCI_LE_Set_Scan_Enable_Command( response = await self.send_command(HCI_LE_Set_Scan_Enable_Command(
le_scan_enable = 1, le_scan_enable = 1,
filter_duplicates = 1 if filter_duplicates else 0 filter_duplicates = 1 if filter_duplicates else 0
)) ))
if response.return_parameters != HCI_SUCCESS:
raise HCI_Error(response.return_parameters)
self.scanning = True self.scanning = True
async def stop_scanning(self): async def stop_scanning(self):
await self.send_command(HCI_LE_Set_Scan_Enable_Command( response = await self.send_command(HCI_LE_Set_Scan_Enable_Command(
le_scan_enable = 0, le_scan_enable = 0,
filter_duplicates = 0 filter_duplicates = 0
)) ))
if response.return_parameters != HCI_SUCCESS:
raise HCI_Error(response.return_parameters)
self.scanning = False self.scanning = False
@property @property
@@ -1242,6 +1330,7 @@ class Device(CompositeEventEmitter):
self, self,
connection_handle, connection_handle,
transport, transport,
self.public_address if transport == BT_BR_EDR_TRANSPORT else self.random_address,
peer_address, peer_address,
peer_resolvable_address, peer_resolvable_address,
role, role,

View File

@@ -1375,9 +1375,11 @@ class HCI_Error(ProtocolError):
class HCI_StatusError(ProtocolError): class HCI_StatusError(ProtocolError):
def __init__(self, response): def __init__(self, response):
super().__init__(response.status, super().__init__(
error_namespace=HCI_Command.command_name(response.command_opcode), response.status,
error_name=HCI_Constant.status_name(response.status)) error_namespace=HCI_Command.command_name(response.command_opcode),
error_name=HCI_Constant.status_name(response.status)
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -638,13 +638,13 @@ class Session:
# Set up addresses # Set up addresses
peer_address = connection.peer_resolvable_address or connection.peer_address peer_address = connection.peer_resolvable_address or connection.peer_address
if self.is_initiator: if self.is_initiator:
self.ia = bytes(manager.address) self.ia = bytes(connection.local_address)
self.iat = 1 if manager.address.is_random else 0 self.iat = 1 if connection.local_address.is_random else 0
self.ra = bytes(peer_address) self.ra = bytes(peer_address)
self.rat = 1 if peer_address.is_random else 0 self.rat = 1 if peer_address.is_random else 0
else: else:
self.ra = bytes(manager.address) self.ra = bytes(connection.local_address)
self.rat = 1 if manager.address.is_random else 0 self.rat = 1 if connection.local_address.is_random else 0
self.ia = bytes(peer_address) self.ia = bytes(peer_address)
self.iat = 1 if peer_address.is_random else 0 self.iat = 1 if peer_address.is_random else 0
@@ -906,8 +906,8 @@ class Session:
SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk) SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk)
) )
self.send_command(SMP_Identity_Address_Information_Command( self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.manager.address.address_type, addr_type = self.manager.identity_address.address_type,
bd_addr = self.manager.address bd_addr = self.manager.identity_address
)) ))
# Distribute CSRK # Distribute CSRK
@@ -938,8 +938,8 @@ class Session:
SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk) SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk)
) )
self.send_command(SMP_Identity_Address_Information_Command( self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.manager.address.address_type, addr_type = self.manager.identity_address.address_type,
bd_addr = self.manager.address bd_addr = self.manager.identity_address
)) ))
# Distribute CSRK # Distribute CSRK
@@ -1479,10 +1479,10 @@ class Manager(EventEmitter):
Implements the Initiator and Responder roles of the Security Manager Protocol Implements the Initiator and Responder roles of the Security Manager Protocol
''' '''
def __init__(self, device, address): def __init__(self, device, address, identity_address):
super().__init__() super().__init__()
self.device = device self.device = device
self.address = address self.identity_address = identity_address
self.sessions = {} self.sessions = {}
self._ecc_key = None self._ecc_key = None
self.pairing_config_factory = lambda connection: PairingConfig() self.pairing_config_factory = lambda connection: PairingConfig()

View File

@@ -20,7 +20,7 @@ import sys
import os import os
import logging import logging
from colors import color from colors import color
from bumble.device import Device, Peer from bumble.device import Device
from bumble.transport import open_transport from bumble.transport import open_transport
from bumble.profiles.battery_service import BatteryServiceProxy from bumble.profiles.battery_service import BatteryServiceProxy