This commit is contained in:
Gilles Boccon-Gibod
2022-08-04 14:08:24 -07:00
parent 4d96b821bc
commit 7c4b042026
3 changed files with 141 additions and 21 deletions

View File

@@ -25,15 +25,21 @@ from bumble.company_ids import COMPANY_IDENTIFIERS
from bumble.core import name_or_number from bumble.core import name_or_number
from bumble.hci import ( from bumble.hci import (
map_null_terminated_utf8_string, map_null_terminated_utf8_string,
HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_SUCCESS, HCI_SUCCESS,
HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_VERSION_NAMES, HCI_VERSION_NAMES,
LMP_VERSION_NAMES, LMP_VERSION_NAMES,
HCI_Command, HCI_Command,
HCI_Read_BD_ADDR_Command,
HCI_READ_BD_ADDR_COMMAND, HCI_READ_BD_ADDR_COMMAND,
HCI_Read_BD_ADDR_Command,
HCI_READ_LOCAL_NAME_COMMAND,
HCI_Read_Local_Name_Command, HCI_Read_Local_Name_Command,
HCI_READ_LOCAL_NAME_COMMAND HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
HCI_LE_Read_Maximum_Data_Length_Command,
HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND,
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command,
HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND,
HCI_LE_Read_Maximum_Advertising_Data_Length_Command
) )
from bumble.host import Host from bumble.host import Host
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
@@ -57,6 +63,39 @@ async def get_classic_info(host):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def get_le_info(host): async def get_le_info(host):
print() print()
if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
response = await host.send_command(HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('LE Number Of Supported Advertising Sets:', 'yellow'),
response.return_parameters.num_supported_advertising_sets,
'\n'
)
if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
response = await host.send_command(HCI_LE_Read_Maximum_Advertising_Data_Length_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('LE Maximum Advertising Data Length:', 'yellow'),
response.return_parameters.max_advertising_data_length,
'\n'
)
if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
response = await host.send_command(HCI_LE_Read_Maximum_Data_Length_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('Maximum Data Length:', 'yellow'),
(
f'tx:{response.return_parameters.supported_max_tx_octets}/'
f'{response.return_parameters.supported_max_tx_time}, '
f'rx:{response.return_parameters.supported_max_rx_octets}/'
f'{response.return_parameters.supported_max_rx_time}'
),
'\n'
)
print(color('LE Features:', 'yellow')) print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features: for feature in host.supported_le_features:
print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature)) print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature))

View File

@@ -644,6 +644,9 @@ class Device(CompositeEventEmitter):
# Done # Done
self.powered_on = True self.powered_on = True
def supports_le_feature(self, feature):
return self.host.supports_le_feature(feature)
async def start_advertising(self, auto_restart=False): async def start_advertising(self, auto_restart=False):
self.auto_restart_advertising = auto_restart self.auto_restart_advertising = auto_restart
@@ -710,6 +713,34 @@ class Device(CompositeEventEmitter):
if scan_window < DEVICE_MIN_SCAN_WINDOW or scan_window > DEVICE_MAX_SCAN_WINDOW: if scan_window < DEVICE_MIN_SCAN_WINDOW or scan_window > DEVICE_MAX_SCAN_WINDOW:
raise ValueError('scan_interval out of range') raise ValueError('scan_interval out of range')
if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE):
# Set the scanning parameters
scan_type = HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING if active else HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING
scanning_filter_policy = HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY # TODO: support other types
scanning_phys = 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_1M_PHY
scanning_phy_count = 1
# if self.supports_le_feature(HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE):
# scanning_phys |= 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_CODED_PHY
# scanning_phy_count += 1
await self.send_command(HCI_LE_Set_Extended_Scan_Parameters_Command(
own_address_type = own_address_type,
scanning_filter_policy = scanning_filter_policy,
scanning_phys = scanning_phys,
scan_types = [scan_type] * scanning_phy_count,
scan_intervals = [int(scan_window / 0.625)] * scanning_phy_count,
scan_windows = [int(scan_window / 0.625)] * scanning_phy_count
))
# Enable scanning
await self.send_command(HCI_LE_Set_Extended_Scan_Enable_Command(
enable = 1,
filter_duplicates = 1 if filter_duplicates else 0,
duration = 0, # TODO allow other values
period = 0 # TODO allow other values
))
else:
# Set the scanning parameters # Set the scanning parameters
scan_type = HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING if active else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING scan_type = HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING if active else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING
await self.send_command(HCI_LE_Set_Scan_Parameters_Command( await self.send_command(HCI_LE_Set_Scan_Parameters_Command(
@@ -725,6 +756,7 @@ class Device(CompositeEventEmitter):
le_scan_enable = 1, le_scan_enable = 1,
filter_duplicates = 1 if filter_duplicates else 0 filter_duplicates = 1 if filter_duplicates else 0
)) ))
self.scanning = True self.scanning = True
async def stop_scanning(self): async def stop_scanning(self):

View File

@@ -3088,6 +3088,42 @@ class HCI_LE_Set_Resolvable_Private_Address_Timeout_Command(HCI_Command):
''' '''
# -----------------------------------------------------------------------------
@HCI_Command.command(return_parameters_fields=[
('status', STATUS_SPEC),
('supported_max_tx_octets', 2),
('supported_max_tx_time', 2),
('supported_max_rx_octets', 2),
('supported_max_rx_time', 2)
])
class HCI_LE_Read_Maximum_Data_Length_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.46 LE Read Maximum Data Length Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(return_parameters_fields=[
('status', STATUS_SPEC),
('max_advertising_data_length', 2)
])
class HCI_LE_Read_Maximum_Advertising_Data_Length_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.57 LE Read Maximum Advertising Data Length Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(return_parameters_fields=[
('status', STATUS_SPEC),
('num_supported_advertising_sets', 1)
])
class HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.58 LE Read Number of Supported Advertising Sets Command
'''
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command(fields=None) @HCI_Command.command(fields=None)
class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command): class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command):
@@ -3116,9 +3152,9 @@ class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command):
scan_intervals = [] scan_intervals = []
scan_windows = [] scan_windows = []
for i in range(phy_bits_set): for i in range(phy_bits_set):
scan_types.append(parameters[3 + (3 * i)]) scan_types.append(parameters[3 + (5 * i)])
scan_intervals.append(parameters[3 + (3 * i) + 1]) scan_intervals.append(struct.unpack_from('<H', parameters, 3 + (5 * i) + 1)[0])
scan_windows.append(parameters[3 + (3 * i) + 2]) scan_windows.append(struct.unpack_from('<H', parameters, 3 + (5 * i) + 3)[0])
return HCI_LE_Set_Extended_Scan_Parameters_Command( return HCI_LE_Set_Extended_Scan_Parameters_Command(
own_address_type = own_address_type, own_address_type = own_address_type,
@@ -3149,7 +3185,7 @@ class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command):
self.parameters = bytes([own_address_type, scanning_filter_policy, scanning_phys]) self.parameters = bytes([own_address_type, scanning_filter_policy, scanning_phys])
phy_bits_set = bin(scanning_phys).count('1') phy_bits_set = bin(scanning_phys).count('1')
for i in range(phy_bits_set): for i in range(phy_bits_set):
self.parameters += bytes([scan_types[i], scan_intervals[i], scan_windows[i]]) self.parameters += struct.pack('<BHH', scan_types[i], scan_intervals[i], scan_windows[i])
def __str__(self): def __str__(self):
scanning_phys_strs = [] scanning_phys_strs = []
@@ -3178,6 +3214,19 @@ class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command):
) )
# -----------------------------------------------------------------------------
@HCI_Command.command([
('enable', 1),
('filter_duplicates', 1),
('duration', 2),
('period', 2)
])
class HCI_LE_Set_Extended_Scan_Enable_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.65 LE Set Extended Scan Enable Command
'''
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# HCI Events # HCI Events
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------