mirror of
https://github.com/google/bumble.git
synced 2026-06-01 07:37:02 +00:00
+3
-3
@@ -821,8 +821,8 @@ class AdvertisingData:
|
||||
ad_structures = []
|
||||
self.ad_structures = ad_structures[:]
|
||||
|
||||
@staticmethod
|
||||
def from_bytes(data):
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes) -> AdvertisingData:
|
||||
instance = AdvertisingData()
|
||||
instance.append(data)
|
||||
return instance
|
||||
@@ -978,7 +978,7 @@ class AdvertisingData:
|
||||
|
||||
return ad_data
|
||||
|
||||
def append(self, data):
|
||||
def append(self, data: bytes) -> None:
|
||||
offset = 0
|
||||
while offset + 1 < len(data):
|
||||
length = data[offset]
|
||||
|
||||
+34
-45
@@ -243,16 +243,40 @@ DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONN
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@dataclass
|
||||
class Advertisement:
|
||||
# Attributes
|
||||
address: Address
|
||||
|
||||
TX_POWER_NOT_AVAILABLE = (
|
||||
rssi: int = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE
|
||||
is_legacy: bool = False
|
||||
is_anonymous: bool = False
|
||||
is_connectable: bool = False
|
||||
is_directed: bool = False
|
||||
is_scannable: bool = False
|
||||
is_scan_response: bool = False
|
||||
is_complete: bool = True
|
||||
is_truncated: bool = False
|
||||
primary_phy: int = 0
|
||||
secondary_phy: int = 0
|
||||
tx_power: int = (
|
||||
HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE
|
||||
)
|
||||
RSSI_NOT_AVAILABLE = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE
|
||||
sid: int = 0
|
||||
data_bytes: bytes = b''
|
||||
|
||||
# Constants
|
||||
TX_POWER_NOT_AVAILABLE: ClassVar[
|
||||
int
|
||||
] = HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE
|
||||
RSSI_NOT_AVAILABLE: ClassVar[
|
||||
int
|
||||
] = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
self.data = AdvertisingData.from_bytes(self.data_bytes)
|
||||
|
||||
@classmethod
|
||||
def from_advertising_report(cls, report):
|
||||
def from_advertising_report(cls, report) -> Optional[Advertisement]:
|
||||
if isinstance(report, HCI_LE_Advertising_Report_Event.Report):
|
||||
return LegacyAdvertisement.from_advertising_report(report)
|
||||
|
||||
@@ -261,41 +285,6 @@ class Advertisement:
|
||||
|
||||
return None
|
||||
|
||||
# pylint: disable=line-too-long
|
||||
def __init__(
|
||||
self,
|
||||
address,
|
||||
rssi=HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE,
|
||||
is_legacy=False,
|
||||
is_anonymous=False,
|
||||
is_connectable=False,
|
||||
is_directed=False,
|
||||
is_scannable=False,
|
||||
is_scan_response=False,
|
||||
is_complete=True,
|
||||
is_truncated=False,
|
||||
primary_phy=0,
|
||||
secondary_phy=0,
|
||||
tx_power=HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE,
|
||||
sid=0,
|
||||
data=b'',
|
||||
):
|
||||
self.address = address
|
||||
self.rssi = rssi
|
||||
self.is_legacy = is_legacy
|
||||
self.is_anonymous = is_anonymous
|
||||
self.is_connectable = is_connectable
|
||||
self.is_directed = is_directed
|
||||
self.is_scannable = is_scannable
|
||||
self.is_scan_response = is_scan_response
|
||||
self.is_complete = is_complete
|
||||
self.is_truncated = is_truncated
|
||||
self.primary_phy = primary_phy
|
||||
self.secondary_phy = secondary_phy
|
||||
self.tx_power = tx_power
|
||||
self.sid = sid
|
||||
self.data = AdvertisingData.from_bytes(data)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class LegacyAdvertisement(Advertisement):
|
||||
@@ -319,7 +308,7 @@ class LegacyAdvertisement(Advertisement):
|
||||
),
|
||||
is_scan_response=report.event_type
|
||||
== HCI_LE_Advertising_Report_Event.SCAN_RSP,
|
||||
data=report.data,
|
||||
data_bytes=report.data,
|
||||
)
|
||||
|
||||
|
||||
@@ -344,7 +333,7 @@ class ExtendedAdvertisement(Advertisement):
|
||||
secondary_phy = report.secondary_phy,
|
||||
tx_power = report.tx_power,
|
||||
sid = report.advertising_sid,
|
||||
data = report.data
|
||||
data_bytes = report.data
|
||||
)
|
||||
# fmt: on
|
||||
|
||||
@@ -405,7 +394,7 @@ class AdvertisingType(IntEnum):
|
||||
# fmt: on
|
||||
|
||||
@property
|
||||
def has_data(self):
|
||||
def has_data(self) -> bool:
|
||||
return self in (
|
||||
AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
|
||||
AdvertisingType.UNDIRECTED_SCANNABLE,
|
||||
@@ -413,7 +402,7 @@ class AdvertisingType(IntEnum):
|
||||
)
|
||||
|
||||
@property
|
||||
def is_connectable(self):
|
||||
def is_connectable(self) -> bool:
|
||||
return self in (
|
||||
AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
|
||||
AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY,
|
||||
@@ -421,14 +410,14 @@ class AdvertisingType(IntEnum):
|
||||
)
|
||||
|
||||
@property
|
||||
def is_scannable(self):
|
||||
def is_scannable(self) -> bool:
|
||||
return self in (
|
||||
AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
|
||||
AdvertisingType.UNDIRECTED_SCANNABLE,
|
||||
)
|
||||
|
||||
@property
|
||||
def is_directed(self):
|
||||
def is_directed(self) -> bool:
|
||||
return self in (
|
||||
AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY,
|
||||
AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY,
|
||||
|
||||
Reference in New Issue
Block a user