From 615691ec81ae05af7b588b6dd7dcd3f5dd5f296c Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Thu, 1 Aug 2024 15:37:11 -0700 Subject: [PATCH] add basic RPA support --- bumble/device.py | 69 +++++++++++++++++++++++++++++++---- bumble/smp.py | 4 +- examples/device_with_rpa.json | 7 ++++ tests/device_test.py | 28 -------------- 4 files changed, 71 insertions(+), 37 deletions(-) create mode 100644 examples/device_with_rpa.json diff --git a/bumble/device.py b/bumble/device.py index e163a46..66178fb 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -259,8 +259,9 @@ DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONN DEVICE_DEFAULT_ADVERTISING_TX_POWER = ( HCI_LE_Set_Extended_Advertising_Parameters_Command.TX_POWER_NO_PREFERENCE ) -DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_SKIP = 0 +DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_SKIP = 0 DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_TIMEOUT = 5.0 +DEVICE_DEFAULT_LE_RPA_TIMEOUT = 15 * 60 # 15 minutes (in seconds) # fmt: on # pylint: enable=line-too-long @@ -1567,8 +1568,9 @@ class DeviceConfiguration: advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL le_enabled: bool = True - # LE host enable 2nd parameter le_simultaneous_enabled: bool = False + le_privacy_enabled: bool = False + le_rpa_timeout: int = DEVICE_DEFAULT_LE_RPA_TIMEOUT classic_enabled: bool = False classic_sc_enabled: bool = True classic_ssp_enabled: bool = True @@ -1736,8 +1738,9 @@ device_host_event_handlers: List[str] = [] # ----------------------------------------------------------------------------- class Device(CompositeEventEmitter): # Incomplete list of fields. - random_address: Address - public_address: Address + random_address: Address # Random address that may change with RPA + public_address: Address # Public address (obtained from the controller) + static_address: Address # Random address that can be set but does not change classic_enabled: bool name: str class_of_device: int @@ -1867,15 +1870,19 @@ class Device(CompositeEventEmitter): config = config or DeviceConfiguration() self.config = config - self.public_address = Address('00:00:00:00:00:00') self.name = config.name + self.public_address = Address.ANY self.random_address = config.address + self.static_address = config.address self.class_of_device = config.class_of_device self.keystore = None self.irk = config.irk self.le_enabled = config.le_enabled - self.classic_enabled = config.classic_enabled self.le_simultaneous_enabled = config.le_simultaneous_enabled + self.le_privacy_enabled = config.le_privacy_enabled + self.le_rpa_timeout = config.le_rpa_timeout + self.le_rpa_periodic_update_task: Optional[asyncio.Task] = None + self.classic_enabled = config.classic_enabled self.cis_enabled = config.cis_enabled self.classic_sc_enabled = config.classic_sc_enabled self.classic_ssp_enabled = config.classic_ssp_enabled @@ -1939,6 +1946,7 @@ class Device(CompositeEventEmitter): if isinstance(address, str): address = Address(address) self.random_address = address + self.static_address = address # Setup SMP self.smp_manager = smp.Manager( @@ -2170,6 +2178,16 @@ class Device(CompositeEventEmitter): ) if self.le_enabled: + # If LE Privacy is enabled, generate an RPA + if self.le_privacy_enabled: + self.random_address = Address.generate_private_address(self.irk) + logger.info(f'Initial RPA: {self.random_address}') + if self.le_rpa_timeout > 0: + # Start a task to periodically generate a new RPA + self.le_rpa_periodic_update_task = asyncio.create_task( + self._run_rpa_periodic_update() + ) + # Set the controller address if self.random_address == Address.ANY_RANDOM: # Try to use an address generated at random by the controller @@ -2249,9 +2267,45 @@ class Device(CompositeEventEmitter): async def power_off(self) -> None: if self.powered_on: + if self.le_rpa_periodic_update_task: + self.le_rpa_periodic_update_task.cancel() + await self.host.flush() + self.powered_on = False + async def update_rpa(self) -> bool: + """ + Try to update the RPA. + + Returns: + True if the RPA was updated, False if it could not be updated. + """ + + # Check if this is a good time to rotate the address + if self.is_advertising or self.is_scanning or self.is_le_connecting: + logger.debug('skipping RPA update') + return False + + random_address = Address.generate_private_address(self.irk) + response = await self.send_command( + HCI_LE_Set_Random_Address_Command(random_address=self.random_address) + ) + if response.return_parameters == HCI_SUCCESS: + logger.info(f'new RPA: {random_address}') + self.random_address = random_address + return True + else: + logger.warning(f'failed to set RPA: {response.return_parameters}') + return False + + async def _run_rpa_periodic_update(self) -> None: + """Update the RPA periodically""" + while self.le_rpa_timeout != 0: + await asyncio.sleep(self.le_rpa_timeout) + if not self.update_rpa(): + logger.debug("periodic RPA update failed") + async def refresh_resolving_list(self) -> None: assert self.keystore is not None @@ -4871,5 +4925,6 @@ class Device(CompositeEventEmitter): return ( f'Device(name="{self.name}", ' f'random_address="{self.random_address}", ' - f'public_address="{self.public_address}")' + f'public_address="{self.public_address}", ' + f'static_address="{self.static_address}")' ) diff --git a/bumble/smp.py b/bumble/smp.py index cf523e7..f95ff1c 100644 --- a/bumble/smp.py +++ b/bumble/smp.py @@ -1076,9 +1076,9 @@ class Session: def send_identity_address_command(self) -> None: identity_address = { - None: self.connection.self_address, + None: self.manager.device.static_address, Address.PUBLIC_DEVICE_ADDRESS: self.manager.device.public_address, - Address.RANDOM_DEVICE_ADDRESS: self.manager.device.random_address, + Address.RANDOM_DEVICE_ADDRESS: self.manager.device.static_address, }[self.pairing_config.identity_address_type] self.send_command( SMP_Identity_Address_Information_Command( diff --git a/examples/device_with_rpa.json b/examples/device_with_rpa.json new file mode 100644 index 0000000..56f1ec2 --- /dev/null +++ b/examples/device_with_rpa.json @@ -0,0 +1,7 @@ +{ + "name": "Bumble", + "address": "F0:F1:F2:F3:F4:F5", + "keystore": "JsonKeyStore", + "irk": "865F81FF5A8B486EAAE29A27AD9F77DC", + "le_privacy_enabled": true +} diff --git a/tests/device_test.py b/tests/device_test.py index b5df89a..a15353e 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -276,34 +276,6 @@ async def test_legacy_advertising(): assert not device.is_advertising -# ----------------------------------------------------------------------------- -@pytest.mark.parametrize( - 'own_address_type,', - (OwnAddressType.PUBLIC, OwnAddressType.RANDOM), -) -@pytest.mark.asyncio -async def test_legacy_advertising_connection(own_address_type): - device = Device(host=mock.AsyncMock(Host)) - peer_address = Address('F0:F1:F2:F3:F4:F5') - - # Start advertising - await device.start_advertising() - device.on_connection( - 0x0001, - BT_LE_TRANSPORT, - peer_address, - BT_PERIPHERAL_ROLE, - ConnectionParameters(0, 0, 0), - ) - - if own_address_type == OwnAddressType.PUBLIC: - assert device.lookup_connection(0x0001).self_address == device.public_address - else: - assert device.lookup_connection(0x0001).self_address == device.random_address - - await async_barrier() - - # ----------------------------------------------------------------------------- @pytest.mark.parametrize( 'auto_restart,',