Add a flag to enable LE address resolution

This commit is contained in:
Josh Wu
2023-08-02 16:56:20 +08:00
parent 6ea669531a
commit 784cf4f26a
+32 -24
View File
@@ -86,6 +86,7 @@ from .hci import (
HCI_LE_Extended_Create_Connection_Command,
HCI_LE_Rand_Command,
HCI_LE_Read_PHY_Command,
HCI_LE_Set_Address_Resolution_Enable_Command,
HCI_LE_Set_Advertising_Data_Command,
HCI_LE_Set_Advertising_Enable_Command,
HCI_LE_Set_Advertising_Parameters_Command,
@@ -778,6 +779,7 @@ class DeviceConfiguration:
self.irk = bytes(16) # This really must be changed for any level of security
self.keystore = None
self.gatt_services: List[Dict[str, Any]] = []
self.address_resolution_offload = False
def load_from_dict(self, config: Dict[str, Any]) -> None:
# Load simple properties
@@ -1029,6 +1031,7 @@ class Device(CompositeEventEmitter):
self.discoverable = config.discoverable
self.connectable = config.connectable
self.classic_accept_any = config.classic_accept_any
self.address_resolution_offload = config.address_resolution_offload
for service in config.gatt_services:
characteristics = []
@@ -1256,31 +1259,16 @@ class Device(CompositeEventEmitter):
)
# Load the address resolving list
if self.keystore and self.host.supports_command(
HCI_LE_CLEAR_RESOLVING_LIST_COMMAND
):
await self.send_command(HCI_LE_Clear_Resolving_List_Command()) # type: ignore[call-arg]
if self.keystore:
await self.refresh_resolving_list()
resolving_keys = await self.keystore.get_resolving_keys()
for irk, address in resolving_keys:
await self.send_command(
HCI_LE_Add_Device_To_Resolving_List_Command(
peer_identity_address_type=address.address_type,
peer_identity_address=address,
peer_irk=irk,
local_irk=self.irk,
) # type: ignore[call-arg]
)
# Enable address resolution
# await self.send_command(
# HCI_LE_Set_Address_Resolution_Enable_Command(
# address_resolution_enable=1)
# )
# )
# Create a host-side address resolver
self.address_resolver = smp.AddressResolver(resolving_keys)
# Enable address resolution
if self.address_resolution_offload:
await self.send_command(
HCI_LE_Set_Address_Resolution_Enable_Command(
address_resolution_enable=1
) # type: ignore[call-arg]
)
if self.classic_enabled:
await self.send_command(
@@ -1310,6 +1298,26 @@ class Device(CompositeEventEmitter):
await self.host.flush()
self.powered_on = False
async def refresh_resolving_list(self) -> None:
assert self.keystore is not None
resolving_keys = await self.keystore.get_resolving_keys()
# Create a host-side address resolver
self.address_resolver = smp.AddressResolver(resolving_keys)
if self.address_resolution_offload:
await self.send_command(HCI_LE_Clear_Resolving_List_Command()) # type: ignore[call-arg]
for irk, address in resolving_keys:
await self.send_command(
HCI_LE_Add_Device_To_Resolving_List_Command(
peer_identity_address_type=address.address_type,
peer_identity_address=address,
peer_irk=irk,
local_irk=self.irk,
) # type: ignore[call-arg]
)
def supports_le_feature(self, feature):
return self.host.supports_le_feature(feature)