From a9628f73e3a04506cac5255000bcce12aaa3eea1 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Fri, 17 Nov 2023 00:29:26 +0800 Subject: [PATCH 1/3] Add support for Extended Advertising --- bumble/device.py | 155 ++++++++++++++++++++++++++++ bumble/hci.py | 98 +++++------------- bumble/utils.py | 18 +++- examples/run_extended_advertiser.py | 69 +++++++++++++ 4 files changed, 267 insertions(+), 73 deletions(-) create mode 100644 examples/run_extended_advertiser.py diff --git a/bumble/device.py b/bumble/device.py index 216da09f..1d40a357 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -32,6 +32,7 @@ from typing import ( Optional, Tuple, Type, + Set, Union, cast, overload, @@ -99,14 +100,20 @@ from .hci import ( HCI_LE_Extended_Create_Connection_Command, HCI_LE_Rand_Command, HCI_LE_Read_PHY_Command, + HCI_LE_Remove_Advertising_Set_Command, HCI_LE_Set_Address_Resolution_Enable_Command, HCI_LE_Set_Advertising_Data_Command, HCI_LE_Set_Advertising_Enable_Command, HCI_LE_Set_Advertising_Parameters_Command, + HCI_LE_Set_Advertising_Set_Random_Address_Command, HCI_LE_Set_Data_Length_Command, HCI_LE_Set_Default_PHY_Command, HCI_LE_Set_Extended_Scan_Enable_Command, HCI_LE_Set_Extended_Scan_Parameters_Command, + HCI_LE_Set_Extended_Scan_Response_Data_Command, + HCI_LE_Set_Extended_Advertising_Data_Command, + HCI_LE_Set_Extended_Advertising_Enable_Command, + HCI_LE_Set_Extended_Advertising_Parameters_Command, HCI_LE_Set_PHY_Command, HCI_LE_Set_Random_Address_Command, HCI_LE_Set_Scan_Enable_Command, @@ -155,6 +162,7 @@ from .utils import ( setup_event_forwarding, composite_listener, deprecated, + experimental, ) from .keys import ( KeyStore, @@ -189,6 +197,8 @@ DEVICE_MIN_SCAN_WINDOW = 25 DEVICE_MAX_SCAN_WINDOW = 10240 DEVICE_MIN_LE_RSSI = -127 DEVICE_MAX_LE_RSSI = 20 +DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE = 0x00 +DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE = 0xEF DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00' DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms @@ -960,6 +970,7 @@ class Device(CompositeEventEmitter): ] advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] config: DeviceConfiguration + extended_advertising_handles: Set[int] @composite_listener class Listener: @@ -1058,6 +1069,7 @@ class Device(CompositeEventEmitter): self.classic_pending_accepts = { Address.ANY: [] } # Futures, by BD address OR [Futures] for Address.ANY + self.extended_advertising_handles = set() # Own address type cache self.advertising_own_address_type = None @@ -1536,6 +1548,149 @@ class Device(CompositeEventEmitter): self.advertising = False self.auto_restart_advertising = False + @experimental('Extended Advertising is still experimental - Might be changed soon.') + async def start_extended_advertising( + self, + advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING, + target: Address = Address.ANY, + own_address_type: int = OwnAddressType.RANDOM, + scan_response: Optional[bytes] = None, + advertising_data: Optional[bytes] = None, + ) -> int: + """Starts an extended advertising set. + + Args: + advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command + target: Directed advertising target. Directed property should be set in advertising_properties arg. + own_address_type: own address type to use in the advertising. + scan_response: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent. + advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent. + + Returns: + Handle of the new advertising set. + """ + + adv_handle = -1 + # Find a free handle + for i in range( + DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE, + DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1, + ): + if i not in self.extended_advertising_handles: + adv_handle = i + break + + if adv_handle == -1: + raise InvalidStateError('No available advertising set.') + + try: + # Set the advertising parameters + await self.send_command( + HCI_LE_Set_Extended_Advertising_Parameters_Command( + advertising_handle=adv_handle, + advertising_event_properties=advertising_properties, + primary_advertising_interval_min=self.advertising_interval_min, + primary_advertising_interval_max=self.advertising_interval_max, + primary_advertising_channel_map=( + HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_37 + | HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_38 + | HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_39 + ), + own_address_type=own_address_type, + peer_address_type=target.address_type, + peer_address=target, + advertising_tx_power=7, + advertising_filter_policy=0, + primary_advertising_phy=1, # LE 1M + secondary_advertising_max_skip=0, + secondary_advertising_phy=1, # LE 1M + advertising_sid=0, + scan_request_notification_enable=0, + ), # type: ignore[call-arg] + check_result=True, + ) + + # Set the advertising data if present + if advertising_data is not None: + await self.send_command( + HCI_LE_Set_Extended_Advertising_Data_Command( + advertising_handle=adv_handle, + operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, + fragment_preference=0x01, # Should not fragment + advertising_data=advertising_data, + ), # type: ignore[call-arg] + check_result=True, + ) + + # Set the scan response if present + if scan_response is not None: + await self.send_command( + HCI_LE_Set_Extended_Scan_Response_Data_Command( + advertising_handle=adv_handle, + operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, + fragment_preference=0x01, # Should not fragment + scan_response_data=scan_response, + ), # type: ignore[call-arg] + check_result=True, + ) + + if own_address_type in ( + OwnAddressType.RANDOM, + OwnAddressType.RESOLVABLE_OR_RANDOM, + ): + await self.send_command( + HCI_LE_Set_Advertising_Set_Random_Address_Command( + advertising_handle=adv_handle, + random_address=self.random_address, + ), # type: ignore[call-arg] + check_result=True, + ) + + # Enable advertising + await self.send_command( + HCI_LE_Set_Extended_Advertising_Enable_Command( + enable=1, + advertising_handles=[adv_handle], + durations=[0], # Forever + max_extended_advertising_events=[0], # Infinite + ), # type: ignore[call-arg] + check_result=True, + ) + except HCI_Error as error: + # When any step fails, cleanup the advertising handle. + await self.send_command( + HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg] + check_result=False, + ) + raise error + + self.extended_advertising_handles.add(adv_handle) + return adv_handle + + @experimental('Extended Advertising is still experimental - Might be changed soon.') + async def stop_extended_advertising(self, adv_handle: int) -> None: + """Stops an extended advertising set. + + Args: + adv_handle: Handle of the advertising set to stop. + """ + # Disable advertising + await self.send_command( + HCI_LE_Set_Extended_Advertising_Enable_Command( + enable=0, + advertising_handles=[adv_handle], + durations=[0], + max_extended_advertising_events=[0], + ), # type: ignore[call-arg] + check_result=True, + ) + # Remove advertising set + await self.send_command( + HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg] + check_result=True, + ) + self.extended_advertising_handles.remove(adv_handle) + @property def is_advertising(self): return self.advertising diff --git a/bumble/hci.py b/bumble/hci.py index bf58ee04..7a7132ae 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -3829,9 +3829,9 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command): 'advertising_event_properties', { 'size': 2, - 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.advertising_properties_string( + 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties( x - ), + ).name, }, ), ('primary_advertising_interval_min', 3), @@ -3840,9 +3840,9 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command): 'primary_advertising_channel_map', { 'size': 1, - 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.channel_map_string( + 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap( x - ), + ).name, }, ), ('own_address_type', OwnAddressType.TYPE_SPEC), @@ -3863,38 +3863,19 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command): See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command ''' - CONNECTABLE_ADVERTISING = 0 - SCANNABLE_ADVERTISING = 1 - DIRECTED_ADVERTISING = 2 - HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 3 - USE_LEGACY_ADVERTISING_PDUS = 4 - ANONYMOUS_ADVERTISING = 5 - INCLUDE_TX_POWER = 6 + class AdvertisingProperties(enum.IntFlag): + CONNECTABLE_ADVERTISING = 1 << 0 + SCANNABLE_ADVERTISING = 1 << 1 + DIRECTED_ADVERTISING = 1 << 2 + HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 1 << 3 + USE_LEGACY_ADVERTISING_PDUS = 1 << 4 + ANONYMOUS_ADVERTISING = 1 << 5 + INCLUDE_TX_POWER = 1 << 6 - ADVERTISING_PROPERTIES_NAMES = ( - 'CONNECTABLE_ADVERTISING', - 'SCANNABLE_ADVERTISING', - 'DIRECTED_ADVERTISING', - 'HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING', - 'USE_LEGACY_ADVERTISING_PDUS', - 'ANONYMOUS_ADVERTISING', - 'INCLUDE_TX_POWER', - ) - - CHANNEL_37 = 0 - CHANNEL_38 = 1 - CHANNEL_39 = 2 - - CHANNEL_NAMES = ('37', '38', '39') - - @classmethod - def advertising_properties_string(cls, properties): - # pylint: disable=line-too-long - return f'[{",".join(bit_flags_to_strings(properties, cls.ADVERTISING_PROPERTIES_NAMES))}]' - - @classmethod - def channel_map_string(cls, channel_map): - return f'[{",".join(bit_flags_to_strings(channel_map, cls.CHANNEL_NAMES))}]' + class ChannelMap(enum.IntFlag): + CHANNEL_37 = 1 << 0 + CHANNEL_38 = 1 << 1 + CHANNEL_39 = 1 << 2 # ----------------------------------------------------------------------------- @@ -3906,9 +3887,9 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command): 'operation', { 'size': 1, - 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name( + 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation( x - ), + ).name, }, ), ('fragment_preference', 1), @@ -3926,23 +3907,12 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command): See Bluetooth spec @ 7.8.54 LE Set Extended Advertising Data Command ''' - INTERMEDIATE_FRAGMENT = 0x00 - FIRST_FRAGMENT = 0x01 - LAST_FRAGMENT = 0x02 - COMPLETE_DATA = 0x03 - UNCHANGED_DATA = 0x04 - - OPERATION_NAMES = { - INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT', - FIRST_FRAGMENT: 'FIRST_FRAGMENT', - LAST_FRAGMENT: 'LAST_FRAGMENT', - COMPLETE_DATA: 'COMPLETE_DATA', - UNCHANGED_DATA: 'UNCHANGED_DATA', - } - - @classmethod - def operation_name(cls, operation): - return name_or_number(cls.OPERATION_NAMES, operation) + class Operation(enum.IntEnum): + INTERMEDIATE_FRAGMENT = 0x00 + FIRST_FRAGMENT = 0x01 + LAST_FRAGMENT = 0x02 + COMPLETE_DATA = 0x03 + UNCHANGED_DATA = 0x04 # ----------------------------------------------------------------------------- @@ -3954,9 +3924,9 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command): 'operation', { 'size': 1, - 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name( + 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation( x - ), + ).name, }, ), ('fragment_preference', 1), @@ -3974,22 +3944,6 @@ class HCI_LE_Set_Extended_Scan_Response_Data_Command(HCI_Command): See Bluetooth spec @ 7.8.55 LE Set Extended Scan Response Data Command ''' - INTERMEDIATE_FRAGMENT = 0x00 - FIRST_FRAGMENT = 0x01 - LAST_FRAGMENT = 0x02 - COMPLETE_DATA = 0x03 - - OPERATION_NAMES = { - INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT', - FIRST_FRAGMENT: 'FIRST_FRAGMENT', - LAST_FRAGMENT: 'LAST_FRAGMENT', - COMPLETE_DATA: 'COMPLETE_DATA', - } - - @classmethod - def operation_name(cls, operation): - return name_or_number(cls.OPERATION_NAMES, operation) - # ----------------------------------------------------------------------------- @HCI_Command.command( diff --git a/bumble/utils.py b/bumble/utils.py index a562618f..81e150cc 100644 --- a/bumble/utils.py +++ b/bumble/utils.py @@ -432,7 +432,7 @@ def wrap_async(function): def deprecated(msg: str): """ - Throw deprecation warning before execution + Throw deprecation warning before execution. """ def wrapper(function): @@ -444,3 +444,19 @@ def deprecated(msg: str): return inner return wrapper + + +def experimental(msg: str): + """ + Throws a future warning before execution. + """ + + def wrapper(function): + @wraps(function) + def inner(*args, **kwargs): + warnings.warn(msg, FutureWarning) + return function(*args, **kwargs) + + return inner + + return wrapper diff --git a/examples/run_extended_advertiser.py b/examples/run_extended_advertiser.py new file mode 100644 index 00000000..20b0b341 --- /dev/null +++ b/examples/run_extended_advertiser.py @@ -0,0 +1,69 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import logging +import sys +import os +from bumble.device import AdvertisingType, Device +from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command + +from bumble.transport import open_transport_or_link + + +# ----------------------------------------------------------------------------- +async def main() -> None: + if len(sys.argv) < 3: + print( + 'Usage: run_extended_advertiser.py [type] [address]' + ) + print('example: run_extended_advertiser.py device1.json usb:0') + return + + if len(sys.argv) >= 4: + advertising_properties = ( + HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties( + int(sys.argv[3]) + ) + ) + else: + advertising_properties = ( + HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING + ) + + if len(sys.argv) >= 5: + target = Address(sys.argv[4]) + else: + target = Address.ANY + + print('<<< connecting to HCI...') + async with await open_transport_or_link(sys.argv[2]) as hci_transport: + print('<<< connected') + + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) + await device.power_on() + await device.start_extended_advertising( + advertising_properties=advertising_properties, target=target + ) + await hci_transport.source.terminated + + +# ----------------------------------------------------------------------------- +logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) +asyncio.run(main()) From 80d34a226d76581eda7b7acb2411a3cc4f8208d6 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Thu, 23 Nov 2023 03:46:19 +0800 Subject: [PATCH 2/3] Slightly refactor and fix CTKD It seems sample input data provided in the spec is big-endian (just like other AES-CMAC-based functions), but all keys are in little-endian( HCI standard), so they need to be reverse before and after applying AES-CMAC. --- .vscode/settings.json | 1 + bumble/crypto.py | 148 +++++++++++++++++++++++------------------- bumble/smp.py | 109 +++++++++++++++++++------------ tests/self_test.py | 15 +---- tests/smp_test.py | 140 +++++++++++++++++++-------------------- 5 files changed, 220 insertions(+), 193 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 04a7f409..466158f5 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -29,6 +29,7 @@ "deregistration", "dhkey", "diversifier", + "endianness", "Fitbit", "GATTLINK", "HANDSFREE", diff --git a/bumble/crypto.py b/bumble/crypto.py index 852c675a..d267c3b7 100644 --- a/bumble/crypto.py +++ b/bumble/crypto.py @@ -21,6 +21,8 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations + import logging import operator @@ -29,11 +31,13 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.asymmetric.ec import ( generate_private_key, ECDH, + EllipticCurvePrivateKey, EllipticCurvePublicNumbers, EllipticCurvePrivateNumbers, SECP256R1, ) from cryptography.hazmat.primitives import cmac +from typing import Tuple # ----------------------------------------------------------------------------- @@ -46,16 +50,18 @@ logger = logging.getLogger(__name__) # Classes # ----------------------------------------------------------------------------- class EccKey: - def __init__(self, private_key): + def __init__(self, private_key: EllipticCurvePrivateKey) -> None: self.private_key = private_key @classmethod - def generate(cls): + def generate(cls) -> EccKey: private_key = generate_private_key(SECP256R1()) return cls(private_key) @classmethod - def from_private_key_bytes(cls, d_bytes, x_bytes, y_bytes): + def from_private_key_bytes( + cls, d_bytes: bytes, x_bytes: bytes, y_bytes: bytes + ) -> EccKey: d = int.from_bytes(d_bytes, byteorder='big', signed=False) x = int.from_bytes(x_bytes, byteorder='big', signed=False) y = int.from_bytes(y_bytes, byteorder='big', signed=False) @@ -65,7 +71,7 @@ class EccKey: return cls(private_key) @property - def x(self): + def x(self) -> bytes: return ( self.private_key.public_key() .public_numbers() @@ -73,14 +79,14 @@ class EccKey: ) @property - def y(self): + def y(self) -> bytes: return ( self.private_key.public_key() .public_numbers() .y.to_bytes(32, byteorder='big') ) - def dh(self, public_key_x, public_key_y): + def dh(self, public_key_x: bytes, public_key_y: bytes) -> bytes: x = int.from_bytes(public_key_x, byteorder='big', signed=False) y = int.from_bytes(public_key_y, byteorder='big', signed=False) public_key = EllipticCurvePublicNumbers(x, y, SECP256R1()).public_key() @@ -93,14 +99,23 @@ class EccKey: # Functions # ----------------------------------------------------------------------------- + # ----------------------------------------------------------------------------- -def xor(x, y): +def xor(x: bytes, y: bytes) -> bytes: assert len(x) == len(y) return bytes(map(operator.xor, x, y)) # ----------------------------------------------------------------------------- -def r(): +def reverse(input: bytes) -> bytes: + ''' + Returns bytes of input in reversed endianness. + ''' + return input[::-1] + + +# ----------------------------------------------------------------------------- +def r() -> bytes: ''' Generate 16 bytes of random data ''' @@ -108,20 +123,20 @@ def r(): # ----------------------------------------------------------------------------- -def e(key, data): +def e(key: bytes, data: bytes) -> bytes: ''' AES-128 ECB, expecting byte-swapped inputs and producing a byte-swapped output. See Bluetooth spec Vol 3, Part H - 2.2.1 Security function e ''' - cipher = Cipher(algorithms.AES(bytes(reversed(key))), modes.ECB()) + cipher = Cipher(algorithms.AES(reverse(key)), modes.ECB()) encryptor = cipher.encryptor() - return bytes(reversed(encryptor.update(bytes(reversed(data))))) + return reverse(encryptor.update(reverse(data))) # ----------------------------------------------------------------------------- -def ah(k, r): # pylint: disable=redefined-outer-name +def ah(k: bytes, r: bytes) -> bytes: # pylint: disable=redefined-outer-name ''' See Bluetooth spec Vol 3, Part H - 2.2.2 Random Address Hash function ah ''' @@ -132,7 +147,16 @@ def ah(k, r): # pylint: disable=redefined-outer-name # ----------------------------------------------------------------------------- -def c1(k, r, preq, pres, iat, rat, ia, ra): # pylint: disable=redefined-outer-name +def c1( + k: bytes, + r: bytes, + preq: bytes, + pres: bytes, + iat: int, + rat: int, + ia: bytes, + ra: bytes, +) -> bytes: # pylint: disable=redefined-outer-name ''' See Bluetooth spec, Vol 3, Part H - 2.2.3 Confirm value generation function c1 for LE Legacy Pairing @@ -144,7 +168,7 @@ def c1(k, r, preq, pres, iat, rat, ia, ra): # pylint: disable=redefined-outer-n # ----------------------------------------------------------------------------- -def s1(k, r1, r2): +def s1(k: bytes, r1: bytes, r2: bytes) -> bytes: ''' See Bluetooth spec, Vol 3, Part H - 2.2.4 Key generation function s1 for LE Legacy Pairing @@ -154,7 +178,7 @@ def s1(k, r1, r2): # ----------------------------------------------------------------------------- -def aes_cmac(m, k): +def aes_cmac(m: bytes, k: bytes) -> bytes: ''' See Bluetooth spec, Vol 3, Part H - 2.2.5 FunctionAES-CMAC @@ -166,20 +190,16 @@ def aes_cmac(m, k): # ----------------------------------------------------------------------------- -def f4(u, v, x, z): +def f4(u: bytes, v: bytes, x: bytes, z: bytes) -> bytes: ''' See Bluetooth spec, Vol 3, Part H - 2.2.6 LE Secure Connections Confirm Value Generation Function f4 ''' - return bytes( - reversed( - aes_cmac(bytes(reversed(u)) + bytes(reversed(v)) + z, bytes(reversed(x))) - ) - ) + return reverse(aes_cmac(reverse(u) + reverse(v) + z, reverse(x))) # ----------------------------------------------------------------------------- -def f5(w, n1, n2, a1, a2): +def f5(w: bytes, n1: bytes, n2: bytes, a1: bytes, a2: bytes) -> Tuple[bytes, bytes]: ''' See Bluetooth spec, Vol 3, Part H - 2.2.7 LE Secure Connections Key Generation Function f5 @@ -187,87 +207,83 @@ def f5(w, n1, n2, a1, a2): NOTE: this returns a tuple: (MacKey, LTK) in little-endian byte order ''' salt = bytes.fromhex('6C888391AAF5A53860370BDB5A6083BE') - t = aes_cmac(bytes(reversed(w)), salt) + t = aes_cmac(reverse(w), salt) key_id = bytes([0x62, 0x74, 0x6C, 0x65]) return ( - bytes( - reversed( - aes_cmac( - bytes([0]) - + key_id - + bytes(reversed(n1)) - + bytes(reversed(n2)) - + bytes(reversed(a1)) - + bytes(reversed(a2)) - + bytes([1, 0]), - t, - ) + reverse( + aes_cmac( + bytes([0]) + + key_id + + reverse(n1) + + reverse(n2) + + reverse(a1) + + reverse(a2) + + bytes([1, 0]), + t, ) ), - bytes( - reversed( - aes_cmac( - bytes([1]) - + key_id - + bytes(reversed(n1)) - + bytes(reversed(n2)) - + bytes(reversed(a1)) - + bytes(reversed(a2)) - + bytes([1, 0]), - t, - ) + reverse( + aes_cmac( + bytes([1]) + + key_id + + reverse(n1) + + reverse(n2) + + reverse(a1) + + reverse(a2) + + bytes([1, 0]), + t, ) ), ) # ----------------------------------------------------------------------------- -def f6(w, n1, n2, r, io_cap, a1, a2): # pylint: disable=redefined-outer-name +def f6( + w: bytes, n1: bytes, n2: bytes, r: bytes, io_cap: bytes, a1: bytes, a2: bytes +) -> bytes: # pylint: disable=redefined-outer-name ''' See Bluetooth spec, Vol 3, Part H - 2.2.8 LE Secure Connections Check Value Generation Function f6 ''' - return bytes( - reversed( - aes_cmac( - bytes(reversed(n1)) - + bytes(reversed(n2)) - + bytes(reversed(r)) - + bytes(reversed(io_cap)) - + bytes(reversed(a1)) - + bytes(reversed(a2)), - bytes(reversed(w)), - ) + return reverse( + aes_cmac( + reverse(n1) + + reverse(n2) + + reverse(r) + + reverse(io_cap) + + reverse(a1) + + reverse(a2), + reverse(w), ) ) # ----------------------------------------------------------------------------- -def g2(u, v, x, y): +def g2(u: bytes, v: bytes, x: bytes, y: bytes) -> int: ''' See Bluetooth spec, Vol 3, Part H - 2.2.9 LE Secure Connections Numeric Comparison Value Generation Function g2 ''' return int.from_bytes( aes_cmac( - bytes(reversed(u)) + bytes(reversed(v)) + bytes(reversed(y)), - bytes(reversed(x)), + reverse(u) + reverse(v) + reverse(y), + reverse(x), )[-4:], byteorder='big', ) # ----------------------------------------------------------------------------- -def h6(w, key_id): +def h6(w: bytes, key_id: bytes) -> bytes: ''' See Bluetooth spec, Vol 3, Part H - 2.2.10 Link key conversion function h6 ''' - return aes_cmac(key_id, w) + return reverse(aes_cmac(key_id, reverse(w))) # ----------------------------------------------------------------------------- -def h7(salt, w): +def h7(salt: bytes, w: bytes) -> bytes: ''' See Bluetooth spec, Vol 3, Part H - 2.2.11 Link key conversion function h7 ''' - return aes_cmac(w, salt) + return reverse(aes_cmac(reverse(w), salt)) diff --git a/bumble/smp.py b/bumble/smp.py index 1461969c..25dd46b5 100644 --- a/bumble/smp.py +++ b/bumble/smp.py @@ -187,8 +187,8 @@ SMP_KEYPRESS_AUTHREQ = 0b00010000 SMP_CT2_AUTHREQ = 0b00100000 # Crypto salt -SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031') -SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('00000000000000000000000000000000746D7032') +SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031') +SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032') # fmt: on # pylint: enable=line-too-long @@ -579,7 +579,7 @@ class OobContext: self.r = crypto.r() if r is None else r def share(self) -> OobSharedData: - pkx = bytes(reversed(self.ecc_key.x)) + pkx = self.ecc_key.x[::-1] return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r) @@ -677,6 +677,13 @@ class Session: }, } + ea: bytes + eb: bytes + ltk: bytes + preq: bytes + pres: bytes + tk: bytes + def __init__( self, manager: Manager, @@ -686,15 +693,10 @@ class Session: ) -> None: self.manager = manager self.connection = connection - self.preq: Optional[bytes] = None - self.pres: Optional[bytes] = None - self.ea = None - self.eb = None self.stk = None - self.ltk = None self.ltk_ediv = 0 self.ltk_rand = bytes(8) - self.link_key = None + self.link_key: Optional[bytes] = None self.initiator_key_distribution: int = 0 self.responder_key_distribution: int = 0 self.peer_random_value: Optional[bytes] = None @@ -787,9 +789,7 @@ class Session: ) self.r = pairing_config.oob.our_context.r self.ecc_key = pairing_config.oob.our_context.ecc_key - if pairing_config.oob.legacy_context is None: - self.tk = None - else: + if pairing_config.oob.legacy_context is not None: self.tk = pairing_config.oob.legacy_context.tk else: if pairing_config.oob.legacy_context is None: @@ -807,7 +807,7 @@ class Session: @property def pkx(self) -> Tuple[bytes, bytes]: - return (bytes(reversed(self.ecc_key.x)), self.peer_public_key_x) + return (self.ecc_key.x[::-1], self.peer_public_key_x) @property def pka(self) -> bytes: @@ -1061,8 +1061,8 @@ class Session: def send_public_key_command(self) -> None: self.send_command( SMP_Pairing_Public_Key_Command( - public_key_x=bytes(reversed(self.ecc_key.x)), - public_key_y=bytes(reversed(self.ecc_key.y)), + public_key_x=self.ecc_key.x[::-1], + public_key_y=self.ecc_key.y[::-1], ) ) @@ -1098,15 +1098,52 @@ class Session: ) ) - async def derive_ltk(self) -> None: - link_key = await self.manager.device.get_link_key(self.connection.peer_address) - assert link_key is not None + @classmethod + def derive_ltk(cls, link_key: bytes, ct2: bool) -> bytes: + '''Derives Long Term Key from Link Key. + + Args: + link_key: BR/EDR Link Key bytes in little-endian. + ct2: whether ct2 is supported on both devices. + Returns: + LE Long Tern Key bytes in little-endian. + ''' ilk = ( crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key) - if self.ct2 + if ct2 else crypto.h6(link_key, b'tmp2') ) - self.ltk = crypto.h6(ilk, b'brle') + return crypto.h6(ilk, b'brle') + + @classmethod + def derive_link_key(cls, ltk: bytes, ct2: bool) -> bytes: + '''Derives Link Key from Long Term Key. + + Args: + ltk: LE Long Term Key bytes in little-endian. + ct2: whether ct2 is supported on both devices. + Returns: + BR/EDR Link Key bytes in little-endian. + ''' + ilk = ( + crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=ltk) + if ct2 + else crypto.h6(ltk, b'tmp1') + ) + return crypto.h6(ilk, b'lebr') + + async def get_link_key_and_derive_ltk(self) -> None: + '''Retrieves BR/EDR Link Key from storage and derive it to LE LTK.''' + link_key = await self.manager.device.get_link_key(self.connection.peer_address) + if link_key is None: + logging.warning( + 'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!' + ) + self.send_pairing_failed( + SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR + ) + else: + self.ltk = self.derive_ltk(link_key, self.ct2) def distribute_keys(self) -> None: # Distribute the keys as required @@ -1117,7 +1154,7 @@ class Session: and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG ): self.ctkd_task = self.connection.abort_on( - 'disconnection', self.derive_ltk() + 'disconnection', self.get_link_key_and_derive_ltk() ) elif not self.sc: # Distribute the LTK, EDIV and RAND @@ -1147,12 +1184,7 @@ class Session: # CTKD, calculate BR/EDR link key if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: - ilk = ( - crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk) - if self.ct2 - else crypto.h6(self.ltk, b'tmp1') - ) - self.link_key = crypto.h6(ilk, b'lebr') + self.link_key = self.derive_link_key(self.ltk, self.ct2) else: # CTKD: Derive LTK from LinkKey @@ -1161,7 +1193,7 @@ class Session: and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG ): self.ctkd_task = self.connection.abort_on( - 'disconnection', self.derive_ltk() + 'disconnection', self.get_link_key_and_derive_ltk() ) # Distribute the LTK, EDIV and RAND elif not self.sc: @@ -1191,12 +1223,7 @@ class Session: # CTKD, calculate BR/EDR link key if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: - ilk = ( - crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk) - if self.ct2 - else crypto.h6(self.ltk, b'tmp1') - ) - self.link_key = crypto.h6(ilk, b'lebr') + self.link_key = self.derive_link_key(self.ltk, self.ct2) def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None: # Set our expectations for what to wait for in the key distribution phase @@ -1754,14 +1781,10 @@ class Session: self.peer_public_key_y = command.public_key_y # Compute the DH key - self.dh_key = bytes( - reversed( - self.ecc_key.dh( - bytes(reversed(command.public_key_x)), - bytes(reversed(command.public_key_y)), - ) - ) - ) + self.dh_key = self.ecc_key.dh( + command.public_key_x[::-1], + command.public_key_y[::-1], + )[::-1] logger.debug(f'DH key: {self.dh_key.hex()}') if self.pairing_method == PairingMethod.OOB: @@ -1824,7 +1847,6 @@ class Session: else: self.send_pairing_dhkey_check_command() else: - assert self.ltk self.start_encryption(self.ltk) def on_smp_pairing_failed_command( @@ -1874,6 +1896,7 @@ class Manager(EventEmitter): sessions: Dict[int, Session] pairing_config_factory: Callable[[Connection], PairingConfig] session_proxy: Type[Session] + _ecc_key: Optional[crypto.EccKey] def __init__( self, diff --git a/tests/self_test.py b/tests/self_test.py index 728fbc7e..7a144870 100644 --- a/tests/self_test.py +++ b/tests/self_test.py @@ -21,7 +21,7 @@ import logging import os import pytest -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch from bumble.controller import Controller from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE @@ -38,7 +38,6 @@ from bumble.smp import ( OobLegacyContext, ) from bumble.core import ProtocolError -from bumble.hci import HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE from bumble.keys import PairingKeys @@ -519,16 +518,8 @@ async def test_self_smp_over_classic(): # Mock connection # TODO: Implement Classic SSP and encryption in link relayer LINK_KEY = bytes.fromhex('287ad379dca402530a39f1f43047b835') - two_devices.devices[0].on_link_key( - two_devices.devices[1].public_address, - LINK_KEY, - HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, - ) - two_devices.devices[1].on_link_key( - two_devices.devices[0].public_address, - LINK_KEY, - HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, - ) + two_devices.devices[0].get_link_key = AsyncMock(return_value=LINK_KEY) + two_devices.devices[1].get_link_key = AsyncMock(return_value=LINK_KEY) two_devices.connections[0].encryption = 1 two_devices.connections[1].encryption = 1 diff --git a/tests/smp_test.py b/tests/smp_test.py index 4bd75228..7a32b23c 100644 --- a/tests/smp_test.py +++ b/tests/smp_test.py @@ -16,6 +16,9 @@ # Imports # ----------------------------------------------------------------------------- +import pytest + +from bumble import smp from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1 from bumble.pairing import OobData, OobSharedData, LeRole from bumble.hci import Address @@ -28,8 +31,8 @@ from bumble.core import AdvertisingData # ----------------------------------------------------------------------------- -def reversed_hex(hex_str): - return bytes(reversed(bytes.fromhex(hex_str))) +def reversed_hex(hex_str: str) -> bytes: + return bytes.fromhex(hex_str)[::-1] # ----------------------------------------------------------------------------- @@ -129,112 +132,79 @@ def test_aes_cmac(): # ----------------------------------------------------------------------------- def test_f4(): - u = bytes( - reversed( - bytes.fromhex( - '20b003d2 f297be2c 5e2c83a7 e9f9a5b9' - + 'eff49111 acf4fddb cc030148 0e359de6' - ) - ) + u = reversed_hex( + '20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6' ) - v = bytes( - reversed( - bytes.fromhex( - '55188b3d 32f6bb9a 900afcfb eed4e72a' - + '59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd' - ) - ) + v = reversed_hex( + '55188b3d 32f6bb9a 900afcfb eed4e72a 59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd' ) - x = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab'))) - z = bytes([0]) + x = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab') + z = b'\0' value = f4(u, v, x, z) - assert bytes(reversed(value)) == bytes.fromhex( - 'f2c916f1 07a9bd1c f1eda1be a974872d' - ) + assert value == reversed_hex('f2c916f1 07a9bd1c f1eda1be a974872d') # ----------------------------------------------------------------------------- def test_f5(): - w = bytes( - reversed( - bytes.fromhex( - 'ec0234a3 57c8ad05 341010a6 0a397d9b' - + '99796b13 b4f866f1 868d34f3 73bfa698' - ) - ) + w = reversed_hex( + 'ec0234a3 57c8ad05 341010a6 0a397d9b 99796b13 b4f866f1 868d34f3 73bfa698' ) - n1 = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab'))) - n2 = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf'))) - a1 = bytes(reversed(bytes.fromhex('00561237 37bfce'))) - a2 = bytes(reversed(bytes.fromhex('00a71370 2dcfc1'))) + n1 = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab') + n2 = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf') + a1 = reversed_hex('00561237 37bfce') + a2 = reversed_hex('00a71370 2dcfc1') value = f5(w, n1, n2, a1, a2) - assert bytes(reversed(value[0])) == bytes.fromhex( - '2965f176 a1084a02 fd3f6a20 ce636e20' - ) - assert bytes(reversed(value[1])) == bytes.fromhex( - '69867911 69d7cd23 980522b5 94750a38' - ) + assert value[0] == reversed_hex('2965f176 a1084a02 fd3f6a20 ce636e20') + assert value[1] == reversed_hex('69867911 69d7cd23 980522b5 94750a38') # ----------------------------------------------------------------------------- def test_f6(): - n1 = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab'))) - n2 = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf'))) - mac_key = bytes(reversed(bytes.fromhex('2965f176 a1084a02 fd3f6a20 ce636e20'))) - r = bytes(reversed(bytes.fromhex('12a3343b b453bb54 08da42d2 0c2d0fc8'))) - io_cap = bytes(reversed(bytes.fromhex('010102'))) - a1 = bytes(reversed(bytes.fromhex('00561237 37bfce'))) - a2 = bytes(reversed(bytes.fromhex('00a71370 2dcfc1'))) + n1 = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab') + n2 = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf') + mac_key = reversed_hex('2965f176 a1084a02 fd3f6a20 ce636e20') + r = reversed_hex('12a3343b b453bb54 08da42d2 0c2d0fc8') + io_cap = reversed_hex('010102') + a1 = reversed_hex('00561237 37bfce') + a2 = reversed_hex('00a71370 2dcfc1') value = f6(mac_key, n1, n2, r, io_cap, a1, a2) - assert bytes(reversed(value)) == bytes.fromhex( - 'e3c47398 9cd0e8c5 d26c0b09 da958f61' - ) + assert value == reversed_hex('e3c47398 9cd0e8c5 d26c0b09 da958f61') # ----------------------------------------------------------------------------- def test_g2(): - u = bytes( - reversed( - bytes.fromhex( - '20b003d2 f297be2c 5e2c83a7 e9f9a5b9' - + 'eff49111 acf4fddb cc030148 0e359de6' - ) - ) + u = reversed_hex( + '20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6' ) - v = bytes( - reversed( - bytes.fromhex( - '55188b3d 32f6bb9a 900afcfb eed4e72a' - + '59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd' - ) - ) + v = reversed_hex( + '55188b3d 32f6bb9a 900afcfb eed4e72a 59cb9ac2 f19d7cfb 6b4fdd49 f47fc5fd' ) - x = bytes(reversed(bytes.fromhex('d5cb8454 d177733e ffffb2ec 712baeab'))) - y = bytes(reversed(bytes.fromhex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf'))) + x = reversed_hex('d5cb8454 d177733e ffffb2ec 712baeab') + y = reversed_hex('a6e8e7cc 25a75f6e 216583f7 ff3dc4cf') value = g2(u, v, x, y) assert value == 0x2F9ED5BA # ----------------------------------------------------------------------------- def test_h6(): - KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b') + KEY = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b') KEY_ID = bytes.fromhex('6c656272') - assert h6(KEY, KEY_ID) == bytes.fromhex('2d9ae102 e76dc91c e8d3a9e2 80b16399') + assert h6(KEY, KEY_ID) == reversed_hex('2d9ae102 e76dc91c e8d3a9e2 80b16399') # ----------------------------------------------------------------------------- def test_h7(): - KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b') + KEY = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b') SALT = bytes.fromhex('00000000 00000000 00000000 746D7031') - assert h7(SALT, KEY) == bytes.fromhex('fb173597 c6a3c0ec d2998c2a 75a57011') + assert h7(SALT, KEY) == reversed_hex('fb173597 c6a3c0ec d2998c2a 75a57011') # ----------------------------------------------------------------------------- def test_ah(): - irk = bytes(reversed(bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b'))) - prand = bytes(reversed(bytes.fromhex('708194'))) + irk = reversed_hex('ec0234a3 57c8ad05 341010a6 0a397d9b') + prand = reversed_hex('708194') value = ah(irk, prand) - expected = bytes(reversed(bytes.fromhex('0dfbaa'))) + expected = reversed_hex('0dfbaa') assert value == expected @@ -243,7 +213,7 @@ def test_oob_data(): oob_data = OobData( address=Address("F0:F1:F2:F3:F4:F5"), role=LeRole.BOTH_PERIPHERAL_PREFERRED, - shared_data=OobSharedData(c=bytes([1, 2]), r=bytes([3, 4])), + shared_data=OobSharedData(c=b'12', r=b'34'), ) oob_data_ad = oob_data.to_ad() oob_data_bytes = bytes(oob_data_ad) @@ -255,6 +225,32 @@ def test_oob_data(): assert oob_data_parsed.shared_data.r == oob_data.shared_data.r +# ----------------------------------------------------------------------------- +@pytest.mark.parametrize( + 'ct2, expected', + [ + (False, 'bc1ca4ef 633fc1bd 0d8230af ee388fb0'), + (True, '287ad379 dca40253 0a39f1f4 3047b835'), + ], +) +def test_ltk_to_link_key(ct2: bool, expected: str): + LTK = reversed_hex('368df9bc e3264b58 bd066c33 334fbf64') + assert smp.Session.derive_link_key(LTK, ct2) == reversed_hex(expected) + + +# ----------------------------------------------------------------------------- +@pytest.mark.parametrize( + 'ct2, expected', + [ + (False, 'a813fb72 f1a3dfa1 8a2c9a43 f10d0a30'), + (True, 'e85e09eb 5eccb3e2 69418a13 3211bc79'), + ], +) +def test_link_key_to_ltk(ct2: bool, expected: str): + LINK_KEY = reversed_hex('05040302 01000908 07060504 03020100') + assert smp.Session.derive_ltk(LINK_KEY, ct2) == reversed_hex(expected) + + # ----------------------------------------------------------------------------- if __name__ == '__main__': test_ecc() From a65a215fd799cc657eab60e11ad54a3a86c5c2ca Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Sun, 26 Nov 2023 19:26:42 +0800 Subject: [PATCH 3/3] Provide IntFlag.name property fallback --- bumble/hci.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/bumble/hci.py b/bumble/hci.py index 7a7132ae..8897624e 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -3829,9 +3829,11 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command): 'advertising_event_properties', { 'size': 2, - 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties( - x - ).name, + 'mapper': lambda x: str( + HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties( + x + ) + ), }, ), ('primary_advertising_interval_min', 3), @@ -3840,9 +3842,9 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command): 'primary_advertising_channel_map', { 'size': 1, - 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap( - x - ).name, + 'mapper': lambda x: str( + HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap(x) + ), }, ), ('own_address_type', OwnAddressType.TYPE_SPEC), @@ -3872,11 +3874,25 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command): ANONYMOUS_ADVERTISING = 1 << 5 INCLUDE_TX_POWER = 1 << 6 + def __str__(self) -> str: + return '|'.join( + flag.name + for flag in HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties + if self.value & flag.value and flag.name is not None + ) + class ChannelMap(enum.IntFlag): CHANNEL_37 = 1 << 0 CHANNEL_38 = 1 << 1 CHANNEL_39 = 1 << 2 + def __str__(self) -> str: + return '|'.join( + flag.name + for flag in HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap + if self.value & flag.value and flag.name is not None + ) + # ----------------------------------------------------------------------------- @HCI_Command.command(