This commit is contained in:
Gilles Boccon-Gibod
2022-08-11 18:41:46 -07:00
parent de7e74652d
commit c316e8805f
2 changed files with 152 additions and 5 deletions

View File

@@ -166,6 +166,7 @@ class AdvertisementDataAccumulator:
self.advertisement = Advertisement.from_advertising_report(report)
self.advertisement.data = AdvertisingData.from_bytes(self.data + report.data)
self.advertisement.is_connectable = (self.last_event_type == HCI_LE_Advertising_Report_Event.ADV_IND)
self.advertisement.is_scannable = True
else:
# Unexpected scan response
self.advertisement = None
@@ -826,9 +827,9 @@ class Device(CompositeEventEmitter):
scanning_phys = 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_1M_PHY
scanning_phy_count = 1
# if self.supports_le_feature(HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE):
# scanning_phys |= 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_CODED_PHY
# scanning_phy_count += 1
if self.supports_le_feature(HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE):
scanning_phys |= 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_CODED_PHY
scanning_phy_count += 1
await self.send_command(HCI_LE_Set_Extended_Scan_Parameters_Command(
own_address_type = own_address_type,
@@ -863,8 +864,8 @@ class Device(CompositeEventEmitter):
filter_duplicates = 1 if filter_duplicates else 0
))
self.scanning = True
self.scanning_is_passive = not active
self.scanning = True
async def stop_scanning(self):
await self.send_command(HCI_LE_Set_Scan_Enable_Command(

View File

@@ -670,7 +670,7 @@ HCI_LE_CODED_PHY = 3
HCI_LE_PHY_NAMES = {
HCI_LE_1M_PHY: 'LE 1M',
HCI_LE_2M_PHY: 'L2 2M',
HCI_LE_2M_PHY: 'LE 2M',
HCI_LE_CODED_PHY: 'LE Coded'
}
@@ -3045,6 +3045,18 @@ class HCI_LE_Set_Data_Length_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(return_parameters_fields=[
('status', STATUS_SPEC),
('suggested_max_tx_octets', 2),
('suggested_max_tx_time', 2),
])
class HCI_LE_Read_Suggested_Default_Data_Length_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.34 LE Read Suggested Default Data Length Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([
('suggested_max_tx_octets', 2),
@@ -3491,6 +3503,140 @@ class HCI_LE_Set_Extended_Scan_Enable_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(fields=None)
class HCI_LE_Extended_Create_Connection_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.66 LE Extended Create Connection Command
'''
LE_1M_PHY = 0x00
LE_2M_PHY = 0x01
LE_CODED_PHY = 0x02
INITIATING_PHY_NAMES = ['LE_1M_PHY', 'LE_2M_PHY', 'LE_CODED_PHY']
@classmethod
def from_parameters(cls, parameters):
initiator_filter_policy = parameters[0]
own_address_type = parameters[1]
peer_address = Address.parse_address_preceded_by_type(parameters, 2)[1]
initiating_phys = parameters[9]
phy_bits_set = bin(initiating_phys).count('1')
def read_parameter_list(offset):
return [struct.unpack_from('<H', parameters, offset + 16 * i)[0] for i in range(phy_bits_set)]
return cls(
initiator_filter_policy = initiator_filter_policy,
own_address_type = own_address_type,
peer_address = peer_address,
initiating_phys = initiating_phys,
scan_intervals = read_parameter_list(10),
scan_windows = read_parameter_list(12),
connection_interval_mins = read_parameter_list(14),
connection_interval_maxs = read_parameter_list(16),
max_latencies = read_parameter_list(18),
supervision_timeouts = read_parameter_list(20),
min_ce_lengths = read_parameter_list(22),
max_ce_lengths = read_parameter_list(24)
)
def __init__(
self,
initiator_filter_policy,
own_address_type,
peer_address,
initiating_phys,
scan_intervals,
scan_windows,
connection_interval_mins,
connection_interval_maxs,
max_latencies,
supervision_timeouts,
min_ce_lengths,
max_ce_lengths,
):
super().__init__(HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND)
self.initiator_filter_policy = initiator_filter_policy
self.own_address_type = own_address_type
self.peer_address = peer_address
self.initiating_phys = initiating_phys
self.scan_intervals = scan_intervals
self.scan_windows = scan_windows
self.connection_interval_mins = connection_interval_mins
self.connection_interval_maxs = connection_interval_maxs
self.max_latencies = max_latencies
self.supervision_timeouts = supervision_timeouts
self.min_ce_lengths = min_ce_lengths
self.max_ce_lengths = max_ce_lengths
self.parameters = bytes([
initiator_filter_policy,
own_address_type,
peer_address.address_type
]) + bytes(peer_address) + bytes([initiating_phys])
phy_bits_set = bin(initiating_phys).count('1')
for i in range(phy_bits_set):
self.parameters += struct.pack(
'<HHHHHHHH',
scan_intervals[i],
scan_windows[i],
connection_interval_mins[i],
connection_interval_maxs[i],
max_latencies[i],
supervision_timeouts[i],
min_ce_lengths[i],
max_ce_lengths[i]
)
def __str__(self):
initiating_phys_strs = bit_flags_to_strings(self.initiating_phys, self.INITIATING_PHY_NAMES)
fields = [
('own_address_type:', Address.address_type_name(self.own_address_type)),
('scanning_phys: ', ','.join(initiating_phys_strs)),
]
for (i, initiating_phys_str) in enumerate(initiating_phys_strs):
fields.append((f'{initiating_phys_str}.scan_interval: ', self.scan_intervals[i])),
fields.append((f'{initiating_phys_str}.scan_window: ', self.scan_windows[i])),
fields.append((f'{initiating_phys_str}.connection_interval_min:', self.connection_interval_mins[i])),
fields.append((f'{initiating_phys_str}.connection_interval_max:', self.connection_interval_maxs[i])),
fields.append((f'{initiating_phys_str}.max_latency: ', self.max_latencies[i])),
fields.append((f'{initiating_phys_str}.supervision_timeout: ', self.supervision_timeouts[i])),
fields.append((f'{initiating_phys_str}.min_ce_length: ', self.min_ce_lengths[i])),
fields.append((f'{initiating_phys_str}.max_ce_length: ', self.max_ce_lengths[i]))
return color(self.name, 'green') + ':\n' + '\n'.join(
[color(field[0], 'cyan') + ' ' + str(field[1]) for field in fields]
)
# -----------------------------------------------------------------------------
@HCI_Command.command([
('peer_identity_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_identity_address', Address.parse_address_preceded_by_type),
('privacy_mode', {'size': 1, 'mapper': lambda x: HCI_LE_Set_Privacy_Mode_Command.privacy_mode_name(x)})
])
class HCI_LE_Set_Privacy_Mode_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.77 LE Set Privacy Mode Command
'''
NETWORK_PRIVACY_MODE = 0x00
DEVICE_PRIVACY_MODE = 0x01
PRIVACY_MODE_NAMES = {
NETWORK_PRIVACY_MODE: 'NETWORK_PRIVACY_MODE',
DEVICE_PRIVACY_MODE: 'DEVICE_PRIVACY_MODE'
}
@classmethod
def privacy_mode_name(cls, privacy_mode):
return name_or_number(cls.PRIVACY_MODE_NAMES, privacy_mode)
# -----------------------------------------------------------------------------
@HCI_Command.command([
('bit_number', 1),