forked from auracaster/bumble_mirror
Merge branch 'google:main' into bumble_hid_device
This commit is contained in:
1
.vscode/settings.json
vendored
1
.vscode/settings.json
vendored
@@ -29,6 +29,7 @@
|
|||||||
"deregistration",
|
"deregistration",
|
||||||
"dhkey",
|
"dhkey",
|
||||||
"diversifier",
|
"diversifier",
|
||||||
|
"endianness",
|
||||||
"Fitbit",
|
"Fitbit",
|
||||||
"GATTLINK",
|
"GATTLINK",
|
||||||
"HANDSFREE",
|
"HANDSFREE",
|
||||||
|
|||||||
148
bumble/crypto.py
148
bumble/crypto.py
@@ -21,6 +21,8 @@
|
|||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
# Imports
|
# Imports
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import operator
|
import operator
|
||||||
|
|
||||||
@@ -29,11 +31,13 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|||||||
from cryptography.hazmat.primitives.asymmetric.ec import (
|
from cryptography.hazmat.primitives.asymmetric.ec import (
|
||||||
generate_private_key,
|
generate_private_key,
|
||||||
ECDH,
|
ECDH,
|
||||||
|
EllipticCurvePrivateKey,
|
||||||
EllipticCurvePublicNumbers,
|
EllipticCurvePublicNumbers,
|
||||||
EllipticCurvePrivateNumbers,
|
EllipticCurvePrivateNumbers,
|
||||||
SECP256R1,
|
SECP256R1,
|
||||||
)
|
)
|
||||||
from cryptography.hazmat.primitives import cmac
|
from cryptography.hazmat.primitives import cmac
|
||||||
|
from typing import Tuple
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@@ -46,16 +50,18 @@ logger = logging.getLogger(__name__)
|
|||||||
# Classes
|
# Classes
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
class EccKey:
|
class EccKey:
|
||||||
def __init__(self, private_key):
|
def __init__(self, private_key: EllipticCurvePrivateKey) -> None:
|
||||||
self.private_key = private_key
|
self.private_key = private_key
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def generate(cls):
|
def generate(cls) -> EccKey:
|
||||||
private_key = generate_private_key(SECP256R1())
|
private_key = generate_private_key(SECP256R1())
|
||||||
return cls(private_key)
|
return cls(private_key)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_private_key_bytes(cls, d_bytes, x_bytes, y_bytes):
|
def from_private_key_bytes(
|
||||||
|
cls, d_bytes: bytes, x_bytes: bytes, y_bytes: bytes
|
||||||
|
) -> EccKey:
|
||||||
d = int.from_bytes(d_bytes, byteorder='big', signed=False)
|
d = int.from_bytes(d_bytes, byteorder='big', signed=False)
|
||||||
x = int.from_bytes(x_bytes, byteorder='big', signed=False)
|
x = int.from_bytes(x_bytes, byteorder='big', signed=False)
|
||||||
y = int.from_bytes(y_bytes, byteorder='big', signed=False)
|
y = int.from_bytes(y_bytes, byteorder='big', signed=False)
|
||||||
@@ -65,7 +71,7 @@ class EccKey:
|
|||||||
return cls(private_key)
|
return cls(private_key)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def x(self):
|
def x(self) -> bytes:
|
||||||
return (
|
return (
|
||||||
self.private_key.public_key()
|
self.private_key.public_key()
|
||||||
.public_numbers()
|
.public_numbers()
|
||||||
@@ -73,14 +79,14 @@ class EccKey:
|
|||||||
)
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def y(self):
|
def y(self) -> bytes:
|
||||||
return (
|
return (
|
||||||
self.private_key.public_key()
|
self.private_key.public_key()
|
||||||
.public_numbers()
|
.public_numbers()
|
||||||
.y.to_bytes(32, byteorder='big')
|
.y.to_bytes(32, byteorder='big')
|
||||||
)
|
)
|
||||||
|
|
||||||
def dh(self, public_key_x, public_key_y):
|
def dh(self, public_key_x: bytes, public_key_y: bytes) -> bytes:
|
||||||
x = int.from_bytes(public_key_x, byteorder='big', signed=False)
|
x = int.from_bytes(public_key_x, byteorder='big', signed=False)
|
||||||
y = int.from_bytes(public_key_y, byteorder='big', signed=False)
|
y = int.from_bytes(public_key_y, byteorder='big', signed=False)
|
||||||
public_key = EllipticCurvePublicNumbers(x, y, SECP256R1()).public_key()
|
public_key = EllipticCurvePublicNumbers(x, y, SECP256R1()).public_key()
|
||||||
@@ -93,14 +99,23 @@ class EccKey:
|
|||||||
# Functions
|
# Functions
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def xor(x, y):
|
def xor(x: bytes, y: bytes) -> bytes:
|
||||||
assert len(x) == len(y)
|
assert len(x) == len(y)
|
||||||
return bytes(map(operator.xor, x, y))
|
return bytes(map(operator.xor, x, y))
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def r():
|
def reverse(input: bytes) -> bytes:
|
||||||
|
'''
|
||||||
|
Returns bytes of input in reversed endianness.
|
||||||
|
'''
|
||||||
|
return input[::-1]
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
def r() -> bytes:
|
||||||
'''
|
'''
|
||||||
Generate 16 bytes of random data
|
Generate 16 bytes of random data
|
||||||
'''
|
'''
|
||||||
@@ -108,20 +123,20 @@ def r():
|
|||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def e(key, data):
|
def e(key: bytes, data: bytes) -> bytes:
|
||||||
'''
|
'''
|
||||||
AES-128 ECB, expecting byte-swapped inputs and producing a byte-swapped output.
|
AES-128 ECB, expecting byte-swapped inputs and producing a byte-swapped output.
|
||||||
|
|
||||||
See Bluetooth spec Vol 3, Part H - 2.2.1 Security function e
|
See Bluetooth spec Vol 3, Part H - 2.2.1 Security function e
|
||||||
'''
|
'''
|
||||||
|
|
||||||
cipher = Cipher(algorithms.AES(bytes(reversed(key))), modes.ECB())
|
cipher = Cipher(algorithms.AES(reverse(key)), modes.ECB())
|
||||||
encryptor = cipher.encryptor()
|
encryptor = cipher.encryptor()
|
||||||
return bytes(reversed(encryptor.update(bytes(reversed(data)))))
|
return reverse(encryptor.update(reverse(data)))
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def ah(k, r): # pylint: disable=redefined-outer-name
|
def ah(k: bytes, r: bytes) -> bytes: # pylint: disable=redefined-outer-name
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec Vol 3, Part H - 2.2.2 Random Address Hash function ah
|
See Bluetooth spec Vol 3, Part H - 2.2.2 Random Address Hash function ah
|
||||||
'''
|
'''
|
||||||
@@ -132,7 +147,16 @@ def ah(k, r): # pylint: disable=redefined-outer-name
|
|||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def c1(k, r, preq, pres, iat, rat, ia, ra): # pylint: disable=redefined-outer-name
|
def c1(
|
||||||
|
k: bytes,
|
||||||
|
r: bytes,
|
||||||
|
preq: bytes,
|
||||||
|
pres: bytes,
|
||||||
|
iat: int,
|
||||||
|
rat: int,
|
||||||
|
ia: bytes,
|
||||||
|
ra: bytes,
|
||||||
|
) -> bytes: # pylint: disable=redefined-outer-name
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec, Vol 3, Part H - 2.2.3 Confirm value generation function c1 for
|
See Bluetooth spec, Vol 3, Part H - 2.2.3 Confirm value generation function c1 for
|
||||||
LE Legacy Pairing
|
LE Legacy Pairing
|
||||||
@@ -144,7 +168,7 @@ def c1(k, r, preq, pres, iat, rat, ia, ra): # pylint: disable=redefined-outer-n
|
|||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def s1(k, r1, r2):
|
def s1(k: bytes, r1: bytes, r2: bytes) -> bytes:
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec, Vol 3, Part H - 2.2.4 Key generation function s1 for LE Legacy
|
See Bluetooth spec, Vol 3, Part H - 2.2.4 Key generation function s1 for LE Legacy
|
||||||
Pairing
|
Pairing
|
||||||
@@ -154,7 +178,7 @@ def s1(k, r1, r2):
|
|||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def aes_cmac(m, k):
|
def aes_cmac(m: bytes, k: bytes) -> bytes:
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec, Vol 3, Part H - 2.2.5 FunctionAES-CMAC
|
See Bluetooth spec, Vol 3, Part H - 2.2.5 FunctionAES-CMAC
|
||||||
|
|
||||||
@@ -166,20 +190,16 @@ def aes_cmac(m, k):
|
|||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def f4(u, v, x, z):
|
def f4(u: bytes, v: bytes, x: bytes, z: bytes) -> bytes:
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec, Vol 3, Part H - 2.2.6 LE Secure Connections Confirm Value
|
See Bluetooth spec, Vol 3, Part H - 2.2.6 LE Secure Connections Confirm Value
|
||||||
Generation Function f4
|
Generation Function f4
|
||||||
'''
|
'''
|
||||||
return bytes(
|
return reverse(aes_cmac(reverse(u) + reverse(v) + z, reverse(x)))
|
||||||
reversed(
|
|
||||||
aes_cmac(bytes(reversed(u)) + bytes(reversed(v)) + z, bytes(reversed(x)))
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def f5(w, n1, n2, a1, a2):
|
def f5(w: bytes, n1: bytes, n2: bytes, a1: bytes, a2: bytes) -> Tuple[bytes, bytes]:
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec, Vol 3, Part H - 2.2.7 LE Secure Connections Key Generation
|
See Bluetooth spec, Vol 3, Part H - 2.2.7 LE Secure Connections Key Generation
|
||||||
Function f5
|
Function f5
|
||||||
@@ -187,87 +207,83 @@ def f5(w, n1, n2, a1, a2):
|
|||||||
NOTE: this returns a tuple: (MacKey, LTK) in little-endian byte order
|
NOTE: this returns a tuple: (MacKey, LTK) in little-endian byte order
|
||||||
'''
|
'''
|
||||||
salt = bytes.fromhex('6C888391AAF5A53860370BDB5A6083BE')
|
salt = bytes.fromhex('6C888391AAF5A53860370BDB5A6083BE')
|
||||||
t = aes_cmac(bytes(reversed(w)), salt)
|
t = aes_cmac(reverse(w), salt)
|
||||||
key_id = bytes([0x62, 0x74, 0x6C, 0x65])
|
key_id = bytes([0x62, 0x74, 0x6C, 0x65])
|
||||||
return (
|
return (
|
||||||
bytes(
|
reverse(
|
||||||
reversed(
|
aes_cmac(
|
||||||
aes_cmac(
|
bytes([0])
|
||||||
bytes([0])
|
+ key_id
|
||||||
+ key_id
|
+ reverse(n1)
|
||||||
+ bytes(reversed(n1))
|
+ reverse(n2)
|
||||||
+ bytes(reversed(n2))
|
+ reverse(a1)
|
||||||
+ bytes(reversed(a1))
|
+ reverse(a2)
|
||||||
+ bytes(reversed(a2))
|
+ bytes([1, 0]),
|
||||||
+ bytes([1, 0]),
|
t,
|
||||||
t,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
bytes(
|
reverse(
|
||||||
reversed(
|
aes_cmac(
|
||||||
aes_cmac(
|
bytes([1])
|
||||||
bytes([1])
|
+ key_id
|
||||||
+ key_id
|
+ reverse(n1)
|
||||||
+ bytes(reversed(n1))
|
+ reverse(n2)
|
||||||
+ bytes(reversed(n2))
|
+ reverse(a1)
|
||||||
+ bytes(reversed(a1))
|
+ reverse(a2)
|
||||||
+ bytes(reversed(a2))
|
+ bytes([1, 0]),
|
||||||
+ bytes([1, 0]),
|
t,
|
||||||
t,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def f6(w, n1, n2, r, io_cap, a1, a2): # pylint: disable=redefined-outer-name
|
def f6(
|
||||||
|
w: bytes, n1: bytes, n2: bytes, r: bytes, io_cap: bytes, a1: bytes, a2: bytes
|
||||||
|
) -> bytes: # pylint: disable=redefined-outer-name
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec, Vol 3, Part H - 2.2.8 LE Secure Connections Check Value
|
See Bluetooth spec, Vol 3, Part H - 2.2.8 LE Secure Connections Check Value
|
||||||
Generation Function f6
|
Generation Function f6
|
||||||
'''
|
'''
|
||||||
return bytes(
|
return reverse(
|
||||||
reversed(
|
aes_cmac(
|
||||||
aes_cmac(
|
reverse(n1)
|
||||||
bytes(reversed(n1))
|
+ reverse(n2)
|
||||||
+ bytes(reversed(n2))
|
+ reverse(r)
|
||||||
+ bytes(reversed(r))
|
+ reverse(io_cap)
|
||||||
+ bytes(reversed(io_cap))
|
+ reverse(a1)
|
||||||
+ bytes(reversed(a1))
|
+ reverse(a2),
|
||||||
+ bytes(reversed(a2)),
|
reverse(w),
|
||||||
bytes(reversed(w)),
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def g2(u, v, x, y):
|
def g2(u: bytes, v: bytes, x: bytes, y: bytes) -> int:
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec, Vol 3, Part H - 2.2.9 LE Secure Connections Numeric Comparison
|
See Bluetooth spec, Vol 3, Part H - 2.2.9 LE Secure Connections Numeric Comparison
|
||||||
Value Generation Function g2
|
Value Generation Function g2
|
||||||
'''
|
'''
|
||||||
return int.from_bytes(
|
return int.from_bytes(
|
||||||
aes_cmac(
|
aes_cmac(
|
||||||
bytes(reversed(u)) + bytes(reversed(v)) + bytes(reversed(y)),
|
reverse(u) + reverse(v) + reverse(y),
|
||||||
bytes(reversed(x)),
|
reverse(x),
|
||||||
)[-4:],
|
)[-4:],
|
||||||
byteorder='big',
|
byteorder='big',
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def h6(w, key_id):
|
def h6(w: bytes, key_id: bytes) -> bytes:
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec, Vol 3, Part H - 2.2.10 Link key conversion function h6
|
See Bluetooth spec, Vol 3, Part H - 2.2.10 Link key conversion function h6
|
||||||
'''
|
'''
|
||||||
return aes_cmac(key_id, w)
|
return reverse(aes_cmac(key_id, reverse(w)))
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def h7(salt, w):
|
def h7(salt: bytes, w: bytes) -> bytes:
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec, Vol 3, Part H - 2.2.11 Link key conversion function h7
|
See Bluetooth spec, Vol 3, Part H - 2.2.11 Link key conversion function h7
|
||||||
'''
|
'''
|
||||||
return aes_cmac(w, salt)
|
return reverse(aes_cmac(reverse(w), salt))
|
||||||
|
|||||||
155
bumble/device.py
155
bumble/device.py
@@ -32,6 +32,7 @@ from typing import (
|
|||||||
Optional,
|
Optional,
|
||||||
Tuple,
|
Tuple,
|
||||||
Type,
|
Type,
|
||||||
|
Set,
|
||||||
Union,
|
Union,
|
||||||
cast,
|
cast,
|
||||||
overload,
|
overload,
|
||||||
@@ -99,14 +100,20 @@ from .hci import (
|
|||||||
HCI_LE_Extended_Create_Connection_Command,
|
HCI_LE_Extended_Create_Connection_Command,
|
||||||
HCI_LE_Rand_Command,
|
HCI_LE_Rand_Command,
|
||||||
HCI_LE_Read_PHY_Command,
|
HCI_LE_Read_PHY_Command,
|
||||||
|
HCI_LE_Remove_Advertising_Set_Command,
|
||||||
HCI_LE_Set_Address_Resolution_Enable_Command,
|
HCI_LE_Set_Address_Resolution_Enable_Command,
|
||||||
HCI_LE_Set_Advertising_Data_Command,
|
HCI_LE_Set_Advertising_Data_Command,
|
||||||
HCI_LE_Set_Advertising_Enable_Command,
|
HCI_LE_Set_Advertising_Enable_Command,
|
||||||
HCI_LE_Set_Advertising_Parameters_Command,
|
HCI_LE_Set_Advertising_Parameters_Command,
|
||||||
|
HCI_LE_Set_Advertising_Set_Random_Address_Command,
|
||||||
HCI_LE_Set_Data_Length_Command,
|
HCI_LE_Set_Data_Length_Command,
|
||||||
HCI_LE_Set_Default_PHY_Command,
|
HCI_LE_Set_Default_PHY_Command,
|
||||||
HCI_LE_Set_Extended_Scan_Enable_Command,
|
HCI_LE_Set_Extended_Scan_Enable_Command,
|
||||||
HCI_LE_Set_Extended_Scan_Parameters_Command,
|
HCI_LE_Set_Extended_Scan_Parameters_Command,
|
||||||
|
HCI_LE_Set_Extended_Scan_Response_Data_Command,
|
||||||
|
HCI_LE_Set_Extended_Advertising_Data_Command,
|
||||||
|
HCI_LE_Set_Extended_Advertising_Enable_Command,
|
||||||
|
HCI_LE_Set_Extended_Advertising_Parameters_Command,
|
||||||
HCI_LE_Set_PHY_Command,
|
HCI_LE_Set_PHY_Command,
|
||||||
HCI_LE_Set_Random_Address_Command,
|
HCI_LE_Set_Random_Address_Command,
|
||||||
HCI_LE_Set_Scan_Enable_Command,
|
HCI_LE_Set_Scan_Enable_Command,
|
||||||
@@ -155,6 +162,7 @@ from .utils import (
|
|||||||
setup_event_forwarding,
|
setup_event_forwarding,
|
||||||
composite_listener,
|
composite_listener,
|
||||||
deprecated,
|
deprecated,
|
||||||
|
experimental,
|
||||||
)
|
)
|
||||||
from .keys import (
|
from .keys import (
|
||||||
KeyStore,
|
KeyStore,
|
||||||
@@ -189,6 +197,8 @@ DEVICE_MIN_SCAN_WINDOW = 25
|
|||||||
DEVICE_MAX_SCAN_WINDOW = 10240
|
DEVICE_MAX_SCAN_WINDOW = 10240
|
||||||
DEVICE_MIN_LE_RSSI = -127
|
DEVICE_MIN_LE_RSSI = -127
|
||||||
DEVICE_MAX_LE_RSSI = 20
|
DEVICE_MAX_LE_RSSI = 20
|
||||||
|
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE = 0x00
|
||||||
|
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE = 0xEF
|
||||||
|
|
||||||
DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00'
|
DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00'
|
||||||
DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms
|
DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms
|
||||||
@@ -960,6 +970,7 @@ class Device(CompositeEventEmitter):
|
|||||||
]
|
]
|
||||||
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
|
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
|
||||||
config: DeviceConfiguration
|
config: DeviceConfiguration
|
||||||
|
extended_advertising_handles: Set[int]
|
||||||
|
|
||||||
@composite_listener
|
@composite_listener
|
||||||
class Listener:
|
class Listener:
|
||||||
@@ -1058,6 +1069,7 @@ class Device(CompositeEventEmitter):
|
|||||||
self.classic_pending_accepts = {
|
self.classic_pending_accepts = {
|
||||||
Address.ANY: []
|
Address.ANY: []
|
||||||
} # Futures, by BD address OR [Futures] for Address.ANY
|
} # Futures, by BD address OR [Futures] for Address.ANY
|
||||||
|
self.extended_advertising_handles = set()
|
||||||
|
|
||||||
# Own address type cache
|
# Own address type cache
|
||||||
self.advertising_own_address_type = None
|
self.advertising_own_address_type = None
|
||||||
@@ -1536,6 +1548,149 @@ class Device(CompositeEventEmitter):
|
|||||||
self.advertising = False
|
self.advertising = False
|
||||||
self.auto_restart_advertising = False
|
self.auto_restart_advertising = False
|
||||||
|
|
||||||
|
@experimental('Extended Advertising is still experimental - Might be changed soon.')
|
||||||
|
async def start_extended_advertising(
|
||||||
|
self,
|
||||||
|
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING,
|
||||||
|
target: Address = Address.ANY,
|
||||||
|
own_address_type: int = OwnAddressType.RANDOM,
|
||||||
|
scan_response: Optional[bytes] = None,
|
||||||
|
advertising_data: Optional[bytes] = None,
|
||||||
|
) -> int:
|
||||||
|
"""Starts an extended advertising set.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command
|
||||||
|
target: Directed advertising target. Directed property should be set in advertising_properties arg.
|
||||||
|
own_address_type: own address type to use in the advertising.
|
||||||
|
scan_response: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent.
|
||||||
|
advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Handle of the new advertising set.
|
||||||
|
"""
|
||||||
|
|
||||||
|
adv_handle = -1
|
||||||
|
# Find a free handle
|
||||||
|
for i in range(
|
||||||
|
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE,
|
||||||
|
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1,
|
||||||
|
):
|
||||||
|
if i not in self.extended_advertising_handles:
|
||||||
|
adv_handle = i
|
||||||
|
break
|
||||||
|
|
||||||
|
if adv_handle == -1:
|
||||||
|
raise InvalidStateError('No available advertising set.')
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Set the advertising parameters
|
||||||
|
await self.send_command(
|
||||||
|
HCI_LE_Set_Extended_Advertising_Parameters_Command(
|
||||||
|
advertising_handle=adv_handle,
|
||||||
|
advertising_event_properties=advertising_properties,
|
||||||
|
primary_advertising_interval_min=self.advertising_interval_min,
|
||||||
|
primary_advertising_interval_max=self.advertising_interval_max,
|
||||||
|
primary_advertising_channel_map=(
|
||||||
|
HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_37
|
||||||
|
| HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_38
|
||||||
|
| HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_39
|
||||||
|
),
|
||||||
|
own_address_type=own_address_type,
|
||||||
|
peer_address_type=target.address_type,
|
||||||
|
peer_address=target,
|
||||||
|
advertising_tx_power=7,
|
||||||
|
advertising_filter_policy=0,
|
||||||
|
primary_advertising_phy=1, # LE 1M
|
||||||
|
secondary_advertising_max_skip=0,
|
||||||
|
secondary_advertising_phy=1, # LE 1M
|
||||||
|
advertising_sid=0,
|
||||||
|
scan_request_notification_enable=0,
|
||||||
|
), # type: ignore[call-arg]
|
||||||
|
check_result=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Set the advertising data if present
|
||||||
|
if advertising_data is not None:
|
||||||
|
await self.send_command(
|
||||||
|
HCI_LE_Set_Extended_Advertising_Data_Command(
|
||||||
|
advertising_handle=adv_handle,
|
||||||
|
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
|
||||||
|
fragment_preference=0x01, # Should not fragment
|
||||||
|
advertising_data=advertising_data,
|
||||||
|
), # type: ignore[call-arg]
|
||||||
|
check_result=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Set the scan response if present
|
||||||
|
if scan_response is not None:
|
||||||
|
await self.send_command(
|
||||||
|
HCI_LE_Set_Extended_Scan_Response_Data_Command(
|
||||||
|
advertising_handle=adv_handle,
|
||||||
|
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
|
||||||
|
fragment_preference=0x01, # Should not fragment
|
||||||
|
scan_response_data=scan_response,
|
||||||
|
), # type: ignore[call-arg]
|
||||||
|
check_result=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
if own_address_type in (
|
||||||
|
OwnAddressType.RANDOM,
|
||||||
|
OwnAddressType.RESOLVABLE_OR_RANDOM,
|
||||||
|
):
|
||||||
|
await self.send_command(
|
||||||
|
HCI_LE_Set_Advertising_Set_Random_Address_Command(
|
||||||
|
advertising_handle=adv_handle,
|
||||||
|
random_address=self.random_address,
|
||||||
|
), # type: ignore[call-arg]
|
||||||
|
check_result=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Enable advertising
|
||||||
|
await self.send_command(
|
||||||
|
HCI_LE_Set_Extended_Advertising_Enable_Command(
|
||||||
|
enable=1,
|
||||||
|
advertising_handles=[adv_handle],
|
||||||
|
durations=[0], # Forever
|
||||||
|
max_extended_advertising_events=[0], # Infinite
|
||||||
|
), # type: ignore[call-arg]
|
||||||
|
check_result=True,
|
||||||
|
)
|
||||||
|
except HCI_Error as error:
|
||||||
|
# When any step fails, cleanup the advertising handle.
|
||||||
|
await self.send_command(
|
||||||
|
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg]
|
||||||
|
check_result=False,
|
||||||
|
)
|
||||||
|
raise error
|
||||||
|
|
||||||
|
self.extended_advertising_handles.add(adv_handle)
|
||||||
|
return adv_handle
|
||||||
|
|
||||||
|
@experimental('Extended Advertising is still experimental - Might be changed soon.')
|
||||||
|
async def stop_extended_advertising(self, adv_handle: int) -> None:
|
||||||
|
"""Stops an extended advertising set.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
adv_handle: Handle of the advertising set to stop.
|
||||||
|
"""
|
||||||
|
# Disable advertising
|
||||||
|
await self.send_command(
|
||||||
|
HCI_LE_Set_Extended_Advertising_Enable_Command(
|
||||||
|
enable=0,
|
||||||
|
advertising_handles=[adv_handle],
|
||||||
|
durations=[0],
|
||||||
|
max_extended_advertising_events=[0],
|
||||||
|
), # type: ignore[call-arg]
|
||||||
|
check_result=True,
|
||||||
|
)
|
||||||
|
# Remove advertising set
|
||||||
|
await self.send_command(
|
||||||
|
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg]
|
||||||
|
check_result=True,
|
||||||
|
)
|
||||||
|
self.extended_advertising_handles.remove(adv_handle)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_advertising(self):
|
def is_advertising(self):
|
||||||
return self.advertising
|
return self.advertising
|
||||||
|
|||||||
110
bumble/hci.py
110
bumble/hci.py
@@ -3829,8 +3829,10 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
|
|||||||
'advertising_event_properties',
|
'advertising_event_properties',
|
||||||
{
|
{
|
||||||
'size': 2,
|
'size': 2,
|
||||||
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.advertising_properties_string(
|
'mapper': lambda x: str(
|
||||||
x
|
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
|
||||||
|
x
|
||||||
|
)
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
@@ -3840,8 +3842,8 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
|
|||||||
'primary_advertising_channel_map',
|
'primary_advertising_channel_map',
|
||||||
{
|
{
|
||||||
'size': 1,
|
'size': 1,
|
||||||
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.channel_map_string(
|
'mapper': lambda x: str(
|
||||||
x
|
HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap(x)
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
@@ -3863,38 +3865,33 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
|
|||||||
See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command
|
See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command
|
||||||
'''
|
'''
|
||||||
|
|
||||||
CONNECTABLE_ADVERTISING = 0
|
class AdvertisingProperties(enum.IntFlag):
|
||||||
SCANNABLE_ADVERTISING = 1
|
CONNECTABLE_ADVERTISING = 1 << 0
|
||||||
DIRECTED_ADVERTISING = 2
|
SCANNABLE_ADVERTISING = 1 << 1
|
||||||
HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 3
|
DIRECTED_ADVERTISING = 1 << 2
|
||||||
USE_LEGACY_ADVERTISING_PDUS = 4
|
HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 1 << 3
|
||||||
ANONYMOUS_ADVERTISING = 5
|
USE_LEGACY_ADVERTISING_PDUS = 1 << 4
|
||||||
INCLUDE_TX_POWER = 6
|
ANONYMOUS_ADVERTISING = 1 << 5
|
||||||
|
INCLUDE_TX_POWER = 1 << 6
|
||||||
|
|
||||||
ADVERTISING_PROPERTIES_NAMES = (
|
def __str__(self) -> str:
|
||||||
'CONNECTABLE_ADVERTISING',
|
return '|'.join(
|
||||||
'SCANNABLE_ADVERTISING',
|
flag.name
|
||||||
'DIRECTED_ADVERTISING',
|
for flag in HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties
|
||||||
'HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING',
|
if self.value & flag.value and flag.name is not None
|
||||||
'USE_LEGACY_ADVERTISING_PDUS',
|
)
|
||||||
'ANONYMOUS_ADVERTISING',
|
|
||||||
'INCLUDE_TX_POWER',
|
|
||||||
)
|
|
||||||
|
|
||||||
CHANNEL_37 = 0
|
class ChannelMap(enum.IntFlag):
|
||||||
CHANNEL_38 = 1
|
CHANNEL_37 = 1 << 0
|
||||||
CHANNEL_39 = 2
|
CHANNEL_38 = 1 << 1
|
||||||
|
CHANNEL_39 = 1 << 2
|
||||||
|
|
||||||
CHANNEL_NAMES = ('37', '38', '39')
|
def __str__(self) -> str:
|
||||||
|
return '|'.join(
|
||||||
@classmethod
|
flag.name
|
||||||
def advertising_properties_string(cls, properties):
|
for flag in HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap
|
||||||
# pylint: disable=line-too-long
|
if self.value & flag.value and flag.name is not None
|
||||||
return f'[{",".join(bit_flags_to_strings(properties, cls.ADVERTISING_PROPERTIES_NAMES))}]'
|
)
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def channel_map_string(cls, channel_map):
|
|
||||||
return f'[{",".join(bit_flags_to_strings(channel_map, cls.CHANNEL_NAMES))}]'
|
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@@ -3906,9 +3903,9 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
|
|||||||
'operation',
|
'operation',
|
||||||
{
|
{
|
||||||
'size': 1,
|
'size': 1,
|
||||||
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(
|
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
|
||||||
x
|
x
|
||||||
),
|
).name,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
('fragment_preference', 1),
|
('fragment_preference', 1),
|
||||||
@@ -3926,23 +3923,12 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
|
|||||||
See Bluetooth spec @ 7.8.54 LE Set Extended Advertising Data Command
|
See Bluetooth spec @ 7.8.54 LE Set Extended Advertising Data Command
|
||||||
'''
|
'''
|
||||||
|
|
||||||
INTERMEDIATE_FRAGMENT = 0x00
|
class Operation(enum.IntEnum):
|
||||||
FIRST_FRAGMENT = 0x01
|
INTERMEDIATE_FRAGMENT = 0x00
|
||||||
LAST_FRAGMENT = 0x02
|
FIRST_FRAGMENT = 0x01
|
||||||
COMPLETE_DATA = 0x03
|
LAST_FRAGMENT = 0x02
|
||||||
UNCHANGED_DATA = 0x04
|
COMPLETE_DATA = 0x03
|
||||||
|
UNCHANGED_DATA = 0x04
|
||||||
OPERATION_NAMES = {
|
|
||||||
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
|
|
||||||
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
|
|
||||||
LAST_FRAGMENT: 'LAST_FRAGMENT',
|
|
||||||
COMPLETE_DATA: 'COMPLETE_DATA',
|
|
||||||
UNCHANGED_DATA: 'UNCHANGED_DATA',
|
|
||||||
}
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def operation_name(cls, operation):
|
|
||||||
return name_or_number(cls.OPERATION_NAMES, operation)
|
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@@ -3954,9 +3940,9 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
|
|||||||
'operation',
|
'operation',
|
||||||
{
|
{
|
||||||
'size': 1,
|
'size': 1,
|
||||||
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(
|
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
|
||||||
x
|
x
|
||||||
),
|
).name,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
('fragment_preference', 1),
|
('fragment_preference', 1),
|
||||||
@@ -3974,22 +3960,6 @@ class HCI_LE_Set_Extended_Scan_Response_Data_Command(HCI_Command):
|
|||||||
See Bluetooth spec @ 7.8.55 LE Set Extended Scan Response Data Command
|
See Bluetooth spec @ 7.8.55 LE Set Extended Scan Response Data Command
|
||||||
'''
|
'''
|
||||||
|
|
||||||
INTERMEDIATE_FRAGMENT = 0x00
|
|
||||||
FIRST_FRAGMENT = 0x01
|
|
||||||
LAST_FRAGMENT = 0x02
|
|
||||||
COMPLETE_DATA = 0x03
|
|
||||||
|
|
||||||
OPERATION_NAMES = {
|
|
||||||
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
|
|
||||||
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
|
|
||||||
LAST_FRAGMENT: 'LAST_FRAGMENT',
|
|
||||||
COMPLETE_DATA: 'COMPLETE_DATA',
|
|
||||||
}
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def operation_name(cls, operation):
|
|
||||||
return name_or_number(cls.OPERATION_NAMES, operation)
|
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@HCI_Command.command(
|
@HCI_Command.command(
|
||||||
|
|||||||
109
bumble/smp.py
109
bumble/smp.py
@@ -187,8 +187,8 @@ SMP_KEYPRESS_AUTHREQ = 0b00010000
|
|||||||
SMP_CT2_AUTHREQ = 0b00100000
|
SMP_CT2_AUTHREQ = 0b00100000
|
||||||
|
|
||||||
# Crypto salt
|
# Crypto salt
|
||||||
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031')
|
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031')
|
||||||
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('00000000000000000000000000000000746D7032')
|
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032')
|
||||||
|
|
||||||
# fmt: on
|
# fmt: on
|
||||||
# pylint: enable=line-too-long
|
# pylint: enable=line-too-long
|
||||||
@@ -579,7 +579,7 @@ class OobContext:
|
|||||||
self.r = crypto.r() if r is None else r
|
self.r = crypto.r() if r is None else r
|
||||||
|
|
||||||
def share(self) -> OobSharedData:
|
def share(self) -> OobSharedData:
|
||||||
pkx = bytes(reversed(self.ecc_key.x))
|
pkx = self.ecc_key.x[::-1]
|
||||||
return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r)
|
return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r)
|
||||||
|
|
||||||
|
|
||||||
@@ -677,6 +677,13 @@ class Session:
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ea: bytes
|
||||||
|
eb: bytes
|
||||||
|
ltk: bytes
|
||||||
|
preq: bytes
|
||||||
|
pres: bytes
|
||||||
|
tk: bytes
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
manager: Manager,
|
manager: Manager,
|
||||||
@@ -686,15 +693,10 @@ class Session:
|
|||||||
) -> None:
|
) -> None:
|
||||||
self.manager = manager
|
self.manager = manager
|
||||||
self.connection = connection
|
self.connection = connection
|
||||||
self.preq: Optional[bytes] = None
|
|
||||||
self.pres: Optional[bytes] = None
|
|
||||||
self.ea = None
|
|
||||||
self.eb = None
|
|
||||||
self.stk = None
|
self.stk = None
|
||||||
self.ltk = None
|
|
||||||
self.ltk_ediv = 0
|
self.ltk_ediv = 0
|
||||||
self.ltk_rand = bytes(8)
|
self.ltk_rand = bytes(8)
|
||||||
self.link_key = None
|
self.link_key: Optional[bytes] = None
|
||||||
self.initiator_key_distribution: int = 0
|
self.initiator_key_distribution: int = 0
|
||||||
self.responder_key_distribution: int = 0
|
self.responder_key_distribution: int = 0
|
||||||
self.peer_random_value: Optional[bytes] = None
|
self.peer_random_value: Optional[bytes] = None
|
||||||
@@ -787,9 +789,7 @@ class Session:
|
|||||||
)
|
)
|
||||||
self.r = pairing_config.oob.our_context.r
|
self.r = pairing_config.oob.our_context.r
|
||||||
self.ecc_key = pairing_config.oob.our_context.ecc_key
|
self.ecc_key = pairing_config.oob.our_context.ecc_key
|
||||||
if pairing_config.oob.legacy_context is None:
|
if pairing_config.oob.legacy_context is not None:
|
||||||
self.tk = None
|
|
||||||
else:
|
|
||||||
self.tk = pairing_config.oob.legacy_context.tk
|
self.tk = pairing_config.oob.legacy_context.tk
|
||||||
else:
|
else:
|
||||||
if pairing_config.oob.legacy_context is None:
|
if pairing_config.oob.legacy_context is None:
|
||||||
@@ -807,7 +807,7 @@ class Session:
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def pkx(self) -> Tuple[bytes, bytes]:
|
def pkx(self) -> Tuple[bytes, bytes]:
|
||||||
return (bytes(reversed(self.ecc_key.x)), self.peer_public_key_x)
|
return (self.ecc_key.x[::-1], self.peer_public_key_x)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def pka(self) -> bytes:
|
def pka(self) -> bytes:
|
||||||
@@ -1061,8 +1061,8 @@ class Session:
|
|||||||
def send_public_key_command(self) -> None:
|
def send_public_key_command(self) -> None:
|
||||||
self.send_command(
|
self.send_command(
|
||||||
SMP_Pairing_Public_Key_Command(
|
SMP_Pairing_Public_Key_Command(
|
||||||
public_key_x=bytes(reversed(self.ecc_key.x)),
|
public_key_x=self.ecc_key.x[::-1],
|
||||||
public_key_y=bytes(reversed(self.ecc_key.y)),
|
public_key_y=self.ecc_key.y[::-1],
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -1098,15 +1098,52 @@ class Session:
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
async def derive_ltk(self) -> None:
|
@classmethod
|
||||||
link_key = await self.manager.device.get_link_key(self.connection.peer_address)
|
def derive_ltk(cls, link_key: bytes, ct2: bool) -> bytes:
|
||||||
assert link_key is not None
|
'''Derives Long Term Key from Link Key.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
link_key: BR/EDR Link Key bytes in little-endian.
|
||||||
|
ct2: whether ct2 is supported on both devices.
|
||||||
|
Returns:
|
||||||
|
LE Long Tern Key bytes in little-endian.
|
||||||
|
'''
|
||||||
ilk = (
|
ilk = (
|
||||||
crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key)
|
crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key)
|
||||||
if self.ct2
|
if ct2
|
||||||
else crypto.h6(link_key, b'tmp2')
|
else crypto.h6(link_key, b'tmp2')
|
||||||
)
|
)
|
||||||
self.ltk = crypto.h6(ilk, b'brle')
|
return crypto.h6(ilk, b'brle')
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def derive_link_key(cls, ltk: bytes, ct2: bool) -> bytes:
|
||||||
|
'''Derives Link Key from Long Term Key.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
ltk: LE Long Term Key bytes in little-endian.
|
||||||
|
ct2: whether ct2 is supported on both devices.
|
||||||
|
Returns:
|
||||||
|
BR/EDR Link Key bytes in little-endian.
|
||||||
|
'''
|
||||||
|
ilk = (
|
||||||
|
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=ltk)
|
||||||
|
if ct2
|
||||||
|
else crypto.h6(ltk, b'tmp1')
|
||||||
|
)
|
||||||
|
return crypto.h6(ilk, b'lebr')
|
||||||
|
|
||||||
|
async def get_link_key_and_derive_ltk(self) -> None:
|
||||||
|
'''Retrieves BR/EDR Link Key from storage and derive it to LE LTK.'''
|
||||||
|
link_key = await self.manager.device.get_link_key(self.connection.peer_address)
|
||||||
|
if link_key is None:
|
||||||
|
logging.warning(
|
||||||
|
'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!'
|
||||||
|
)
|
||||||
|
self.send_pairing_failed(
|
||||||
|
SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.ltk = self.derive_ltk(link_key, self.ct2)
|
||||||
|
|
||||||
def distribute_keys(self) -> None:
|
def distribute_keys(self) -> None:
|
||||||
# Distribute the keys as required
|
# Distribute the keys as required
|
||||||
@@ -1117,7 +1154,7 @@ class Session:
|
|||||||
and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
|
and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
|
||||||
):
|
):
|
||||||
self.ctkd_task = self.connection.abort_on(
|
self.ctkd_task = self.connection.abort_on(
|
||||||
'disconnection', self.derive_ltk()
|
'disconnection', self.get_link_key_and_derive_ltk()
|
||||||
)
|
)
|
||||||
elif not self.sc:
|
elif not self.sc:
|
||||||
# Distribute the LTK, EDIV and RAND
|
# Distribute the LTK, EDIV and RAND
|
||||||
@@ -1147,12 +1184,7 @@ class Session:
|
|||||||
|
|
||||||
# CTKD, calculate BR/EDR link key
|
# CTKD, calculate BR/EDR link key
|
||||||
if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
|
if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
|
||||||
ilk = (
|
self.link_key = self.derive_link_key(self.ltk, self.ct2)
|
||||||
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
|
|
||||||
if self.ct2
|
|
||||||
else crypto.h6(self.ltk, b'tmp1')
|
|
||||||
)
|
|
||||||
self.link_key = crypto.h6(ilk, b'lebr')
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# CTKD: Derive LTK from LinkKey
|
# CTKD: Derive LTK from LinkKey
|
||||||
@@ -1161,7 +1193,7 @@ class Session:
|
|||||||
and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
|
and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
|
||||||
):
|
):
|
||||||
self.ctkd_task = self.connection.abort_on(
|
self.ctkd_task = self.connection.abort_on(
|
||||||
'disconnection', self.derive_ltk()
|
'disconnection', self.get_link_key_and_derive_ltk()
|
||||||
)
|
)
|
||||||
# Distribute the LTK, EDIV and RAND
|
# Distribute the LTK, EDIV and RAND
|
||||||
elif not self.sc:
|
elif not self.sc:
|
||||||
@@ -1191,12 +1223,7 @@ class Session:
|
|||||||
|
|
||||||
# CTKD, calculate BR/EDR link key
|
# CTKD, calculate BR/EDR link key
|
||||||
if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
|
if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
|
||||||
ilk = (
|
self.link_key = self.derive_link_key(self.ltk, self.ct2)
|
||||||
crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
|
|
||||||
if self.ct2
|
|
||||||
else crypto.h6(self.ltk, b'tmp1')
|
|
||||||
)
|
|
||||||
self.link_key = crypto.h6(ilk, b'lebr')
|
|
||||||
|
|
||||||
def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None:
|
def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None:
|
||||||
# Set our expectations for what to wait for in the key distribution phase
|
# Set our expectations for what to wait for in the key distribution phase
|
||||||
@@ -1754,14 +1781,10 @@ class Session:
|
|||||||
self.peer_public_key_y = command.public_key_y
|
self.peer_public_key_y = command.public_key_y
|
||||||
|
|
||||||
# Compute the DH key
|
# Compute the DH key
|
||||||
self.dh_key = bytes(
|
self.dh_key = self.ecc_key.dh(
|
||||||
reversed(
|
command.public_key_x[::-1],
|
||||||
self.ecc_key.dh(
|
command.public_key_y[::-1],
|
||||||
bytes(reversed(command.public_key_x)),
|
)[::-1]
|
||||||
bytes(reversed(command.public_key_y)),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
logger.debug(f'DH key: {self.dh_key.hex()}')
|
logger.debug(f'DH key: {self.dh_key.hex()}')
|
||||||
|
|
||||||
if self.pairing_method == PairingMethod.OOB:
|
if self.pairing_method == PairingMethod.OOB:
|
||||||
@@ -1824,7 +1847,6 @@ class Session:
|
|||||||
else:
|
else:
|
||||||
self.send_pairing_dhkey_check_command()
|
self.send_pairing_dhkey_check_command()
|
||||||
else:
|
else:
|
||||||
assert self.ltk
|
|
||||||
self.start_encryption(self.ltk)
|
self.start_encryption(self.ltk)
|
||||||
|
|
||||||
def on_smp_pairing_failed_command(
|
def on_smp_pairing_failed_command(
|
||||||
@@ -1874,6 +1896,7 @@ class Manager(EventEmitter):
|
|||||||
sessions: Dict[int, Session]
|
sessions: Dict[int, Session]
|
||||||
pairing_config_factory: Callable[[Connection], PairingConfig]
|
pairing_config_factory: Callable[[Connection], PairingConfig]
|
||||||
session_proxy: Type[Session]
|
session_proxy: Type[Session]
|
||||||
|
_ecc_key: Optional[crypto.EccKey]
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
|||||||
@@ -432,7 +432,7 @@ def wrap_async(function):
|
|||||||
|
|
||||||
def deprecated(msg: str):
|
def deprecated(msg: str):
|
||||||
"""
|
"""
|
||||||
Throw deprecation warning before execution
|
Throw deprecation warning before execution.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def wrapper(function):
|
def wrapper(function):
|
||||||
@@ -444,3 +444,19 @@ def deprecated(msg: str):
|
|||||||
return inner
|
return inner
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
def experimental(msg: str):
|
||||||
|
"""
|
||||||
|
Throws a future warning before execution.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def wrapper(function):
|
||||||
|
@wraps(function)
|
||||||
|
def inner(*args, **kwargs):
|
||||||
|
warnings.warn(msg, FutureWarning)
|
||||||
|
return function(*args, **kwargs)
|
||||||
|
|
||||||
|
return inner
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|||||||
69
examples/run_extended_advertiser.py
Normal file
69
examples/run_extended_advertiser.py
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
# Copyright 2021-2022 Google LLC
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
# Imports
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
from bumble.device import AdvertisingType, Device
|
||||||
|
from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command
|
||||||
|
|
||||||
|
from bumble.transport import open_transport_or_link
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
async def main() -> None:
|
||||||
|
if len(sys.argv) < 3:
|
||||||
|
print(
|
||||||
|
'Usage: run_extended_advertiser.py <config-file> <transport-spec> [type] [address]'
|
||||||
|
)
|
||||||
|
print('example: run_extended_advertiser.py device1.json usb:0')
|
||||||
|
return
|
||||||
|
|
||||||
|
if len(sys.argv) >= 4:
|
||||||
|
advertising_properties = (
|
||||||
|
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
|
||||||
|
int(sys.argv[3])
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
advertising_properties = (
|
||||||
|
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(sys.argv) >= 5:
|
||||||
|
target = Address(sys.argv[4])
|
||||||
|
else:
|
||||||
|
target = Address.ANY
|
||||||
|
|
||||||
|
print('<<< connecting to HCI...')
|
||||||
|
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||||
|
print('<<< connected')
|
||||||
|
|
||||||
|
device = Device.from_config_file_with_hci(
|
||||||
|
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||||
|
)
|
||||||
|
await device.power_on()
|
||||||
|
await device.start_extended_advertising(
|
||||||
|
advertising_properties=advertising_properties, target=target
|
||||||
|
)
|
||||||
|
await hci_transport.source.terminated
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
|
||||||
|
asyncio.run(main())
|
||||||
@@ -21,7 +21,7 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from unittest.mock import MagicMock, patch
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
from bumble.controller import Controller
|
from bumble.controller import Controller
|
||||||
from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
|
from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
|
||||||
@@ -38,7 +38,6 @@ from bumble.smp import (
|
|||||||
OobLegacyContext,
|
OobLegacyContext,
|
||||||
)
|
)
|
||||||
from bumble.core import ProtocolError
|
from bumble.core import ProtocolError
|
||||||
from bumble.hci import HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE
|
|
||||||
from bumble.keys import PairingKeys
|
from bumble.keys import PairingKeys
|
||||||
|
|
||||||
|
|
||||||
@@ -519,16 +518,8 @@ async def test_self_smp_over_classic():
|
|||||||
# Mock connection
|
# Mock connection
|
||||||
# TODO: Implement Classic SSP and encryption in link relayer
|
# TODO: Implement Classic SSP and encryption in link relayer
|
||||||
LINK_KEY = bytes.fromhex('287ad379dca402530a39f1f43047b835')
|
LINK_KEY = bytes.fromhex('287ad379dca402530a39f1f43047b835')
|
||||||
two_devices.devices[0].on_link_key(
|
two_devices.devices[0].get_link_key = AsyncMock(return_value=LINK_KEY)
|
||||||
two_devices.devices[1].public_address,
|
two_devices.devices[1].get_link_key = AsyncMock(return_value=LINK_KEY)
|
||||||
LINK_KEY,
|
|
||||||
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
|
|
||||||
)
|
|
||||||
two_devices.devices[1].on_link_key(
|
|
||||||
two_devices.devices[0].public_address,
|
|
||||||
LINK_KEY,
|
|
||||||
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
|
|
||||||
)
|
|
||||||
two_devices.connections[0].encryption = 1
|
two_devices.connections[0].encryption = 1
|
||||||
two_devices.connections[1].encryption = 1
|
two_devices.connections[1].encryption = 1
|
||||||
|
|
||||||
|
|||||||
@@ -16,6 +16,9 @@
|
|||||||
# Imports
|
# Imports
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from bumble import smp
|
||||||
from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1
|
from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1
|
||||||
from bumble.pairing import OobData, OobSharedData, LeRole
|
from bumble.pairing import OobData, OobSharedData, LeRole
|
||||||
from bumble.hci import Address
|
from bumble.hci import Address
|
||||||
@@ -28,8 +31,8 @@ from bumble.core import AdvertisingData
|
|||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def reversed_hex(hex_str):
|
def reversed_hex(hex_str: str) -> bytes:
|
||||||
return bytes(reversed(bytes.fromhex(hex_str)))
|
return bytes.fromhex(hex_str)[::-1]
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@@ -129,112 +132,79 @@ def test_aes_cmac():
|
|||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def test_f4():
|
def test_f4():
|
||||||
u = bytes(
|
u = reversed_hex(
|
||||||
reversed(
|
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6'
|
||||||
bytes.fromhex(
|
|
||||||
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9'
|
|
||||||
+ 'eff49111 acf4fddb cc030148 0e359de6'
|
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
v = bytes(
|
v = reversed_hex(
|
||||||
reversed(
|
'55188b3d 32f6bb9a 900afcfb eed4e72a 59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
|
||||||
bytes.fromhex(
|
|
||||||
'55188b3d 32f6bb9a 900afcfb eed4e72a'
|
|
||||||
+ '59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
|
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
x = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab')))
|
x = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
|
||||||
z = bytes([0])
|
z = b'\0'
|
||||||
value = f4(u, v, x, z)
|
value = f4(u, v, x, z)
|
||||||
assert bytes(reversed(value)) == bytes.fromhex(
|
assert value == reversed_hex('f2c916f1 07a9bd1c f1eda1be a974872d')
|
||||||
'f2c916f1 07a9bd1c f1eda1be a974872d'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def test_f5():
|
def test_f5():
|
||||||
w = bytes(
|
w = reversed_hex(
|
||||||
reversed(
|
'ec0234a3 57c8ad05 341010a6 0a397d9b 99796b13 b4f866f1 868d34f3 73bfa698'
|
||||||
bytes.fromhex(
|
|
||||||
'ec0234a3 57c8ad05 341010a6 0a397d9b'
|
|
||||||
+ '99796b13 b4f866f1 868d34f3 73bfa698'
|
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
n1 = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab')))
|
n1 = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
|
||||||
n2 = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')))
|
n2 = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')
|
||||||
a1 = bytes(reversed(bytes.fromhex('00561237 37bfce')))
|
a1 = reversed_hex('00561237 37bfce')
|
||||||
a2 = bytes(reversed(bytes.fromhex('00a71370 2dcfc1')))
|
a2 = reversed_hex('00a71370 2dcfc1')
|
||||||
value = f5(w, n1, n2, a1, a2)
|
value = f5(w, n1, n2, a1, a2)
|
||||||
assert bytes(reversed(value[0])) == bytes.fromhex(
|
assert value[0] == reversed_hex('2965f176 a1084a02 fd3f6a20 ce636e20')
|
||||||
'2965f176 a1084a02 fd3f6a20 ce636e20'
|
assert value[1] == reversed_hex('69867911 69d7cd23 980522b5 94750a38')
|
||||||
)
|
|
||||||
assert bytes(reversed(value[1])) == bytes.fromhex(
|
|
||||||
'69867911 69d7cd23 980522b5 94750a38'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def test_f6():
|
def test_f6():
|
||||||
n1 = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab')))
|
n1 = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
|
||||||
n2 = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')))
|
n2 = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')
|
||||||
mac_key = bytes(reversed(bytes.fromhex('2965f176 a1084a02 fd3f6a20 ce636e20')))
|
mac_key = reversed_hex('2965f176 a1084a02 fd3f6a20 ce636e20')
|
||||||
r = bytes(reversed(bytes.fromhex('12a3343b b453bb54 08da42d2 0c2d0fc8')))
|
r = reversed_hex('12a3343b b453bb54 08da42d2 0c2d0fc8')
|
||||||
io_cap = bytes(reversed(bytes.fromhex('010102')))
|
io_cap = reversed_hex('010102')
|
||||||
a1 = bytes(reversed(bytes.fromhex('00561237 37bfce')))
|
a1 = reversed_hex('00561237 37bfce')
|
||||||
a2 = bytes(reversed(bytes.fromhex('00a71370 2dcfc1')))
|
a2 = reversed_hex('00a71370 2dcfc1')
|
||||||
value = f6(mac_key, n1, n2, r, io_cap, a1, a2)
|
value = f6(mac_key, n1, n2, r, io_cap, a1, a2)
|
||||||
assert bytes(reversed(value)) == bytes.fromhex(
|
assert value == reversed_hex('e3c47398 9cd0e8c5 d26c0b09 da958f61')
|
||||||
'e3c47398 9cd0e8c5 d26c0b09 da958f61'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def test_g2():
|
def test_g2():
|
||||||
u = bytes(
|
u = reversed_hex(
|
||||||
reversed(
|
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6'
|
||||||
bytes.fromhex(
|
|
||||||
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9'
|
|
||||||
+ 'eff49111 acf4fddb cc030148 0e359de6'
|
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
v = bytes(
|
v = reversed_hex(
|
||||||
reversed(
|
'55188b3d 32f6bb9a 900afcfb eed4e72a 59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
|
||||||
bytes.fromhex(
|
|
||||||
'55188b3d 32f6bb9a 900afcfb eed4e72a'
|
|
||||||
+ '59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd'
|
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
x = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab')))
|
x = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab')
|
||||||
y = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')))
|
y = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf')
|
||||||
value = g2(u, v, x, y)
|
value = g2(u, v, x, y)
|
||||||
assert value == 0x2F9ED5BA
|
assert value == 0x2F9ED5BA
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def test_h6():
|
def test_h6():
|
||||||
KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b')
|
KEY = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b')
|
||||||
KEY_ID = bytes.fromhex('6c656272')
|
KEY_ID = bytes.fromhex('6c656272')
|
||||||
assert h6(KEY, KEY_ID) == bytes.fromhex('2d9ae102 e76dc91c e8d3a9e2 80b16399')
|
assert h6(KEY, KEY_ID) == reversed_hex('2d9ae102 e76dc91c e8d3a9e2 80b16399')
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def test_h7():
|
def test_h7():
|
||||||
KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b')
|
KEY = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b')
|
||||||
SALT = bytes.fromhex('00000000 00000000 00000000 746D7031')
|
SALT = bytes.fromhex('00000000 00000000 00000000 746D7031')
|
||||||
assert h7(SALT, KEY) == bytes.fromhex('fb173597 c6a3c0ec d2998c2a 75a57011')
|
assert h7(SALT, KEY) == reversed_hex('fb173597 c6a3c0ec d2998c2a 75a57011')
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
def test_ah():
|
def test_ah():
|
||||||
irk = bytes(reversed(bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b')))
|
irk = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b')
|
||||||
prand = bytes(reversed(bytes.fromhex('708194')))
|
prand = reversed_hex('708194')
|
||||||
value = ah(irk, prand)
|
value = ah(irk, prand)
|
||||||
expected = bytes(reversed(bytes.fromhex('0dfbaa')))
|
expected = reversed_hex('0dfbaa')
|
||||||
assert value == expected
|
assert value == expected
|
||||||
|
|
||||||
|
|
||||||
@@ -243,7 +213,7 @@ def test_oob_data():
|
|||||||
oob_data = OobData(
|
oob_data = OobData(
|
||||||
address=Address("F0:F1:F2:F3:F4:F5"),
|
address=Address("F0:F1:F2:F3:F4:F5"),
|
||||||
role=LeRole.BOTH_PERIPHERAL_PREFERRED,
|
role=LeRole.BOTH_PERIPHERAL_PREFERRED,
|
||||||
shared_data=OobSharedData(c=bytes([1, 2]), r=bytes([3, 4])),
|
shared_data=OobSharedData(c=b'12', r=b'34'),
|
||||||
)
|
)
|
||||||
oob_data_ad = oob_data.to_ad()
|
oob_data_ad = oob_data.to_ad()
|
||||||
oob_data_bytes = bytes(oob_data_ad)
|
oob_data_bytes = bytes(oob_data_ad)
|
||||||
@@ -255,6 +225,32 @@ def test_oob_data():
|
|||||||
assert oob_data_parsed.shared_data.r == oob_data.shared_data.r
|
assert oob_data_parsed.shared_data.r == oob_data.shared_data.r
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'ct2, expected',
|
||||||
|
[
|
||||||
|
(False, 'bc1ca4ef 633fc1bd 0d8230af ee388fb0'),
|
||||||
|
(True, '287ad379 dca40253 0a39f1f4 3047b835'),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_ltk_to_link_key(ct2: bool, expected: str):
|
||||||
|
LTK = reversed_hex('368df9bc e3264b58 bd066c33 334fbf64')
|
||||||
|
assert smp.Session.derive_link_key(LTK, ct2) == reversed_hex(expected)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'ct2, expected',
|
||||||
|
[
|
||||||
|
(False, 'a813fb72 f1a3dfa1 8a2c9a43 f10d0a30'),
|
||||||
|
(True, 'e85e09eb 5eccb3e2 69418a13 3211bc79'),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_link_key_to_ltk(ct2: bool, expected: str):
|
||||||
|
LINK_KEY = reversed_hex('05040302 01000908 07060504 03020100')
|
||||||
|
assert smp.Session.derive_ltk(LINK_KEY, ct2) == reversed_hex(expected)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
test_ecc()
|
test_ecc()
|
||||||
|
|||||||
Reference in New Issue
Block a user