From 7c4b042026371b4aa481a8a25517468b19088707 Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Thu, 4 Aug 2022 14:08:24 -0700 Subject: [PATCH] wip --- apps/controller_info.py | 45 ++++++++++++++++++++++++++++--- bumble/device.py | 60 +++++++++++++++++++++++++++++++---------- bumble/hci.py | 57 ++++++++++++++++++++++++++++++++++++--- 3 files changed, 141 insertions(+), 21 deletions(-) diff --git a/apps/controller_info.py b/apps/controller_info.py index 74d45504..b65caab0 100644 --- a/apps/controller_info.py +++ b/apps/controller_info.py @@ -25,15 +25,21 @@ from bumble.company_ids import COMPANY_IDENTIFIERS from bumble.core import name_or_number from bumble.hci import ( map_null_terminated_utf8_string, - HCI_LE_SUPPORTED_FEATURES_NAMES, HCI_SUCCESS, + HCI_LE_SUPPORTED_FEATURES_NAMES, HCI_VERSION_NAMES, LMP_VERSION_NAMES, HCI_Command, - HCI_Read_BD_ADDR_Command, HCI_READ_BD_ADDR_COMMAND, + HCI_Read_BD_ADDR_Command, + HCI_READ_LOCAL_NAME_COMMAND, HCI_Read_Local_Name_Command, - HCI_READ_LOCAL_NAME_COMMAND + HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND, + HCI_LE_Read_Maximum_Data_Length_Command, + HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND, + HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command, + HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND, + HCI_LE_Read_Maximum_Advertising_Data_Length_Command ) from bumble.host import Host from bumble.transport import open_transport_or_link @@ -57,6 +63,39 @@ async def get_classic_info(host): # ----------------------------------------------------------------------------- async def get_le_info(host): print() + + if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND): + response = await host.send_command(HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command()) + if response.return_parameters.status == HCI_SUCCESS: + print( + color('LE Number Of Supported Advertising Sets:', 'yellow'), + response.return_parameters.num_supported_advertising_sets, + '\n' + ) + + if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND): + response = await host.send_command(HCI_LE_Read_Maximum_Advertising_Data_Length_Command()) + if response.return_parameters.status == HCI_SUCCESS: + print( + color('LE Maximum Advertising Data Length:', 'yellow'), + response.return_parameters.max_advertising_data_length, + '\n' + ) + + if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND): + response = await host.send_command(HCI_LE_Read_Maximum_Data_Length_Command()) + if response.return_parameters.status == HCI_SUCCESS: + print( + color('Maximum Data Length:', 'yellow'), + ( + f'tx:{response.return_parameters.supported_max_tx_octets}/' + f'{response.return_parameters.supported_max_tx_time}, ' + f'rx:{response.return_parameters.supported_max_rx_octets}/' + f'{response.return_parameters.supported_max_rx_time}' + ), + '\n' + ) + print(color('LE Features:', 'yellow')) for feature in host.supported_le_features: print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature)) diff --git a/bumble/device.py b/bumble/device.py index f5bd0618..dbc75fee 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -644,6 +644,9 @@ class Device(CompositeEventEmitter): # Done self.powered_on = True + def supports_le_feature(self, feature): + return self.host.supports_le_feature(feature) + async def start_advertising(self, auto_restart=False): self.auto_restart_advertising = auto_restart @@ -710,21 +713,50 @@ class Device(CompositeEventEmitter): if scan_window < DEVICE_MIN_SCAN_WINDOW or scan_window > DEVICE_MAX_SCAN_WINDOW: raise ValueError('scan_interval out of range') - # Set the scanning parameters - scan_type = HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING if active else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING - await self.send_command(HCI_LE_Set_Scan_Parameters_Command( - le_scan_type = scan_type, - le_scan_interval = int(scan_window / 0.625), - le_scan_window = int(scan_window / 0.625), - own_address_type = own_address_type, - scanning_filter_policy = HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY - )) + if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE): + # Set the scanning parameters + scan_type = HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING if active else HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING + scanning_filter_policy = HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY # TODO: support other types + + scanning_phys = 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_1M_PHY + scanning_phy_count = 1 + # if self.supports_le_feature(HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE): + # scanning_phys |= 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_CODED_PHY + # scanning_phy_count += 1 + + await self.send_command(HCI_LE_Set_Extended_Scan_Parameters_Command( + own_address_type = own_address_type, + scanning_filter_policy = scanning_filter_policy, + scanning_phys = scanning_phys, + scan_types = [scan_type] * scanning_phy_count, + scan_intervals = [int(scan_window / 0.625)] * scanning_phy_count, + scan_windows = [int(scan_window / 0.625)] * scanning_phy_count + )) + + # Enable scanning + await self.send_command(HCI_LE_Set_Extended_Scan_Enable_Command( + enable = 1, + filter_duplicates = 1 if filter_duplicates else 0, + duration = 0, # TODO allow other values + period = 0 # TODO allow other values + )) + else: + # Set the scanning parameters + scan_type = HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING if active else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING + await self.send_command(HCI_LE_Set_Scan_Parameters_Command( + le_scan_type = scan_type, + le_scan_interval = int(scan_window / 0.625), + le_scan_window = int(scan_window / 0.625), + own_address_type = own_address_type, + scanning_filter_policy = HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY + )) + + # Enable scanning + await self.send_command(HCI_LE_Set_Scan_Enable_Command( + le_scan_enable = 1, + filter_duplicates = 1 if filter_duplicates else 0 + )) - # Enable scanning - await self.send_command(HCI_LE_Set_Scan_Enable_Command( - le_scan_enable = 1, - filter_duplicates = 1 if filter_duplicates else 0 - )) self.scanning = True async def stop_scanning(self): diff --git a/bumble/hci.py b/bumble/hci.py index 9858c04d..c8aed197 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -3088,6 +3088,42 @@ class HCI_LE_Set_Resolvable_Private_Address_Timeout_Command(HCI_Command): ''' +# ----------------------------------------------------------------------------- +@HCI_Command.command(return_parameters_fields=[ + ('status', STATUS_SPEC), + ('supported_max_tx_octets', 2), + ('supported_max_tx_time', 2), + ('supported_max_rx_octets', 2), + ('supported_max_rx_time', 2) +]) +class HCI_LE_Read_Maximum_Data_Length_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.46 LE Read Maximum Data Length Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command(return_parameters_fields=[ + ('status', STATUS_SPEC), + ('max_advertising_data_length', 2) +]) +class HCI_LE_Read_Maximum_Advertising_Data_Length_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.57 LE Read Maximum Advertising Data Length Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command(return_parameters_fields=[ + ('status', STATUS_SPEC), + ('num_supported_advertising_sets', 1) +]) +class HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.58 LE Read Number of Supported Advertising Sets Command + ''' + + # ----------------------------------------------------------------------------- @HCI_Command.command(fields=None) class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command): @@ -3116,9 +3152,9 @@ class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command): scan_intervals = [] scan_windows = [] for i in range(phy_bits_set): - scan_types.append(parameters[3 + (3 * i)]) - scan_intervals.append(parameters[3 + (3 * i) + 1]) - scan_windows.append(parameters[3 + (3 * i) + 2]) + scan_types.append(parameters[3 + (5 * i)]) + scan_intervals.append(struct.unpack_from('