forked from auracaster/bumble_mirror
Refactor LE emulation with LL and Air Interface
This commit is contained in:
@@ -265,12 +265,22 @@ class ExtendedAdvertisement(Advertisement):
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class AdvertisementDataAccumulator:
|
||||
last_advertisement: Advertisement | None
|
||||
last_data: bytes
|
||||
passive: bool
|
||||
|
||||
def __init__(self, passive: bool = False):
|
||||
self.passive = passive
|
||||
self.last_advertisement = None
|
||||
self.last_data = b''
|
||||
|
||||
def update(self, report):
|
||||
def update(
|
||||
self,
|
||||
report: (
|
||||
hci.HCI_LE_Advertising_Report_Event.Report
|
||||
| hci.HCI_LE_Extended_Advertising_Report_Event.Report
|
||||
),
|
||||
) -> Advertisement | None:
|
||||
advertisement = Advertisement.from_advertising_report(report)
|
||||
if advertisement is None:
|
||||
return None
|
||||
@@ -283,10 +293,12 @@ class AdvertisementDataAccumulator:
|
||||
and not self.last_advertisement.is_scan_response
|
||||
):
|
||||
# This is the response to a scannable advertisement
|
||||
result = Advertisement.from_advertising_report(report)
|
||||
result.is_connectable = self.last_advertisement.is_connectable
|
||||
result.is_scannable = True
|
||||
result.data = AdvertisingData.from_bytes(self.last_data + report.data)
|
||||
if result := Advertisement.from_advertising_report(report):
|
||||
result.is_connectable = self.last_advertisement.is_connectable
|
||||
result.is_scannable = True
|
||||
result.data = AdvertisingData.from_bytes(
|
||||
self.last_data + report.data
|
||||
)
|
||||
self.last_data = b''
|
||||
else:
|
||||
if (
|
||||
@@ -3333,7 +3345,13 @@ class Device(utils.CompositeEventEmitter):
|
||||
return self.scanning
|
||||
|
||||
@host_event_handler
|
||||
def on_advertising_report(self, report):
|
||||
def on_advertising_report(
|
||||
self,
|
||||
report: (
|
||||
hci.HCI_LE_Advertising_Report_Event.Report
|
||||
| hci.HCI_LE_Extended_Advertising_Report_Event.Report
|
||||
),
|
||||
) -> None:
|
||||
if not (accumulator := self.advertisement_accumulators.get(report.address)):
|
||||
accumulator = AdvertisementDataAccumulator(passive=self.scanning_is_passive)
|
||||
self.advertisement_accumulators[report.address] = accumulator
|
||||
|
||||
Reference in New Issue
Block a user