From d5eebc2101f6d3600af542e4ce783b4f34a8c782 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Sat, 15 Oct 2022 21:43:08 -0700 Subject: [PATCH] add AdvertisingType --- bumble/controller.py | 4 +- bumble/device.py | 118 ++++++++++++++++++++++++++++++------- bumble/hci.py | 28 ++++----- examples/run_advertiser.py | 21 +++++-- tests/hci_test.py | 24 ++++---- tests/import_test.py | 51 ++++++++++++++++ 6 files changed, 192 insertions(+), 54 deletions(-) diff --git a/bumble/controller.py b/bumble/controller.py index dc21f881..1d238ed7 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -76,7 +76,7 @@ class Controller: self.supported_commands = bytes.fromhex('2000800000c000000000e40000002822000000000000040000f7ffff7f00000030f0f9ff01008004000000000000000000000000000000000000000000000000') self.le_features = bytes.fromhex('ff49010000000000') self.le_states = bytes.fromhex('ffff3fffff030000') - self.avertising_channel_tx_power = 0 + self.advertising_channel_tx_power = 0 self.filter_accept_list_size = 8 self.resolving_list_size = 8 self.supported_max_tx_octets = 27 @@ -650,7 +650,7 @@ class Controller: ''' See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Channel Tx Power Command ''' - return bytes([HCI_SUCCESS, self.avertising_channel_tx_power]) + return bytes([HCI_SUCCESS, self.advertising_channel_tx_power]) def on_hci_le_set_advertising_data_command(self, command): ''' diff --git a/bumble/device.py b/bumble/device.py index 5facc26f..b91bf2b9 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -15,6 +15,7 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from enum import IntEnum import json import asyncio import logging @@ -66,6 +67,9 @@ DEVICE_MIN_SCAN_INTERVAL = 25 DEVICE_MAX_SCAN_INTERVAL = 10240 DEVICE_MIN_SCAN_WINDOW = 25 DEVICE_MAX_SCAN_WINDOW = 10240 +DEVICE_MIN_LE_RSSI = -127 +DEVICE_MAX_LE_RSSI = 20 + # ----------------------------------------------------------------------------- # Classes @@ -199,6 +203,45 @@ class AdvertisementDataAccumulator: return result +# ----------------------------------------------------------------------------- +class AdvertisingType(IntEnum): + UNDIRECTED_CONNECTABLE_SCANNABLE = 0x00 # Undirected, connectable, scannable + DIRECTED_CONNECTABLE_HIGH_DUTY = 0x01 # Directed, connectable, non-scannable + UNDIRECTED_SCANNABLE = 0x02 # Undirected, non-connectable, scannable + UNDIRECTED = 0x03 # Undirected, non-connectable, non-scannable + DIRECTED_CONNECTABLE_LOW_DUTY = 0x04 # Directed, connectable, non-scannable + + @property + def has_data(self): + return self in { + AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, + AdvertisingType.UNDIRECTED_SCANNABLE, + AdvertisingType.UNDIRECTED + } + + @property + def is_connectable(self): + return self in { + AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, + AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY, + AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY + } + + @property + def is_scannable(self): + return self in { + AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, + AdvertisingType.UNDIRECTED_SCANNABLE + } + + @property + def is_directed(self): + return self in { + AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY, + AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY + } + + # ----------------------------------------------------------------------------- class Peer: def __init__(self, connection): @@ -298,7 +341,8 @@ class ConnectionParametersPreferences: min_ce_length: int = DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH max_ce_length: int = DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH -DEVICE_DEFAULT_CONNECTION_PARAMETER_PREFERENCES = ConnectionParametersPreferences() + +ConnectionParametersPreferences.default = ConnectionParametersPreferences() # ----------------------------------------------------------------------------- @@ -592,6 +636,7 @@ class Device(CompositeEventEmitter): self._host = None self.powered_on = False self.advertising = False + self.advertising_type = None self.auto_restart_advertising = False self.command_timeout = 10 # seconds self.gatt_server = gatt_server.Server(self) @@ -811,32 +856,48 @@ class Device(CompositeEventEmitter): return self.host.supports_le_feature(feature_map[phy]) - async def start_advertising(self, auto_restart=False): - self.auto_restart_advertising = auto_restart - + async def start_advertising( + self, + advertising_type=AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, + target=None, + auto_restart=False + ): # If we're advertising, stop first if self.advertising: await self.stop_advertising() - # Set/update the advertising data - await self.send_command(HCI_LE_Set_Advertising_Data_Command( - advertising_data = self.advertising_data - ), check_result=True) + # Set/update the advertising data if the advertising type allows it + if advertising_type.has_data: + await self.send_command(HCI_LE_Set_Advertising_Data_Command( + advertising_data = self.advertising_data + ), check_result=True) - # Set/update the scan response data - await self.send_command(HCI_LE_Set_Scan_Response_Data_Command( - scan_response_data = self.scan_response_data - ), check_result=True) + # Set/update the scan response data if the advertising is scannable + if advertising_type.is_scannable: + await self.send_command(HCI_LE_Set_Scan_Response_Data_Command( + scan_response_data = self.scan_response_data + ), check_result=True) + + # Decide what peer address to use + if advertising_type.is_directed: + if target is None: + raise ValueError('directed advertising requires a target address') + + peer_address = target + peer_address_type = target.address_type + else: + peer_address = Address('00:00:00:00:00:00') + peer_address_type = Address.PUBLIC_DEVICE_ADDRESS # Set the advertising parameters await self.send_command(HCI_LE_Set_Advertising_Parameters_Command( # TODO: use real values, not fixed ones advertising_interval_min = self.advertising_interval_min, advertising_interval_max = self.advertising_interval_max, - advertising_type = HCI_LE_Set_Advertising_Parameters_Command.ADV_IND, + advertising_type = int(advertising_type), own_address_type = Address.RANDOM_DEVICE_ADDRESS, # TODO: allow using the public address - peer_address_type = Address.PUBLIC_DEVICE_ADDRESS, - peer_address = Address('00:00:00:00:00:00'), + peer_address_type = peer_address_type, + peer_address = peer_address, advertising_channel_map = 7, advertising_filter_policy = 0 ), check_result=True) @@ -846,7 +907,9 @@ class Device(CompositeEventEmitter): advertising_enable = 1 ), check_result=True) - self.advertising = True + self.auto_restart_advertising = auto_restart + self.advertising_type = advertising_type + self.advertising = True async def stop_advertising(self): # Disable advertising @@ -855,7 +918,9 @@ class Device(CompositeEventEmitter): advertising_enable = 0 ), check_result=True) - self.advertising = False + self.advertising = False + self.advertising_type = None + self.auto_restart_advertising = False @property def is_advertising(self): @@ -1088,9 +1153,9 @@ class Device(CompositeEventEmitter): if connection_parameters_preferences is None: if connection_parameters_preferences is None: connection_parameters_preferences = { - HCI_LE_1M_PHY: DEVICE_DEFAULT_CONNECTION_PARAMETER_PREFERENCES, - HCI_LE_2M_PHY: DEVICE_DEFAULT_CONNECTION_PARAMETER_PREFERENCES, - HCI_LE_CODED_PHY: DEVICE_DEFAULT_CONNECTION_PARAMETER_PREFERENCES + HCI_LE_1M_PHY: ConnectionParametersPreferences.default, + HCI_LE_2M_PHY: ConnectionParametersPreferences.default, + HCI_LE_CODED_PHY: ConnectionParametersPreferences.default } if self.host.supports_command(HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND): @@ -1610,7 +1675,13 @@ class Device(CompositeEventEmitter): @host_event_handler def on_connection_failure(self, connection_handle, error_code): - logger.debug(f'*** Connection failed: {error_code}') + logger.debug(f'*** Connection failed: {HCI_Constant.error_name(error_code)}') + + # For directed advertising, this means a timeout + if self.advertising and self.advertising_type.is_directed: + self.advertising = False + + # Notify listeners error = ConnectionError( error_code, 'hci', @@ -1633,7 +1704,10 @@ class Device(CompositeEventEmitter): # Restart advertising if auto-restart is enabled if self.auto_restart_advertising: logger.debug('restarting advertising') - asyncio.create_task(self.start_advertising(auto_restart=self.auto_restart_advertising)) + asyncio.create_task(self.start_advertising( + advertising_type = self.advertising_type, + auto_restart = True + )) @host_event_handler @with_connection_from_handle diff --git a/bumble/hci.py b/bumble/hci.py index 5f6af92c..065901bb 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -1491,7 +1491,7 @@ class HCI_Object: def dict_to_bytes(object, fields): result = bytearray() for (field_name, field_type) in fields: - # The field_type may be a dictionnary with a mapper, parser, serializer, and/or size + # The field_type may be a dictionary with a mapper, parser, serializer, and/or size serializer = None if type(field_type) is dict: if 'serializer' in field_type: @@ -2826,18 +2826,18 @@ class HCI_LE_Set_Advertising_Parameters_Command(HCI_Command): See Bluetooth spec @ 7.8.5 LE Set Advertising Parameters Command ''' - ADV_IND = 0x00 - ADV_DIRECT_IND = 0x01 - ADV_SCAN_IND = 0x02 - ADV_NONCONN_IND = 0x03 - ADV_DIRECT_IND = 0x04 + ADV_IND = 0x00 + ADV_DIRECT_IND = 0x01 + ADV_SCAN_IND = 0x02 + ADV_NONCONN_IND = 0x03 + ADV_DIRECT_IND_LOW_DUTY = 0x04 ADVERTISING_TYPE_NAMES = { - ADV_IND: 'ADV_IND', - ADV_DIRECT_IND: 'ADV_DIRECT_IND', - ADV_SCAN_IND: 'ADV_SCAN_IND', - ADV_NONCONN_IND: 'ADV_NONCONN_IND', - ADV_DIRECT_IND: 'ADV_DIRECT_IND' + ADV_IND: 'ADV_IND', + ADV_DIRECT_IND: 'ADV_DIRECT_IND', + ADV_SCAN_IND: 'ADV_SCAN_IND', + ADV_NONCONN_IND: 'ADV_NONCONN_IND', + ADV_DIRECT_IND_LOW_DUTY: 'ADV_DIRECT_IND_LOW_DUTY' } @classmethod @@ -4346,7 +4346,7 @@ class HCI_Encryption_Change_Event(HCI_Event): E0_OR_AES_CCM = 0x01 AES_CCM = 0x02 - ENCYRPTION_ENABLED_NAMES = { + ENCRYPTION_ENABLED_NAMES = { OFF: 'OFF', E0_OR_AES_CCM: 'E0_OR_AES_CCM', AES_CCM: 'AES_CCM' @@ -4354,7 +4354,7 @@ class HCI_Encryption_Change_Event(HCI_Event): @staticmethod def encryption_enabled_name(encryption_enabled): - return name_or_number(HCI_Encryption_Change_Event.ENCYRPTION_ENABLED_NAMES, encryption_enabled) + return name_or_number(HCI_Encryption_Change_Event.ENCRYPTION_ENABLED_NAMES, encryption_enabled) # ----------------------------------------------------------------------------- @@ -4701,7 +4701,7 @@ class HCI_Synchronous_Connection_Complete_Event(HCI_Event): U_LAW_LOG_AIR_MODE: 'u-law log', A_LAW_LOG_AIR_MORE: 'A-law log', CVSD_AIR_MODE: 'CVSD', - TRANSPARENT_DATA_AIR_MODE: 'Transparend Data' + TRANSPARENT_DATA_AIR_MODE: 'Transparent Data' } @staticmethod diff --git a/examples/run_advertiser.py b/examples/run_advertiser.py index 52013562..e54bc37e 100644 --- a/examples/run_advertiser.py +++ b/examples/run_advertiser.py @@ -29,18 +29,31 @@ from bumble.transport import open_transport_or_link # ----------------------------------------------------------------------------- async def main(): - if len(sys.argv) != 3: - print('Usage: run_advertiser.py ') - print('example: run_advertiser.py device1.json link-relay:ws://localhost:8888/test') + if len(sys.argv) < 3: + print('Usage: run_advertiser.py [type] [address]') + print('example: run_advertiser.py device1.json usb:0') return + if len(sys.argv) >= 4: + advertising_type = AdvertisingType(int(sys.argv[3])) + else: + advertising_type = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE + + if advertising_type.is_directed: + if len(sys.argv) < 5: + print('
required for directed advertising') + return + target = Address(sys.argv[4]) + else: + target = None + print('<<< connecting to HCI...') async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): print('<<< connected') device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) await device.power_on() - await device.start_advertising() + await device.start_advertising(advertising_type=advertising_type, target=target) await hci_source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/tests/hci_test.py b/tests/hci_test.py index 805f1584..370b4889 100644 --- a/tests/hci_test.py +++ b/tests/hci_test.py @@ -27,8 +27,8 @@ def basic_check(x): parsed_str = str(parsed) print(x_str) parsed_bytes = parsed.to_bytes() - assert(x_str == parsed_str) - assert(packet == parsed_bytes) + assert x_str == parsed_str + assert packet == parsed_bytes # ----------------------------------------------------------------------------- @@ -133,7 +133,7 @@ def test_HCI_Command_Complete_Event(): ) basic_check(event) event = HCI_Packet.from_bytes(event.to_bytes()) - assert(event.return_parameters == 7) + assert event.return_parameters == 7 # With a simple status as an integer status event = HCI_Command_Complete_Event( @@ -142,7 +142,7 @@ def test_HCI_Command_Complete_Event(): return_parameters = 9 ) basic_check(event) - assert(event.return_parameters == 9) + assert event.return_parameters == 9 # ----------------------------------------------------------------------------- @@ -383,20 +383,20 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command(): # ----------------------------------------------------------------------------- def test_address(): a = Address('C4:F2:17:1A:1D:BB') - assert(not a.is_public) - assert(a.is_random) - assert(a.address_type == Address.RANDOM_DEVICE_ADDRESS) - assert(not a.is_resolvable) - assert(not a.is_resolved) - assert(a.is_static) + assert not a.is_public + assert a.is_random + assert a.address_type == Address.RANDOM_DEVICE_ADDRESS + assert not a.is_resolvable + assert not a.is_resolved + assert a.is_static # ----------------------------------------------------------------------------- def test_custom(): data = bytes([0x77, 0x02, 0x01, 0x03]) packet = HCI_CustomPacket(data) - assert(packet.hci_packet_type == 0x77) - assert(packet.payload == data) + assert packet.hci_packet_type == 0x77 + assert packet.payload == data # ----------------------------------------------------------------------------- diff --git a/tests/import_test.py b/tests/import_test.py index d6eafbda..c016c3e4 100644 --- a/tests/import_test.py +++ b/tests/import_test.py @@ -62,6 +62,57 @@ def test_import(): assert utils +# ----------------------------------------------------------------------------- +def test_app_imports(): + from bumble.apps.console import main + assert main + + from bumble.apps.controller_info import main + assert main + + from bumble.apps.controllers import main + assert main + + from bumble.apps.gatt_dump import main + assert main + + from bumble.apps.gg_bridge import main + assert main + + from bumble.apps.hci_bridge import main + assert main + + from bumble.apps.pair import main + assert main + + from bumble.apps.scan import main + assert main + + from bumble.apps.show import main + assert main + + from bumble.apps.unbond import main + assert main + + from bumble.apps.usb_probe import main + assert main + + +# ----------------------------------------------------------------------------- +def test_profiles_imports(): + from bumble.profiles import ( + battery_service, + device_information_service, + heart_rate_service + ) + + assert battery_service + assert device_information_service + assert heart_rate_service + + # ----------------------------------------------------------------------------- if __name__ == '__main__': test_import() + test_app_imports() + test_profiles_imports()