forked from auracaster/bumble_mirror
add extended report class
This commit is contained in:
119
bumble/hci.py
119
bumble/hci.py
@@ -3141,8 +3141,8 @@ class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command):
|
|||||||
LE_1M_PHY = 0x00
|
LE_1M_PHY = 0x00
|
||||||
LE_CODED_PHY = 0x02
|
LE_CODED_PHY = 0x02
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def from_parameters(parameters):
|
def from_parameters(cls, parameters):
|
||||||
own_address_type = parameters[0]
|
own_address_type = parameters[0]
|
||||||
scanning_filter_policy = parameters[1]
|
scanning_filter_policy = parameters[1]
|
||||||
scanning_phys = parameters[2]
|
scanning_phys = parameters[2]
|
||||||
@@ -3156,7 +3156,7 @@ class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command):
|
|||||||
scan_intervals.append(struct.unpack_from('<H', parameters, 3 + (5 * i) + 1)[0])
|
scan_intervals.append(struct.unpack_from('<H', parameters, 3 + (5 * i) + 1)[0])
|
||||||
scan_windows.append(struct.unpack_from('<H', parameters, 3 + (5 * i) + 3)[0])
|
scan_windows.append(struct.unpack_from('<H', parameters, 3 + (5 * i) + 3)[0])
|
||||||
|
|
||||||
return HCI_LE_Set_Extended_Scan_Parameters_Command(
|
return cls(
|
||||||
own_address_type = own_address_type,
|
own_address_type = own_address_type,
|
||||||
scanning_filter_policy = scanning_filter_policy,
|
scanning_filter_policy = scanning_filter_policy,
|
||||||
scanning_phys = scanning_phys,
|
scanning_phys = scanning_phys,
|
||||||
@@ -3452,17 +3452,17 @@ class HCI_LE_Advertising_Report_Event(HCI_LE_Meta_Event):
|
|||||||
def event_type_name(cls, event_type):
|
def event_type_name(cls, event_type):
|
||||||
return name_or_number(cls.EVENT_TYPE_NAMES, event_type)
|
return name_or_number(cls.EVENT_TYPE_NAMES, event_type)
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def from_parameters(parameters):
|
def from_parameters(cls, parameters):
|
||||||
num_reports = parameters[1]
|
num_reports = parameters[1]
|
||||||
reports = []
|
reports = []
|
||||||
offset = 2
|
offset = 2
|
||||||
for _ in range(num_reports):
|
for _ in range(num_reports):
|
||||||
report = HCI_Object.from_bytes(parameters, offset, HCI_LE_Advertising_Report_Event.REPORT_FIELDS)
|
report = HCI_Object.from_bytes(parameters, offset, cls.REPORT_FIELDS)
|
||||||
offset += 10 + len(report.data)
|
offset += 10 + len(report.data)
|
||||||
reports.append(report)
|
reports.append(report)
|
||||||
|
|
||||||
return HCI_LE_Advertising_Report_Event(reports)
|
return cls(reports)
|
||||||
|
|
||||||
def __init__(self, reports):
|
def __init__(self, reports):
|
||||||
self.reports = reports[:]
|
self.reports = reports[:]
|
||||||
@@ -3583,6 +3583,111 @@ class HCI_LE_PHY_Update_Complete_Event(HCI_LE_Meta_Event):
|
|||||||
'''
|
'''
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event):
|
||||||
|
'''
|
||||||
|
See Bluetooth spec @ 7.7.65.13 LE Extended Advertising Report Event
|
||||||
|
'''
|
||||||
|
subevent_code = HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT
|
||||||
|
|
||||||
|
# Event Types
|
||||||
|
CONNECTABLE_ADVERTISING = 0
|
||||||
|
SCANNABLE_ADVERTISING = 1
|
||||||
|
DIRECTED_ADVERTISING = 2
|
||||||
|
SCAN_RESPONSE = 3
|
||||||
|
LEGACY_ADVERTISING_PDU_USED = 4
|
||||||
|
|
||||||
|
DATA_COMPLETE = 0x00
|
||||||
|
DATA_INCOMPLETE_MORE_TO_COME = 0x01
|
||||||
|
DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME = 0x02
|
||||||
|
|
||||||
|
EVENT_TYPE_NAMES = {
|
||||||
|
CONNECTABLE_ADVERTISING: 'CONNECTABLE_ADVERTISING',
|
||||||
|
SCANNABLE_ADVERTISING: 'SCANNABLE_ADVERTISING',
|
||||||
|
DIRECTED_ADVERTISING: 'DIRECTED_ADVERTISING',
|
||||||
|
SCAN_RESPONSE: 'SCAN_RESPONSE',
|
||||||
|
LEGACY_ADVERTISING_PDU_USED: 'LEGACY_ADVERTISING_PDU_USED'
|
||||||
|
}
|
||||||
|
|
||||||
|
LEGACY_PDU_TYPE_MAP = {
|
||||||
|
0b0011: HCI_LE_Advertising_Report_Event.ADV_IND,
|
||||||
|
0b0101: HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND,
|
||||||
|
0b0010: HCI_LE_Advertising_Report_Event.ADV_SCAN_IND,
|
||||||
|
0b0000: HCI_LE_Advertising_Report_Event.ADV_NONCONN_IND,
|
||||||
|
0b1011: HCI_LE_Advertising_Report_Event.SCAN_RSP,
|
||||||
|
0b1010: HCI_LE_Advertising_Report_Event.SCAN_RSP
|
||||||
|
}
|
||||||
|
|
||||||
|
REPORT_FIELDS = [
|
||||||
|
('event_type', 2),
|
||||||
|
('address_type', Address.ADDRESS_TYPE_SPEC),
|
||||||
|
('address', Address.parse_address_preceded_by_type),
|
||||||
|
('primary_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}),
|
||||||
|
('secondary_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}),
|
||||||
|
('advertising_sid', 1),
|
||||||
|
('tx_power', 1),
|
||||||
|
('rssi', -1),
|
||||||
|
('periodic_advertising_interval', 2),
|
||||||
|
('direct_address_type', Address.ADDRESS_TYPE_SPEC),
|
||||||
|
('direct_address', Address.parse_address_preceded_by_type),
|
||||||
|
('data', {'parser': HCI_Object.parse_length_prefixed_bytes, 'serializer': HCI_Object.serialize_length_prefixed_bytes}),
|
||||||
|
]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def event_type_name(cls, event_type):
|
||||||
|
return name_or_number(cls.EVENT_TYPE_NAMES, event_type)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_parameters(cls, parameters):
|
||||||
|
num_reports = parameters[1]
|
||||||
|
reports = []
|
||||||
|
offset = 2
|
||||||
|
for _ in range(num_reports):
|
||||||
|
report = HCI_Object.from_bytes(parameters, offset, cls.REPORT_FIELDS)
|
||||||
|
offset += 24 + len(report.data)
|
||||||
|
reports.append(report)
|
||||||
|
|
||||||
|
return cls(reports)
|
||||||
|
|
||||||
|
def __init__(self, reports):
|
||||||
|
self.reports = reports[:]
|
||||||
|
|
||||||
|
# Serialize the fields
|
||||||
|
parameters = bytes([HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT, len(reports)]) + b''.join([bytes(report) for report in reports])
|
||||||
|
|
||||||
|
super().__init__(self.subevent_code, parameters)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
def event_type_string(event_type):
|
||||||
|
event_type_flags = []
|
||||||
|
for bit in range(0, 5):
|
||||||
|
if event_type & (1 << bit):
|
||||||
|
event_type_flags.append(self.EVENT_TYPE_NAMES[bit])
|
||||||
|
event_type_flags.append(('COMPLETE', 'INCOMPLETE+', 'INCOMPLETE#', '?')[(event_type >> 5) & 3])
|
||||||
|
|
||||||
|
if event_type & (1 << self.LEGACY_ADVERTISING_PDU_USED):
|
||||||
|
legacy_pdu_type = self.LEGACY_PDU_TYPE_MAP.get(event_type & 0x0F)
|
||||||
|
if legacy_pdu_type is not None:
|
||||||
|
legacy_info_string = f'({HCI_LE_Advertising_Report_Event.event_type_name(legacy_pdu_type)})'
|
||||||
|
else:
|
||||||
|
legacy_info_string = ''
|
||||||
|
else:
|
||||||
|
legacy_info_string = ''
|
||||||
|
|
||||||
|
return f'0x{event_type:04X} [{",".join(event_type_flags)}]{legacy_info_string}'
|
||||||
|
|
||||||
|
reports = '\n'.join([report.to_string(' ', {
|
||||||
|
'event_type': event_type_string,
|
||||||
|
'address_type': Address.address_type_name,
|
||||||
|
'data': lambda x: str(AdvertisingData.from_bytes(x))
|
||||||
|
}) for report in self.reports])
|
||||||
|
|
||||||
|
return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}'
|
||||||
|
|
||||||
|
|
||||||
|
HCI_Event.meta_event_classes[HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT] = HCI_LE_Extended_Advertising_Report_Event
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@HCI_LE_Meta_Event.event([
|
@HCI_LE_Meta_Event.event([
|
||||||
('connection_handle', 2),
|
('connection_handle', 2),
|
||||||
|
|||||||
Reference in New Issue
Block a user