more HCI commands

This commit is contained in:
Gilles Boccon-Gibod
2022-08-06 14:08:24 -07:00
parent 31edd58b3d
commit de7e74652d
8 changed files with 540 additions and 149 deletions

View File

@@ -18,7 +18,7 @@
import json
import asyncio
import logging
from contextlib import asynccontextmanager, AsyncExitStack
from contextlib import asynccontextmanager, AsyncExitStack
from .hci import *
from .host import Host
@@ -61,29 +61,134 @@ DEVICE_MAX_SCAN_WINDOW = 10240
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class Advertisement:
TX_POWER_NOT_AVAILABLE = HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE
RSSI_NOT_AVAILABLE = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE
@classmethod
def from_advertising_report(cls, report):
if isinstance(report, HCI_LE_Advertising_Report_Event.Report):
return LegacyAdvertisement.from_advertising_report(report)
elif isinstance(report, HCI_LE_Extended_Advertising_Report_Event.Report):
return ExtendedAdvertisement.from_advertising_report(report)
def __init__(
self,
address,
rssi = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE,
is_legacy = False,
is_anonymous = False,
is_connectable = False,
is_directed = False,
is_scannable = False,
is_scan_response = False,
primary_phy = 0,
secondary_phy = 0,
tx_power = HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE,
sid = 0,
data = b''
):
self.address = address
self.rssi = rssi
self.is_legacy = is_legacy
self.is_anonymous = is_anonymous
self.is_connectable = is_connectable
self.is_directed = is_directed
self.is_scannable = is_scannable
self.is_scan_response = is_scan_response
self.primary_phy = primary_phy
self.secondary_phy = secondary_phy
self.tx_power = tx_power
self.sid = sid
self.data = AdvertisingData.from_bytes(data)
# -----------------------------------------------------------------------------
class LegacyAdvertisement(Advertisement):
@classmethod
def from_advertising_report(cls, report):
return cls(
address = report.address,
rssi = report.rssi,
is_legacy = True,
is_connectable = report.event_type in {
HCI_LE_Advertising_Report_Event.ADV_IND,
HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND
},
is_directed = report.event_type == HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND,
is_scannable = report.event_type in {
HCI_LE_Advertising_Report_Event.ADV_IND,
HCI_LE_Advertising_Report_Event.ADV_SCAN_IND
},
is_scan_response = report.event_type == HCI_LE_Advertising_Report_Event.SCAN_RSP,
data = report.data
)
# -----------------------------------------------------------------------------
class ExtendedAdvertisement(Advertisement):
@classmethod
def from_advertising_report(cls, report):
return cls(
address = report.address,
rssi = report.rssi,
is_legacy = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.LEGACY_ADVERTISING_PDU_USED) != 0,
is_anonymous = report.address.address_type == HCI_LE_Extended_Advertising_Report_Event.ANONYMOUS_ADDRESS_TYPE,
is_connectable = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.CONNECTABLE_ADVERTISING) != 0,
is_directed = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.DIRECTED_ADVERTISING) != 0,
is_scannable = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCANNABLE_ADVERTISING) != 0,
is_scan_response = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCAN_RESPONSE) != 0,
primary_phy = report.primary_phy,
secondary_phy = report.secondary_phy,
tx_power = report.tx_power,
sid = report.advertising_sid,
data = report.data
)
# -----------------------------------------------------------------------------
class AdvertisementDataAccumulator:
def __init__(self):
self.advertising_data = AdvertisingData()
self.last_advertisement_type = None
self.connectable = False
self.flushable = False
def __init__(self, passive=False):
self.passive = passive
self.last_event_type = None
self.advertisement = None
self.data = b''
def update(self, data, advertisement_type):
if advertisement_type == HCI_LE_Advertising_Report_Event.SCAN_RSP:
if self.last_advertisement_type != HCI_LE_Advertising_Report_Event.SCAN_RSP:
self.advertising_data.append(data)
self.flushable = True
else:
self.advertising_data = AdvertisingData.from_bytes(data)
self.flushable = self.last_advertisement_type != HCI_LE_Advertising_Report_Event.SCAN_RSP
def update(self, report):
if isinstance(report, HCI_LE_Advertising_Report_Event.Report):
if report.event_type == HCI_LE_Advertising_Report_Event.SCAN_RSP:
if self.last_event_type in {
HCI_LE_Advertising_Report_Event.ADV_IND,
HCI_LE_Advertising_Report_Event.ADV_SCAN_IND
}:
# This is the response to a scannable advertisement
self.advertisement = Advertisement.from_advertising_report(report)
self.advertisement.data = AdvertisingData.from_bytes(self.data + report.data)
self.advertisement.is_connectable = (self.last_event_type == HCI_LE_Advertising_Report_Event.ADV_IND)
else:
# Unexpected scan response
self.advertisement = None
if advertisement_type == HCI_LE_Advertising_Report_Event.ADV_IND or advertisement_type == HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND:
self.connectable = True
elif advertisement_type == HCI_LE_Advertising_Report_Event.ADV_SCAN_IND or advertisement_type == HCI_LE_Advertising_Report_Event.ADV_NONCONN_IND:
self.connectable = False
# Reset the data
self.data = b''
else:
self.data = report.data
self.last_advertisement_type = advertisement_type
if self.passive or report.event_type in {
HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND,
HCI_LE_Advertising_Report_Event.ADV_NONCONN_IND
} or self.last_event_type not in {
None,
HCI_LE_Advertising_Report_Event.SCAN_RSP
}:
# Don't wait for a scan response
self.advertisement = Advertisement.from_advertising_report(report)
else:
# Wait for a scan response
self.advertisement = None
self.last_event_type = report.event_type
# -----------------------------------------------------------------------------
@@ -403,7 +508,7 @@ class Device(CompositeEventEmitter):
@composite_listener
class Listener:
def on_advertisement(self, address, data, rssi, advertisement_type):
def on_advertisement(self, advertisement):
pass
def on_inquiry_result(self, address, class_of_device, data, rssi):
@@ -454,6 +559,7 @@ class Device(CompositeEventEmitter):
[l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS])
self.advertisement_data = {}
self.scanning = False
self.scanning_is_passive = False
self.discovering = False
self.connecting = False
self.disconnecting = False
@@ -757,7 +863,8 @@ class Device(CompositeEventEmitter):
filter_duplicates = 1 if filter_duplicates else 0
))
self.scanning = True
self.scanning = True
self.scanning_is_passive = not active
async def stop_scanning(self):
await self.send_command(HCI_LE_Set_Scan_Enable_Command(
@@ -771,19 +878,13 @@ class Device(CompositeEventEmitter):
return self.scanning
@host_event_handler
def on_advertising_report(self, address, data, rssi, advertisement_type):
if not (accumulator := self.advertisement_data.get(address)):
accumulator = AdvertisementDataAccumulator()
self.advertisement_data[address] = accumulator
accumulator.update(data, advertisement_type)
if accumulator.flushable:
self.emit(
'advertisement',
address,
accumulator.advertising_data,
rssi,
accumulator.connectable
)
def on_advertising_report(self, report):
if not (accumulator := self.advertisement_data.get(report.address)):
accumulator = AdvertisementDataAccumulator(passive=self.scanning_is_passive)
self.advertisement_data[report.address] = accumulator
accumulator.update(report)
if accumulator.advertisement is not None:
self.emit('advertisement', accumulator.advertisement)
async def start_discovery(self):
await self.host.send_command(HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE))

View File

@@ -1373,11 +1373,14 @@ class HCI_Error(ProtocolError):
super().__init__(error_code, 'hci', HCI_Constant.error_name(error_code))
# -----------------------------------------------------------------------------
class HCI_StatusError(ProtocolError):
def __init__(self, response):
super().__init__(response.status,
error_namespace=HCI_Command.command_name(response.command_opcode),
error_name=HCI_Constant.status_name(response.status))
super().__init__(
response.status,
error_namespace=HCI_Command.command_name(response.command_opcode),
error_name=HCI_Constant.status_name(response.status)
)
# -----------------------------------------------------------------------------
@@ -1402,7 +1405,7 @@ class HCI_Object:
def dict_from_bytes(data, offset, fields):
result = collections.OrderedDict()
for (field_name, field_type) in fields:
# The field_type may be a dictionnary with a mapper, parser, and/or size
# The field_type may be a dictionary with a mapper, parser, and/or size
if type(field_type) is dict:
if 'size' in field_type:
field_type = field_type['size']
@@ -1523,9 +1526,9 @@ class HCI_Object:
return bytes(result)
@staticmethod
def from_bytes(data, offset, fields):
return HCI_Object(fields, **HCI_Object.dict_from_bytes(data, offset, fields))
@classmethod
def from_bytes(cls, data, offset, fields):
return cls(fields, **cls.dict_from_bytes(data, offset, fields))
def to_bytes(self):
return HCI_Object.dict_to_bytes(self.__dict__, self.fields)
@@ -3024,6 +3027,24 @@ class HCI_LE_Remote_Connection_Parameter_Request_Negative_Reply_Command(HCI_Comm
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('connection_handle', 2),
('tx_octets', 2),
('tx_time', 2),
],
return_parameters_fields=[
('status', STATUS_SPEC),
('connection_handle', 2)
]
)
class HCI_LE_Set_Data_Length_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.33 LE Set Data Length Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([
('suggested_max_tx_octets', 2),
@@ -3102,6 +3123,228 @@ class HCI_LE_Read_Maximum_Data_Length_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('connection_handle', 2)
],
return_parameters_fields=[
('status', STATUS_SPEC),
('connection_handle', 2),
('tx_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}),
('rx_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name})
])
class HCI_LE_Read_PHY_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.47 LE Read PHY Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([
('connection_handle', 2),
('all_phys', 1),
('tx_phys', 1),
('rx_phys', 1),
('phy_options', 2)
])
class HCI_LE_Set_PHY_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.49 LE Set PHY Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([
('advertising_handle', 1),
('random_address', lambda data, offset: Address.parse_address_with_type(data, offset, Address.RANDOM_DEVICE_ADDRESS))
])
class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.52 LE Set Advertising Set Random Address Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('advertising_handle', 1),
('advertising_event_properties', {'size': 2, 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.advertising_properties_string(x)}),
('primary_advertising_interval_min', 3),
('primary_advertising_interval_max', 3),
('primary_advertising_channel_map', {'size': 1, 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.channel_map_string(x)}),
('own_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type),
('advertising_filter_policy', 1),
('advertising_tx_power', 1),
('primary_advertising_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}),
('secondary_advertising_max_skip', 1),
('secondary_advertising_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}),
('advertising_sid', 1),
('scan_request_notification_enable', 1)
],
return_parameters_fields=[
('status', STATUS_SPEC),
('selected_tx__power', 1)
]
)
class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command
'''
CONNECTABLE_ADVERTISING = 0
SCANNABLE_ADVERTISING = 1
DIRECTED_ADVERTISING = 2
HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 3
USE_LEGACY_ADVERTISING_PDUS = 4
ANONYMOUS_ADVERTISING = 5
INCLUDE_TX_POWER = 6
ADVERTISING_PROPERTIES_NAMES = (
'CONNECTABLE_ADVERTISING',
'SCANNABLE_ADVERTISING',
'DIRECTED_ADVERTISING',
'HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING',
'USE_LEGACY_ADVERTISING_PDUS',
'ANONYMOUS_ADVERTISING',
'INCLUDE_TX_POWER'
)
CHANNEL_37 = 0
CHANNEL_38 = 1
CHANNEL_39 = 2
CHANNEL_NAMES = ('37', '38', '39')
@classmethod
def advertising_properties_string(cls, properties):
return f'[{",".join(bit_flags_to_strings(properties, cls.ADVERTISING_PROPERTIES_NAMES))}]'
@classmethod
def channel_map_string(cls, channel_map):
return f'[{",".join(bit_flags_to_strings(channel_map, cls.CHANNEL_NAMES))}]'
# -----------------------------------------------------------------------------
@HCI_Command.command([
('advertising_handle', 1),
('operation', {'size': 1, 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(x)}),
('fragment_preference', 1),
('advertising_data', {
'parser': HCI_Object.parse_length_prefixed_bytes,
'serializer': functools.partial(HCI_Object.serialize_length_prefixed_bytes)
})
])
class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.54 LE Set Extended Advertising Data Command
'''
INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03
UNCHANGED_DATA = 0x04
OPERATION_NAMES = {
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
LAST_FRAGMENT: 'LAST_FRAGMENT',
COMPLETE_DATA: 'COMPLETE_DATA',
UNCHANGED_DATA: 'UNCHANGED_DATA'
}
@classmethod
def operation_name(cls, operation):
return name_or_number(cls.OPERATION_NAMES, operation)
# -----------------------------------------------------------------------------
@HCI_Command.command([
('advertising_handle', 1),
('operation', {'size': 1, 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(x)}),
('fragment_preference', 1),
('scan_response_data', {
'parser': HCI_Object.parse_length_prefixed_bytes,
'serializer': functools.partial(HCI_Object.serialize_length_prefixed_bytes)
})
])
class HCI_LE_Set_Extended_Scan_Response_Data_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.55 LE Set Extended Scan Response Data Command
'''
INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03
OPERATION_NAMES = {
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
LAST_FRAGMENT: 'LAST_FRAGMENT',
COMPLETE_DATA: 'COMPLETE_DATA'
}
@classmethod
def operation_name(cls, operation):
return name_or_number(cls.OPERATION_NAMES, operation)
# -----------------------------------------------------------------------------
@HCI_Command.command(fields=None)
class HCI_LE_Set_Extended_Advertising_Enable_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.56 LE Set Extended Advertising Enable Command
'''
@classmethod
def from_parameters(cls, parameters):
enable = parameters[0]
num_sets = parameters[1]
advertising_handles = []
durations = []
max_extended_advertising_events = []
offset = 2
for _ in range(num_sets):
advertising_handles.append(parameters[offset])
durations.append(struct.unpack_from('<H', parameters, offset + 1)[0])
max_extended_advertising_events.append(parameters[offset + 3])
offset += 4
return cls(enable, advertising_handles, durations, max_extended_advertising_events)
def __init__(self, enable, advertising_handles, durations, max_extended_advertising_events):
super().__init__(HCI_LE_SET_EXTENDED_ADVERTISING_ENABLE_COMMAND)
self.enable = enable
self.advertising_handles = advertising_handles
self.durations = durations
self.max_extended_advertising_events = max_extended_advertising_events
self.parameters = bytes([enable, len(advertising_handles)]) + b''.join([
struct.pack(
'<BHB',
advertising_handles[i],
durations[i],
max_extended_advertising_events[i]
)
for i in range(len(advertising_handles))
])
def __str__(self):
fields = [('enable:', self.enable)]
for i in range(len(self.advertising_handles)):
fields.append((f'advertising_handle[{i}]: ', self.advertising_handles[i]))
fields.append((f'duration[{i}]: ', self.durations[i]))
fields.append((f'max_extended_advertising_events[{i}]:', self.max_extended_advertising_events[i]))
return color(self.name, 'green') + ':\n' + '\n'.join(
[color(field[0], 'cyan') + ' ' + str(field[1]) for field in fields]
)
# -----------------------------------------------------------------------------
@HCI_Command.command(return_parameters_fields=[
('status', STATUS_SPEC),
@@ -3124,6 +3367,35 @@ class HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([
('advertising_handle', 1)
])
class HCI_LE_Remove_Advertising_Set_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.59 LE Remove Advertising Set Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command()
class HCI_LE_Clear_Advertising_Sets_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.60 LE Clear Advertising Sets Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([
('enable', 1),
('advertising_handle', 1)
])
class HCI_LE_Set_Periodic_Advertising_Enable_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.63 LE Set Periodic Advertising Enable Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(fields=None)
class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command):
@@ -3141,6 +3413,8 @@ class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command):
LE_1M_PHY = 0x00
LE_CODED_PHY = 0x02
SCANNING_PHY_NAMES = ['LE_1M_PHY', '', 'LE_CODED_PHY']
@classmethod
def from_parameters(cls, parameters):
own_address_type = parameters[0]
@@ -3188,17 +3462,7 @@ class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command):
self.parameters += struct.pack('<BHH', scan_types[i], scan_intervals[i], scan_windows[i])
def __str__(self):
scanning_phys_strs = []
for bit in range(8):
if self.scanning_phys & (1 << bit) != 0:
if bit == 0:
scanning_phys_strs.append('LE_1M_PHY')
elif bit == 2:
scanning_phys_strs.append('LE_CODED_PHY')
else:
scanning_phys_strs.append(f'0x{(1 << bit):02X}')
scanning_phys_strs = bit_flags_to_strings(self.scanning_phys, self.SCANNING_PHY_NAMES)
fields = [
('own_address_type: ', Address.address_type_name(self.own_address_type)),
('scanning_filter_policy:', self.scanning_filter_policy),
@@ -3227,6 +3491,17 @@ class HCI_LE_Set_Extended_Scan_Enable_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([
('bit_number', 1),
('bit_value', 1)
])
class HCI_LE_Set_Host_Feature_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.115 LE Set Host Feature Command
'''
# -----------------------------------------------------------------------------
# HCI Events
# -----------------------------------------------------------------------------
@@ -3440,13 +3715,25 @@ class HCI_LE_Advertising_Report_Event(HCI_LE_Meta_Event):
SCAN_RSP: 'SCAN_RSP' # Scan Response
}
REPORT_FIELDS = [
('event_type', 1),
('address_type', Address.ADDRESS_TYPE_SPEC),
('address', Address.parse_address_preceded_by_type),
('data', {'parser': HCI_Object.parse_length_prefixed_bytes, 'serializer': HCI_Object.serialize_length_prefixed_bytes}),
('rssi', -1)
]
class Report(HCI_Object):
FIELDS = [
('event_type', 1),
('address_type', Address.ADDRESS_TYPE_SPEC),
('address', Address.parse_address_preceded_by_type),
('data', {'parser': HCI_Object.parse_length_prefixed_bytes, 'serializer': HCI_Object.serialize_length_prefixed_bytes}),
('rssi', -1)
]
@classmethod
def from_parameters(cls, parameters, offset):
return cls.from_bytes(parameters, offset, cls.FIELDS)
def to_string(self, prefix):
return super().to_string(prefix, {
'event_type': HCI_LE_Advertising_Report_Event.event_type_name,
'address_type': Address.address_type_name,
'data': lambda x: str(AdvertisingData.from_bytes(x))
})
@classmethod
def event_type_name(cls, event_type):
@@ -3458,7 +3745,7 @@ class HCI_LE_Advertising_Report_Event(HCI_LE_Meta_Event):
reports = []
offset = 2
for _ in range(num_reports):
report = HCI_Object.from_bytes(parameters, offset, cls.REPORT_FIELDS)
report = cls.Report.from_parameters(parameters, offset)
offset += 10 + len(report.data)
reports.append(report)
@@ -3473,11 +3760,7 @@ class HCI_LE_Advertising_Report_Event(HCI_LE_Meta_Event):
super().__init__(self.subevent_code, parameters)
def __str__(self):
reports = '\n'.join([report.to_string(' ', {
'event_type': self.event_type_name,
'address_type': Address.address_type_name,
'data': lambda x: str(AdvertisingData.from_bytes(x))
}) for report in self.reports])
reports = '\n'.join([report.to_string(' ') for report in self.reports])
return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}'
@@ -3590,7 +3873,7 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event):
'''
subevent_code = HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT
# Event Types
# Event types flags
CONNECTABLE_ADVERTISING = 0
SCANNABLE_ADVERTISING = 1
DIRECTED_ADVERTISING = 2
@@ -3601,13 +3884,13 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event):
DATA_INCOMPLETE_MORE_TO_COME = 0x01
DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME = 0x02
EVENT_TYPE_NAMES = {
CONNECTABLE_ADVERTISING: 'CONNECTABLE_ADVERTISING',
SCANNABLE_ADVERTISING: 'SCANNABLE_ADVERTISING',
DIRECTED_ADVERTISING: 'DIRECTED_ADVERTISING',
SCAN_RESPONSE: 'SCAN_RESPONSE',
LEGACY_ADVERTISING_PDU_USED: 'LEGACY_ADVERTISING_PDU_USED'
}
EVENT_TYPE_FLAG_NAMES = (
'CONNECTABLE_ADVERTISING',
'SCANNABLE_ADVERTISING',
'DIRECTED_ADVERTISING',
'SCAN_RESPONSE',
'LEGACY_ADVERTISING_PDU_USED'
)
LEGACY_PDU_TYPE_MAP = {
0b0011: HCI_LE_Advertising_Report_Event.ADV_IND,
@@ -3618,24 +3901,56 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event):
0b1010: HCI_LE_Advertising_Report_Event.SCAN_RSP
}
REPORT_FIELDS = [
('event_type', 2),
('address_type', Address.ADDRESS_TYPE_SPEC),
('address', Address.parse_address_preceded_by_type),
('primary_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}),
('secondary_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}),
('advertising_sid', 1),
('tx_power', 1),
('rssi', -1),
('periodic_advertising_interval', 2),
('direct_address_type', Address.ADDRESS_TYPE_SPEC),
('direct_address', Address.parse_address_preceded_by_type),
('data', {'parser': HCI_Object.parse_length_prefixed_bytes, 'serializer': HCI_Object.serialize_length_prefixed_bytes}),
]
NO_ADI_FIELD_PROVIDED = 0xFF
TX_POWER_INFORMATION_NOT_AVAILABLE = 0x7F
RSSI_NOT_AVAILABLE = 0x7F
ANONYMOUS_ADDRESS_TYPE = 0xFF
UNRESOLVED_RESOLVABLE_ADDRESS_TYPE = 0xFE
@classmethod
def event_type_name(cls, event_type):
return name_or_number(cls.EVENT_TYPE_NAMES, event_type)
class Report(HCI_Object):
FIELDS = [
('event_type', 2),
('address_type', Address.ADDRESS_TYPE_SPEC),
('address', Address.parse_address_preceded_by_type),
('primary_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}),
('secondary_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}),
('advertising_sid', 1),
('tx_power', 1),
('rssi', -1),
('periodic_advertising_interval', 2),
('direct_address_type', Address.ADDRESS_TYPE_SPEC),
('direct_address', Address.parse_address_preceded_by_type),
('data', {'parser': HCI_Object.parse_length_prefixed_bytes, 'serializer': HCI_Object.serialize_length_prefixed_bytes}),
]
@classmethod
def from_parameters(cls, parameters, offset):
return cls.from_bytes(parameters, offset, cls.FIELDS)
def to_string(self, prefix):
def event_type_string(event_type):
event_type_flags = bit_flags_to_strings(
event_type & 0x1F,
HCI_LE_Extended_Advertising_Report_Event.EVENT_TYPE_FLAG_NAMES,
)
event_type_flags.append(('COMPLETE', 'INCOMPLETE+', 'INCOMPLETE#', '?')[(event_type >> 5) & 3])
if event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.LEGACY_ADVERTISING_PDU_USED):
legacy_pdu_type = HCI_LE_Extended_Advertising_Report_Event.LEGACY_PDU_TYPE_MAP.get(event_type & 0x0F)
if legacy_pdu_type is not None:
legacy_info_string = f'({HCI_LE_Advertising_Report_Event.event_type_name(legacy_pdu_type)})'
else:
legacy_info_string = ''
else:
legacy_info_string = ''
return f'0x{event_type:04X} [{",".join(event_type_flags)}]{legacy_info_string}'
return super().to_string(prefix, {
'event_type': event_type_string,
'address_type': Address.address_type_name,
'data': lambda x: str(AdvertisingData.from_bytes(x))
})
@classmethod
def from_parameters(cls, parameters):
@@ -3643,7 +3958,7 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event):
reports = []
offset = 2
for _ in range(num_reports):
report = HCI_Object.from_bytes(parameters, offset, cls.REPORT_FIELDS)
report = cls.Report.from_parameters(parameters, offset)
offset += 24 + len(report.data)
reports.append(report)
@@ -3658,30 +3973,7 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event):
super().__init__(self.subevent_code, parameters)
def __str__(self):
def event_type_string(event_type):
event_type_flags = []
for bit in range(0, 5):
if event_type & (1 << bit):
event_type_flags.append(self.EVENT_TYPE_NAMES[bit])
event_type_flags.append(('COMPLETE', 'INCOMPLETE+', 'INCOMPLETE#', '?')[(event_type >> 5) & 3])
if event_type & (1 << self.LEGACY_ADVERTISING_PDU_USED):
legacy_pdu_type = self.LEGACY_PDU_TYPE_MAP.get(event_type & 0x0F)
if legacy_pdu_type is not None:
legacy_info_string = f'({HCI_LE_Advertising_Report_Event.event_type_name(legacy_pdu_type)})'
else:
legacy_info_string = ''
else:
legacy_info_string = ''
return f'0x{event_type:04X} [{",".join(event_type_flags)}]{legacy_info_string}'
reports = '\n'.join([report.to_string(' ', {
'event_type': event_type_string,
'address_type': Address.address_type_name,
'data': lambda x: str(AdvertisingData.from_bytes(x))
}) for report in self.reports])
reports = '\n'.join([report.to_string(' ') for report in self.reports])
return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}'

View File

@@ -467,13 +467,11 @@ class Host(EventEmitter):
def on_hci_le_advertising_report_event(self, event):
for report in event.reports:
self.emit(
'advertising_report',
report.address,
report.data,
report.rssi,
report.event_type
)
self.emit('advertising_report', report)
def on_hci_le_extended_advertising_report_event(self, event):
# TODO
pass
def on_hci_le_remote_connection_parameter_request_event(self, event):
if event.connection_handle not in self.connections: