add basic RPA support

This commit is contained in:
Gilles Boccon-Gibod
2024-08-01 15:37:11 -07:00
parent ae8b83f294
commit 615691ec81
4 changed files with 71 additions and 37 deletions

View File

@@ -261,6 +261,7 @@ DEVICE_DEFAULT_ADVERTISING_TX_POWER = (
)
DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_SKIP = 0
DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_TIMEOUT = 5.0
DEVICE_DEFAULT_LE_RPA_TIMEOUT = 15 * 60 # 15 minutes (in seconds)
# fmt: on
# pylint: enable=line-too-long
@@ -1567,8 +1568,9 @@ class DeviceConfiguration:
advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
le_enabled: bool = True
# LE host enable 2nd parameter
le_simultaneous_enabled: bool = False
le_privacy_enabled: bool = False
le_rpa_timeout: int = DEVICE_DEFAULT_LE_RPA_TIMEOUT
classic_enabled: bool = False
classic_sc_enabled: bool = True
classic_ssp_enabled: bool = True
@@ -1736,8 +1738,9 @@ device_host_event_handlers: List[str] = []
# -----------------------------------------------------------------------------
class Device(CompositeEventEmitter):
# Incomplete list of fields.
random_address: Address
public_address: Address
random_address: Address # Random address that may change with RPA
public_address: Address # Public address (obtained from the controller)
static_address: Address # Random address that can be set but does not change
classic_enabled: bool
name: str
class_of_device: int
@@ -1867,15 +1870,19 @@ class Device(CompositeEventEmitter):
config = config or DeviceConfiguration()
self.config = config
self.public_address = Address('00:00:00:00:00:00')
self.name = config.name
self.public_address = Address.ANY
self.random_address = config.address
self.static_address = config.address
self.class_of_device = config.class_of_device
self.keystore = None
self.irk = config.irk
self.le_enabled = config.le_enabled
self.classic_enabled = config.classic_enabled
self.le_simultaneous_enabled = config.le_simultaneous_enabled
self.le_privacy_enabled = config.le_privacy_enabled
self.le_rpa_timeout = config.le_rpa_timeout
self.le_rpa_periodic_update_task: Optional[asyncio.Task] = None
self.classic_enabled = config.classic_enabled
self.cis_enabled = config.cis_enabled
self.classic_sc_enabled = config.classic_sc_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled
@@ -1939,6 +1946,7 @@ class Device(CompositeEventEmitter):
if isinstance(address, str):
address = Address(address)
self.random_address = address
self.static_address = address
# Setup SMP
self.smp_manager = smp.Manager(
@@ -2170,6 +2178,16 @@ class Device(CompositeEventEmitter):
)
if self.le_enabled:
# If LE Privacy is enabled, generate an RPA
if self.le_privacy_enabled:
self.random_address = Address.generate_private_address(self.irk)
logger.info(f'Initial RPA: {self.random_address}')
if self.le_rpa_timeout > 0:
# Start a task to periodically generate a new RPA
self.le_rpa_periodic_update_task = asyncio.create_task(
self._run_rpa_periodic_update()
)
# Set the controller address
if self.random_address == Address.ANY_RANDOM:
# Try to use an address generated at random by the controller
@@ -2249,9 +2267,45 @@ class Device(CompositeEventEmitter):
async def power_off(self) -> None:
if self.powered_on:
if self.le_rpa_periodic_update_task:
self.le_rpa_periodic_update_task.cancel()
await self.host.flush()
self.powered_on = False
async def update_rpa(self) -> bool:
"""
Try to update the RPA.
Returns:
True if the RPA was updated, False if it could not be updated.
"""
# Check if this is a good time to rotate the address
if self.is_advertising or self.is_scanning or self.is_le_connecting:
logger.debug('skipping RPA update')
return False
random_address = Address.generate_private_address(self.irk)
response = await self.send_command(
HCI_LE_Set_Random_Address_Command(random_address=self.random_address)
)
if response.return_parameters == HCI_SUCCESS:
logger.info(f'new RPA: {random_address}')
self.random_address = random_address
return True
else:
logger.warning(f'failed to set RPA: {response.return_parameters}')
return False
async def _run_rpa_periodic_update(self) -> None:
"""Update the RPA periodically"""
while self.le_rpa_timeout != 0:
await asyncio.sleep(self.le_rpa_timeout)
if not self.update_rpa():
logger.debug("periodic RPA update failed")
async def refresh_resolving_list(self) -> None:
assert self.keystore is not None
@@ -4871,5 +4925,6 @@ class Device(CompositeEventEmitter):
return (
f'Device(name="{self.name}", '
f'random_address="{self.random_address}", '
f'public_address="{self.public_address}")'
f'public_address="{self.public_address}", '
f'static_address="{self.static_address}")'
)

View File

@@ -1076,9 +1076,9 @@ class Session:
def send_identity_address_command(self) -> None:
identity_address = {
None: self.connection.self_address,
None: self.manager.device.static_address,
Address.PUBLIC_DEVICE_ADDRESS: self.manager.device.public_address,
Address.RANDOM_DEVICE_ADDRESS: self.manager.device.random_address,
Address.RANDOM_DEVICE_ADDRESS: self.manager.device.static_address,
}[self.pairing_config.identity_address_type]
self.send_command(
SMP_Identity_Address_Information_Command(

View File

@@ -0,0 +1,7 @@
{
"name": "Bumble",
"address": "F0:F1:F2:F3:F4:F5",
"keystore": "JsonKeyStore",
"irk": "865F81FF5A8B486EAAE29A27AD9F77DC",
"le_privacy_enabled": true
}

View File

@@ -276,34 +276,6 @@ async def test_legacy_advertising():
assert not device.is_advertising
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'own_address_type,',
(OwnAddressType.PUBLIC, OwnAddressType.RANDOM),
)
@pytest.mark.asyncio
async def test_legacy_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
# Start advertising
await device.start_advertising()
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
if own_address_type == OwnAddressType.PUBLIC:
assert device.lookup_connection(0x0001).self_address == device.public_address
else:
assert device.lookup_connection(0x0001).self_address == device.random_address
await async_barrier()
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'auto_restart,',