Dataclass-based HCI packets

This commit is contained in:
Josh Wu
2025-06-13 16:32:07 +08:00
parent bf027cf38f
commit a0498af626
5 changed files with 369 additions and 441 deletions

View File

@@ -544,15 +544,14 @@ class Controller:
acl_packet = HCI_AclDataPacket(connection.handle, 2, 0, len(data), data) acl_packet = HCI_AclDataPacket(connection.handle, 2, 0, len(data), data)
self.send_hci_packet(acl_packet) self.send_hci_packet(acl_packet)
def on_link_advertising_data(self, sender_address, data): def on_link_advertising_data(self, sender_address: Address, data: bytes):
# Ignore if we're not scanning # Ignore if we're not scanning
if self.le_scan_enable == 0: if self.le_scan_enable == 0:
return return
# Send a scan report # Send a scan report
report = HCI_LE_Advertising_Report_Event.Report( report = HCI_LE_Advertising_Report_Event.Report(
HCI_LE_Advertising_Report_Event.Report.FIELDS, event_type=HCI_LE_Advertising_Report_Event.EventType.ADV_IND,
event_type=HCI_LE_Advertising_Report_Event.ADV_IND,
address_type=sender_address.address_type, address_type=sender_address.address_type,
address=sender_address, address=sender_address,
data=data, data=data,
@@ -562,8 +561,7 @@ class Controller:
# Simulate a scan response # Simulate a scan response
report = HCI_LE_Advertising_Report_Event.Report( report = HCI_LE_Advertising_Report_Event.Report(
HCI_LE_Advertising_Report_Event.Report.FIELDS, event_type=HCI_LE_Advertising_Report_Event.EventType.SCAN_RSP,
event_type=HCI_LE_Advertising_Report_Event.SCAN_RSP,
address_type=sender_address.address_type, address_type=sender_address.address_type,
address=sender_address, address=sender_address,
data=data, data=data,

View File

@@ -201,25 +201,35 @@ class Advertisement:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class LegacyAdvertisement(Advertisement): class LegacyAdvertisement(Advertisement):
@classmethod @classmethod
def from_advertising_report(cls, report): def from_advertising_report(
cls, report: hci.HCI_LE_Advertising_Report_Event.Report
) -> Self:
return cls( return cls(
address=report.address, address=report.address,
rssi=report.rssi, rssi=report.rssi,
is_legacy=True, is_legacy=True,
is_connectable=report.event_type is_connectable=(
report.event_type
in ( in (
hci.HCI_LE_Advertising_Report_Event.ADV_IND, hci.HCI_LE_Advertising_Report_Event.EventType.ADV_IND,
hci.HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, hci.HCI_LE_Advertising_Report_Event.EventType.ADV_DIRECT_IND,
)
), ),
is_directed=report.event_type is_directed=(
== hci.HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, report.event_type
is_scannable=report.event_type == hci.HCI_LE_Advertising_Report_Event.EventType.ADV_DIRECT_IND
),
is_scannable=(
report.event_type
in ( in (
hci.HCI_LE_Advertising_Report_Event.ADV_IND, hci.HCI_LE_Advertising_Report_Event.EventType.ADV_IND,
hci.HCI_LE_Advertising_Report_Event.ADV_SCAN_IND, hci.HCI_LE_Advertising_Report_Event.EventType.ADV_SCAN_IND,
)
),
is_scan_response=(
report.event_type
== hci.HCI_LE_Advertising_Report_Event.EventType.SCAN_RSP
), ),
is_scan_response=report.event_type
== hci.HCI_LE_Advertising_Report_Event.SCAN_RSP,
data_bytes=report.data, data_bytes=report.data,
) )
@@ -227,18 +237,20 @@ class LegacyAdvertisement(Advertisement):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class ExtendedAdvertisement(Advertisement): class ExtendedAdvertisement(Advertisement):
@classmethod @classmethod
def from_advertising_report(cls, report): def from_advertising_report(
cls, report: hci.HCI_LE_Extended_Advertising_Report_Event.Report
) -> Self:
# fmt: off # fmt: off
# pylint: disable=line-too-long # pylint: disable=line-too-long
return cls( return cls(
address = report.address, address = report.address,
rssi = report.rssi, rssi = report.rssi,
is_legacy = report.event_type & (1 << hci.HCI_LE_Extended_Advertising_Report_Event.LEGACY_ADVERTISING_PDU_USED) != 0, is_legacy = (report.event_type & hci.HCI_LE_Extended_Advertising_Report_Event.EventType.LEGACY_ADVERTISING_PDU_USED) != 0,
is_anonymous = report.address.address_type == hci.HCI_LE_Extended_Advertising_Report_Event.ANONYMOUS_ADDRESS_TYPE, is_anonymous = report.address.address_type == hci.HCI_LE_Extended_Advertising_Report_Event.ANONYMOUS_ADDRESS_TYPE,
is_connectable = report.event_type & (1 << hci.HCI_LE_Extended_Advertising_Report_Event.CONNECTABLE_ADVERTISING) != 0, is_connectable = (report.event_type & hci.HCI_LE_Extended_Advertising_Report_Event.EventType.CONNECTABLE_ADVERTISING) != 0,
is_directed = report.event_type & (1 << hci.HCI_LE_Extended_Advertising_Report_Event.DIRECTED_ADVERTISING) != 0, is_directed = (report.event_type & hci.HCI_LE_Extended_Advertising_Report_Event.EventType.DIRECTED_ADVERTISING) != 0,
is_scannable = report.event_type & (1 << hci.HCI_LE_Extended_Advertising_Report_Event.SCANNABLE_ADVERTISING) != 0, is_scannable = (report.event_type & hci.HCI_LE_Extended_Advertising_Report_Event.EventType.SCANNABLE_ADVERTISING) != 0,
is_scan_response = report.event_type & (1 << hci.HCI_LE_Extended_Advertising_Report_Event.SCAN_RESPONSE) != 0, is_scan_response = (report.event_type & hci.HCI_LE_Extended_Advertising_Report_Event.EventType.SCAN_RESPONSE) != 0,
is_complete = (report.event_type >> 5 & 3) == hci.HCI_LE_Extended_Advertising_Report_Event.DATA_COMPLETE, is_complete = (report.event_type >> 5 & 3) == hci.HCI_LE_Extended_Advertising_Report_Event.DATA_COMPLETE,
is_truncated = (report.event_type >> 5 & 3) == hci.HCI_LE_Extended_Advertising_Report_Event.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME, is_truncated = (report.event_type >> 5 & 3) == hci.HCI_LE_Extended_Advertising_Report_Event.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME,
primary_phy = report.primary_phy, primary_phy = report.primary_phy,

View File

@@ -18,19 +18,19 @@
from __future__ import annotations from __future__ import annotations
import collections import collections
import dataclasses import dataclasses
from dataclasses import field
import enum import enum
import functools import functools
import logging import logging
import secrets import secrets
import struct import struct
from collections.abc import Sequence from collections.abc import Sequence
from typing import Any, Callable, Iterable, Optional, Union, TypeVar, ClassVar from typing import Any, Callable, Iterable, Optional, Union, TypeVar, ClassVar, cast
from typing_extensions import Self from typing_extensions import Self
from bumble import crypto from bumble import crypto
from bumble.colors import color from bumble.colors import color
from bumble.core import ( from bumble.core import (
AdvertisingData,
DeviceClass, DeviceClass,
InvalidArgumentError, InvalidArgumentError,
InvalidPacketError, InvalidPacketError,
@@ -1744,9 +1744,11 @@ class HCI_Object:
raise InvalidArgumentError(f'unknown field type {field_type}') raise InvalidArgumentError(f'unknown field type {field_type}')
@staticmethod @classmethod
def dict_from_bytes(data, offset, fields): def dict_and_offset_from_bytes(
result = collections.OrderedDict() cls, data: bytes, offset: int, fields: Fields
) -> tuple[int, collections.OrderedDict[str, Any]]:
result = collections.OrderedDict[str, Any]()
for field in fields: for field in fields:
if isinstance(field, list): if isinstance(field, list):
# This is an array field, starting with a 1-byte item count. # This is an array field, starting with a 1-byte item count.
@@ -1766,11 +1768,18 @@ class HCI_Object:
continue continue
field_name, field_type = field field_name, field_type = field
field_value, field_size = HCI_Object.parse_field(data, offset, field_type) assert isinstance(field_name, str)
field_value, field_size = HCI_Object.parse_field(
data, offset, cast(FieldSpec, field_type)
)
result[field_name] = field_value result[field_name] = field_value
offset += field_size offset += field_size
return result return (offset, result)
@staticmethod
def dict_from_bytes(data, offset, fields):
return HCI_Object.dict_and_offset_from_bytes(data, offset, fields)[1]
@staticmethod @staticmethod
def serialize_field(field_value: Any, field_type: FieldSpec) -> bytes: def serialize_field(field_value: Any, field_type: FieldSpec) -> bytes:
@@ -1995,6 +2004,19 @@ class HCI_Object:
return self.to_string() return self.to_string()
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_Dataclass_Object(HCI_Object):
def __post_init__(self) -> None:
self.fields = HCI_Object.fields_from_dataclass(self)
@classmethod
def parse_from_bytes(cls, data: bytes, offset: int) -> tuple[int, Self]:
fields = HCI_Object.fields_from_dataclass(cls)
offset, kwargs = HCI_Object.dict_and_offset_from_bytes(data, offset, fields)
return offset, cls(**kwargs)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Bluetooth Address # Bluetooth Address
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -2276,6 +2298,7 @@ class HCI_Command(HCI_Packet):
op_code: int = -1 op_code: int = -1
fields: Fields = () fields: Fields = ()
return_parameters_fields: Fields = () return_parameters_fields: Fields = ()
parameters: bytes = b''
@staticmethod @staticmethod
def command( def command(
@@ -2302,6 +2325,15 @@ class HCI_Command(HCI_Packet):
return inner return inner
@classmethod
def dataclass_command(cls, subclass):
# TODO: Move this to __post_init__ when all packets become dataclasses.
subclass.parameters = functools.cached_property(
lambda self: HCI_Object.dict_to_bytes(self.__dict__, self.fields)
)
subclass.parameters.__set_name__(subclass, 'parameters')
return HCI_Command.command(HCI_Object.fields_from_dataclass(subclass))(subclass)
@staticmethod @staticmethod
def command_map(symbols: dict[str, Any]) -> dict[int, str]: def command_map(symbols: dict[str, Any]) -> dict[int, str]:
return { return {
@@ -2331,9 +2363,9 @@ class HCI_Command(HCI_Packet):
@classmethod @classmethod
def from_parameters(cls, parameters: bytes) -> HCI_Command: def from_parameters(cls, parameters: bytes) -> HCI_Command:
command = cls(parameters) if dataclasses.is_dataclass(cls):
HCI_Object.init_from_bytes(command, parameters, 0, cls.fields) return cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields))
return command return cls(parameters, **HCI_Object.dict_from_bytes(parameters, 0, cls.fields))
@staticmethod @staticmethod
def command_name(op_code): def command_name(op_code):
@@ -2373,7 +2405,7 @@ class HCI_Command(HCI_Packet):
parameters = HCI_Object.dict_to_bytes(kwargs, self.fields) parameters = HCI_Object.dict_to_bytes(kwargs, self.fields)
self.parameters = parameters or b'' self.parameters = parameters or b''
def __bytes__(self): def __bytes__(self) -> bytes:
parameters = b'' if self.parameters is None else self.parameters parameters = b'' if self.parameters is None else self.parameters
return ( return (
struct.pack('<BHB', HCI_COMMAND_PACKET, self.op_code, len(parameters)) struct.pack('<BHB', HCI_COMMAND_PACKET, self.op_code, len(parameters))
@@ -2394,18 +2426,19 @@ HCI_Command.register_commands(globals())
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command( @HCI_Command.dataclass_command
[ @dataclasses.dataclass
('lap', {'size': 3, 'mapper': HCI_Constant.inquiry_lap_name}),
('inquiry_length', 1),
('num_responses', 1),
]
)
class HCI_Inquiry_Command(HCI_Command): class HCI_Inquiry_Command(HCI_Command):
''' '''
See Bluetooth spec @ 7.1.1 Inquiry Command See Bluetooth spec @ 7.1.1 Inquiry Command
''' '''
lap: int = field(
metadata=metadata({'size': 3, 'mapper': HCI_Constant.inquiry_lap_name})
)
inquiry_length: int = field(metadata=metadata(1))
num_responses: int = field(metadata=metadata(1))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command() @HCI_Command.command()
@@ -5594,6 +5627,7 @@ class HCI_Event(HCI_Packet):
vendor_factories: list[Callable[[bytes], Optional[HCI_Event]]] = [] vendor_factories: list[Callable[[bytes], Optional[HCI_Event]]] = []
event_code: int = -1 event_code: int = -1
fields: Fields = () fields: Fields = ()
parameters: bytes = b''
@staticmethod @staticmethod
def event(fields: Optional[Fields] = ()): def event(fields: Optional[Fields] = ()):
@@ -5618,6 +5652,15 @@ class HCI_Event(HCI_Packet):
return inner return inner
@classmethod
def dataclass_event(cls, subclass):
# TODO: Move this to __post_init__ when all packets become dataclasses.
subclass.parameters = functools.cached_property(
lambda self: HCI_Object.dict_to_bytes(self.__dict__, self.fields)
)
subclass.parameters.__set_name__(subclass, 'parameters')
return HCI_Event.event(HCI_Object.fields_from_dataclass(subclass))(subclass)
@staticmethod @staticmethod
def event_map(symbols: dict[str, Any]) -> dict[int, str]: def event_map(symbols: dict[str, Any]) -> dict[int, str]:
return { return {
@@ -5701,10 +5744,9 @@ class HCI_Event(HCI_Packet):
@classmethod @classmethod
def from_parameters(cls, parameters: bytes) -> Self: def from_parameters(cls, parameters: bytes) -> Self:
self = cls(parameters) if dataclasses.is_dataclass(cls):
if self.fields: return cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields))
HCI_Object.init_from_bytes(self, parameters, 0, self.fields) return cls(parameters, **HCI_Object.dict_from_bytes(parameters, 0, cls.fields))
return self
def __init__( def __init__(
self, self,
@@ -5722,7 +5764,7 @@ class HCI_Event(HCI_Packet):
parameters = HCI_Object.dict_to_bytes(kwargs, self.fields) parameters = HCI_Object.dict_to_bytes(kwargs, self.fields)
self.parameters = parameters or b'' self.parameters = parameters or b''
def __bytes__(self): def __bytes__(self) -> bytes:
parameters = b'' if self.parameters is None else self.parameters parameters = b'' if self.parameters is None else self.parameters
return bytes([HCI_EVENT_PACKET, self.event_code, len(parameters)]) + parameters return bytes([HCI_EVENT_PACKET, self.event_code, len(parameters)]) + parameters
@@ -5757,21 +5799,35 @@ class HCI_Extended_Event(HCI_Event):
ExtendedEvent = TypeVar("ExtendedEvent", bound=HCI_Extended_Event) ExtendedEvent = TypeVar("ExtendedEvent", bound=HCI_Extended_Event)
def inner(cls: type[ExtendedEvent]) -> type[ExtendedEvent]: def inner(subclass: type[ExtendedEvent]) -> type[ExtendedEvent]:
cls.name = cls.__name__.upper() subclass.name = subclass.__name__.upper()
cls.subevent_code = key_with_value(cls.subevent_names, cls.name) subclass.subevent_code = key_with_value(
if cls.subevent_code is None: subclass.subevent_names, subclass.name
raise KeyError(f'subevent {cls.name} not found in subevent_names') )
if subclass.subevent_code is None:
raise KeyError(f'subevent {subclass.name} not found in subevent_names')
if fields is not None: if fields is not None:
cls.fields = fields subclass.fields = fields
# Register a factory for this class # Register a factory for this class
cls.subevent_classes[cls.subevent_code] = cls cls.subevent_classes[subclass.subevent_code] = subclass
return cls return subclass
return inner return inner
@classmethod
def dataclass_event(cls, subclass):
# TODO: Move this to __post_init__ when all packets become dataclasses.
subclass.parameters = functools.cached_property(
lambda self: (
bytes([self.subevent_code])
+ HCI_Object.dict_to_bytes(self.__dict__, self.fields)
)
)
subclass.parameters.__set_name__(subclass, 'parameters')
return cls.event(HCI_Object.fields_from_dataclass(subclass))(subclass)
@classmethod @classmethod
def subevent_name(cls, subevent_code): def subevent_name(cls, subevent_code):
subevent_name = cls.subevent_names.get(subevent_code) subevent_name = cls.subevent_names.get(subevent_code)
@@ -5809,10 +5865,9 @@ class HCI_Extended_Event(HCI_Event):
@classmethod @classmethod
def from_parameters(cls, parameters: bytes) -> HCI_Extended_Event: def from_parameters(cls, parameters: bytes) -> HCI_Extended_Event:
"""Factory method for subclasses (the subevent code has already been parsed)""" """Factory method for subclasses (the subevent code has already been parsed)"""
event = cls(parameters) if dataclasses.is_dataclass(cls):
if event.fields: return cls(**HCI_Object.dict_from_bytes(parameters, 1, cls.fields))
HCI_Object.init_from_bytes(event, parameters, 1, event.fields) return cls(parameters, **HCI_Object.dict_from_bytes(parameters, 1, cls.fields))
return event
def __init__( def __init__(
self, self,
@@ -5878,96 +5933,41 @@ class HCI_LE_Connection_Complete_Event(HCI_LE_Meta_Event):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.dataclass_event
@dataclasses.dataclass
class HCI_LE_Advertising_Report_Event(HCI_LE_Meta_Event): class HCI_LE_Advertising_Report_Event(HCI_LE_Meta_Event):
''' '''
See Bluetooth spec @ 7.7.65.2 LE Advertising Report Event See Bluetooth spec @ 7.7.65.2 LE Advertising Report Event
''' '''
subevent_code = HCI_LE_ADVERTISING_REPORT_EVENT class EventType(utils.OpenIntEnum):
name = 'HCI_LE_ADVERTISING_REPORT_EVENT'
# Event Types
ADV_IND = 0x00 ADV_IND = 0x00
ADV_DIRECT_IND = 0x01 ADV_DIRECT_IND = 0x01
ADV_SCAN_IND = 0x02 ADV_SCAN_IND = 0x02
ADV_NONCONN_IND = 0x03 ADV_NONCONN_IND = 0x03
SCAN_RSP = 0x04 SCAN_RSP = 0x04
EVENT_TYPE_NAMES = { @dataclasses.dataclass
ADV_IND: 'ADV_IND', # Connectable and scannable undirected advertising class Report(HCI_Dataclass_Object):
ADV_DIRECT_IND: 'ADV_DIRECT_IND', # Connectable directed advertising event_type: int = field(
ADV_SCAN_IND: 'ADV_SCAN_IND', # Scannable undirected advertising metadata=metadata(
ADV_NONCONN_IND: 'ADV_NONCONN_IND', # Non connectable undirected advertising
SCAN_RSP: 'SCAN_RSP', # Scan Response
}
class Report(HCI_Object):
FIELDS = [
('event_type', 1),
('address_type', Address.ADDRESS_TYPE_SPEC),
('address', Address.parse_address_preceded_by_type),
('data', 'v'),
('rssi', -1),
]
@classmethod
def from_parameters(cls, parameters, offset):
return cls.from_bytes(parameters, offset, cls.FIELDS)
def event_type_string(self):
return HCI_LE_Advertising_Report_Event.event_type_name(self.event_type)
def to_string(self, indentation='', _=None):
def data_to_str(data):
try:
return data.hex() + ': ' + str(AdvertisingData.from_bytes(data))
except Exception:
return data.hex()
return super().to_string(
indentation,
{ {
'event_type': HCI_LE_Advertising_Report_Event.event_type_name, 'size': 1,
'address_type': Address.address_type_name, 'mapper': lambda x: HCI_LE_Advertising_Report_Event.EventType(
'data': data_to_str, x
}, ).name,
}
) )
@classmethod
def event_type_name(cls, event_type):
return name_or_number(cls.EVENT_TYPE_NAMES, event_type)
@classmethod
def from_parameters(cls, parameters: bytes) -> Self:
num_reports = parameters[1]
reports = []
offset = 2
for _ in range(num_reports):
report = cls.Report.from_parameters(parameters, offset)
offset += 10 + len(report.data)
reports.append(report)
return cls(reports)
def __init__(self, reports):
self.reports = reports[:]
# Serialize the fields
parameters = bytes([HCI_LE_ADVERTISING_REPORT_EVENT, len(reports)]) + b''.join(
[bytes(report) for report in reports]
) )
address_type: int = field(metadata=metadata(Address.ADDRESS_TYPE_SPEC))
super().__init__(parameters) address: Address = field(
metadata=metadata(Address.parse_address_preceded_by_type)
def __str__(self):
reports = '\n'.join(
[f'{i}:\n{report.to_string(" ")}' for i, report in enumerate(self.reports)]
) )
return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}' data: bytes = field(metadata=metadata('v'))
rssi: int = field(metadata=metadata(-1))
reports: Sequence[Report] = field(
HCI_LE_Meta_Event.subevent_classes[HCI_LE_ADVERTISING_REPORT_EVENT] = ( metadata=metadata(Report.parse_from_bytes, list_begin=True, list_end=True)
HCI_LE_Advertising_Report_Event
) )
@@ -6107,40 +6107,31 @@ class HCI_LE_PHY_Update_Complete_Event(HCI_LE_Meta_Event):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.dataclass_event
@dataclasses.dataclass
class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event): class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event):
''' '''
See Bluetooth spec @ 7.7.65.13 LE Extended Advertising Report Event See Bluetooth spec @ 7.7.65.13 LE Extended Advertising Report Event
''' '''
subevent_code = HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT class EventType(enum.IntFlag):
name = 'HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT' CONNECTABLE_ADVERTISING = 1 << 0
SCANNABLE_ADVERTISING = 1 << 1
# Event types flags DIRECTED_ADVERTISING = 1 << 2
CONNECTABLE_ADVERTISING = 0 SCAN_RESPONSE = 1 << 3
SCANNABLE_ADVERTISING = 1 LEGACY_ADVERTISING_PDU_USED = 1 << 4
DIRECTED_ADVERTISING = 2
SCAN_RESPONSE = 3
LEGACY_ADVERTISING_PDU_USED = 4
DATA_COMPLETE = 0x00 DATA_COMPLETE = 0x00
DATA_INCOMPLETE_MORE_TO_COME = 0x01 DATA_INCOMPLETE_MORE_TO_COME = 0x01
DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME = 0x02 DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME = 0x02
EVENT_TYPE_FLAG_NAMES = (
'CONNECTABLE_ADVERTISING',
'SCANNABLE_ADVERTISING',
'DIRECTED_ADVERTISING',
'SCAN_RESPONSE',
'LEGACY_ADVERTISING_PDU_USED',
)
LEGACY_PDU_TYPE_MAP = { LEGACY_PDU_TYPE_MAP = {
0b0011: HCI_LE_Advertising_Report_Event.ADV_IND, 0b0011: HCI_LE_Advertising_Report_Event.EventType.ADV_IND,
0b0101: HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, 0b0101: HCI_LE_Advertising_Report_Event.EventType.ADV_DIRECT_IND,
0b0010: HCI_LE_Advertising_Report_Event.ADV_SCAN_IND, 0b0010: HCI_LE_Advertising_Report_Event.EventType.ADV_SCAN_IND,
0b0000: HCI_LE_Advertising_Report_Event.ADV_NONCONN_IND, 0b0000: HCI_LE_Advertising_Report_Event.EventType.ADV_NONCONN_IND,
0b1011: HCI_LE_Advertising_Report_Event.SCAN_RSP, 0b1011: HCI_LE_Advertising_Report_Event.EventType.SCAN_RSP,
0b1010: HCI_LE_Advertising_Report_Event.SCAN_RSP, 0b1010: HCI_LE_Advertising_Report_Event.EventType.SCAN_RSP,
} }
NO_ADI_FIELD_PROVIDED = 0xFF NO_ADI_FIELD_PROVIDED = 0xFF
@@ -6149,109 +6140,40 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event):
ANONYMOUS_ADDRESS_TYPE = 0xFF ANONYMOUS_ADDRESS_TYPE = 0xFF
UNRESOLVED_RESOLVABLE_ADDRESS_TYPE = 0xFE UNRESOLVED_RESOLVABLE_ADDRESS_TYPE = 0xFE
class Report(HCI_Object): @dataclasses.dataclass
FIELDS = [ class Report(HCI_Dataclass_Object):
('event_type', 2), event_type: int = field(
('address_type', Address.ADDRESS_TYPE_SPEC), metadata=metadata(
('address', Address.parse_address_preceded_by_type),
('primary_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}),
('secondary_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}),
('advertising_sid', 1),
('tx_power', 1),
('rssi', -1),
('periodic_advertising_interval', 2),
('direct_address_type', Address.ADDRESS_TYPE_SPEC),
('direct_address', Address.parse_address_preceded_by_type),
('data', 'v'),
]
@classmethod
def from_parameters(cls, parameters, offset):
return cls.from_bytes(parameters, offset, cls.FIELDS)
def event_type_string(self):
return HCI_LE_Extended_Advertising_Report_Event.event_type_string(
self.event_type
)
def to_string(self, indentation='', _=None):
# pylint: disable=line-too-long
def data_to_str(data):
try:
return data.hex() + ': ' + str(AdvertisingData.from_bytes(data))
except Exception:
return data.hex()
return super().to_string(
indentation,
{ {
'event_type': HCI_LE_Extended_Advertising_Report_Event.event_type_string, 'size': 2,
'address_type': Address.address_type_name, 'mapper': lambda x: HCI_LE_Extended_Advertising_Report_Event.EventType(
'data': data_to_str, x
}, ).name,
) }
@staticmethod
def event_type_string(event_type):
event_type_flags = bit_flags_to_strings(
event_type & 0x1F,
HCI_LE_Extended_Advertising_Report_Event.EVENT_TYPE_FLAG_NAMES,
)
event_type_flags.append(
('COMPLETE', 'INCOMPLETE+', 'INCOMPLETE#', '?')[(event_type >> 5) & 3]
)
if event_type & (
1 << HCI_LE_Extended_Advertising_Report_Event.LEGACY_ADVERTISING_PDU_USED
):
legacy_pdu_type = (
HCI_LE_Extended_Advertising_Report_Event.LEGACY_PDU_TYPE_MAP.get(
event_type & 0x0F
) )
) )
if legacy_pdu_type is not None: address_type: int = field(metadata=metadata(Address.ADDRESS_TYPE_SPEC))
# pylint: disable=line-too-long address: Address = field(
legacy_info_string = f'({HCI_LE_Advertising_Report_Event.event_type_name(legacy_pdu_type)})' metadata=metadata(Address.parse_address_preceded_by_type)
else:
legacy_info_string = ''
else:
legacy_info_string = ''
return f'0x{event_type:04X} [{",".join(event_type_flags)}]{legacy_info_string}'
@classmethod
def from_parameters(cls, parameters: bytes) -> Self:
num_reports = parameters[1]
reports: list[HCI_LE_Extended_Advertising_Report_Event.Report] = []
offset = 2
for _ in range(num_reports):
report = cls.Report.from_parameters(parameters, offset)
offset += 24 + len(report.data)
reports.append(report)
return cls(reports)
def __init__(
self, reports: Sequence[HCI_LE_Extended_Advertising_Report_Event.Report]
):
self.reports = reports[:]
# Serialize the fields
parameters = bytes(
[HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT, len(reports)]
) + b''.join([bytes(report) for report in reports])
super().__init__(parameters)
def __str__(self):
reports = '\n'.join(
[f'{i}:\n{report.to_string(" ")}' for i, report in enumerate(self.reports)]
) )
return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}' primary_phy: int = field(
metadata=metadata({'size': 1, 'mapper': HCI_Constant.le_phy_name})
)
secondary_phy: int = field(
metadata=metadata({'size': 1, 'mapper': HCI_Constant.le_phy_name})
)
advertising_sid: int = field(metadata=metadata(1))
tx_power: int = field(metadata=metadata(1))
rssi: int = field(metadata=metadata(-1))
periodic_advertising_interval: int = field(metadata=metadata(2))
direct_address_type: int = field(metadata=metadata(Address.ADDRESS_TYPE_SPEC))
direct_address: int = field(
metadata=metadata(Address.parse_address_preceded_by_type)
)
data: bytes = field(metadata=metadata('v'))
reports: Sequence[Report] = field(
HCI_LE_Meta_Event.subevent_classes[HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT] = ( metadata=metadata(Report.parse_from_bytes, list_begin=True, list_end=True)
HCI_LE_Extended_Advertising_Report_Event
) )
@@ -6876,12 +6798,15 @@ class HCI_LE_CS_Test_End_Complete_Event(HCI_LE_Meta_Event):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Event.event([('status', STATUS_SPEC)]) @HCI_Event.dataclass_event
@dataclasses.dataclass
class HCI_Inquiry_Complete_Event(HCI_Event): class HCI_Inquiry_Complete_Event(HCI_Event):
''' '''
See Bluetooth spec @ 7.7.1 Inquiry Complete Event See Bluetooth spec @ 7.7.1 Inquiry Complete Event
''' '''
status: int = field(metadata=metadata(STATUS_SPEC))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Event.event( @HCI_Event.event(

View File

@@ -1126,11 +1126,19 @@ class Host(utils.EventEmitter):
else: else:
self.emit('connection_phy_update_failure', connection.handle, event.status) self.emit('connection_phy_update_failure', connection.handle, event.status)
def on_hci_le_advertising_report_event(self, event): def on_hci_le_advertising_report_event(
self,
event: (
hci.HCI_LE_Advertising_Report_Event
| hci.HCI_LE_Extended_Advertising_Report_Event
),
):
for report in event.reports: for report in event.reports:
self.emit('advertising_report', report) self.emit('advertising_report', report)
def on_hci_le_extended_advertising_report_event(self, event): def on_hci_le_extended_advertising_report_event(
self, event: hci.HCI_LE_Extended_Advertising_Report_Event
):
self.on_hci_le_advertising_report_event(event) self.on_hci_le_advertising_report_event(event)
def on_hci_le_advertising_set_terminated_event(self, event): def on_hci_le_advertising_set_terminated_event(self, event):

View File

@@ -21,60 +21,6 @@ import struct
import pytest import pytest
from bumble import hci from bumble import hci
from bumble.hci import (
HCI_DISCONNECT_COMMAND,
HCI_LE_1M_PHY_BIT,
HCI_LE_CODED_PHY_BIT,
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_RESET_COMMAND,
HCI_VENDOR_EVENT,
HCI_SUCCESS,
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
Address,
CodingFormat,
CodecID,
HCI_Command,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_CustomPacket,
HCI_Disconnect_Command,
HCI_Event,
HCI_IsoDataPacket,
HCI_LE_Add_Device_To_Filter_Accept_List_Command,
HCI_LE_Advertising_Report_Event,
HCI_LE_Channel_Selection_Algorithm_Event,
HCI_LE_Connection_Complete_Event,
HCI_LE_Connection_Update_Command,
HCI_LE_Connection_Update_Complete_Event,
HCI_LE_Create_Connection_Command,
HCI_LE_Extended_Create_Connection_Command,
HCI_LE_Read_Buffer_Size_Command,
HCI_LE_Read_Remote_Features_Command,
HCI_LE_Read_Remote_Features_Complete_Event,
HCI_LE_Remove_Device_From_Filter_Accept_List_Command,
HCI_LE_Set_Advertising_Data_Command,
HCI_LE_Set_Advertising_Parameters_Command,
HCI_LE_Set_Default_PHY_Command,
HCI_LE_Set_Event_Mask_Command,
HCI_LE_Set_Extended_Advertising_Enable_Command,
HCI_LE_Set_Extended_Scan_Parameters_Command,
HCI_LE_Set_Random_Address_Command,
HCI_LE_Set_Scan_Enable_Command,
HCI_LE_Set_Scan_Parameters_Command,
HCI_LE_Setup_ISO_Data_Path_Command,
HCI_Number_Of_Completed_Packets_Event,
HCI_Packet,
HCI_PIN_Code_Request_Reply_Command,
HCI_Read_Local_Supported_Codecs_Command,
HCI_Read_Local_Supported_Codecs_V2_Command,
HCI_Read_Local_Supported_Commands_Command,
HCI_Read_Local_Supported_Features_Command,
HCI_Read_Local_Version_Information_Command,
HCI_Reset_Command,
HCI_Set_Event_Mask_Command,
HCI_Vendor_Event,
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -84,7 +30,7 @@ from bumble.hci import (
def basic_check(x): def basic_check(x):
packet = bytes(x) packet = bytes(x)
print(packet.hex()) print(packet.hex())
parsed = HCI_Packet.from_bytes(packet) parsed = hci.HCI_Packet.from_bytes(packet)
x_str = str(x) x_str = str(x)
parsed_str = str(parsed) parsed_str = str(parsed)
print(x_str) print(x_str)
@@ -95,18 +41,18 @@ def basic_check(x):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_Event(): def test_HCI_Event():
event = HCI_Event(event_code=0xF9) event = hci.HCI_Event(event_code=0xF9)
basic_check(event) basic_check(event)
event = HCI_Event(event_code=0xF8, parameters=bytes.fromhex('AABBCC')) event = hci.HCI_Event(event_code=0xF8, parameters=bytes.fromhex('AABBCC'))
basic_check(event) basic_check(event)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Complete_Event(): def test_HCI_LE_Connection_Complete_Event():
address = Address('00:11:22:33:44:55') address = hci.Address('00:11:22:33:44:55')
event = HCI_LE_Connection_Complete_Event( event = hci.HCI_LE_Connection_Complete_Event(
status=HCI_SUCCESS, status=hci.HCI_SUCCESS,
connection_handle=1, connection_handle=1,
role=1, role=1,
peer_address_type=1, peer_address_type=1,
@@ -121,25 +67,47 @@ def test_HCI_LE_Connection_Complete_Event():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Advertising_Report_Event(): def test_HCI_LE_Advertising_Report_Event():
address = Address('00:11:22:33:44:55/P') address = hci.Address('00:11:22:33:44:55/P')
report = HCI_LE_Advertising_Report_Event.Report( report = hci.HCI_LE_Advertising_Report_Event.Report(
HCI_LE_Advertising_Report_Event.Report.FIELDS, event_type=hci.HCI_LE_Advertising_Report_Event.EventType.ADV_IND,
event_type=HCI_LE_Advertising_Report_Event.ADV_IND, address_type=hci.Address.PUBLIC_DEVICE_ADDRESS,
address_type=Address.PUBLIC_DEVICE_ADDRESS,
address=address, address=address,
data=bytes.fromhex( data=bytes.fromhex(
'0201061106ba5689a6fabfa2bd01467d6e00fbabad08160a181604659b03' '0201061106ba5689a6fabfa2bd01467d6e00fbabad08160a181604659b03'
), ),
rssi=100, rssi=100,
) )
event = HCI_LE_Advertising_Report_Event([report]) event = hci.HCI_LE_Advertising_Report_Event([report])
basic_check(event)
# -----------------------------------------------------------------------------
def test_HCI_LE_Extended_Advertising_Report_Event():
address = hci.Address('00:11:22:33:44:55/P')
report = hci.HCI_LE_Extended_Advertising_Report_Event.Report(
event_type=hci.HCI_LE_Extended_Advertising_Report_Event.EventType.CONNECTABLE_ADVERTISING,
address_type=hci.Address.PUBLIC_DEVICE_ADDRESS,
address=address,
data=bytes.fromhex(
'0201061106ba5689a6fabfa2bd01467d6e00fbabad08160a181604659b03'
),
rssi=100,
primary_phy=hci.HCI_LE_1M_PHY,
secondary_phy=hci.HCI_LE_CODED_PHY,
advertising_sid=0,
tx_power=10,
periodic_advertising_interval=2,
direct_address=hci.Address('00:11:22:33:44:55/P'),
direct_address_type=hci.Address.PUBLIC_DEVICE_ADDRESS,
)
event = hci.HCI_LE_Extended_Advertising_Report_Event([report])
basic_check(event) basic_check(event)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Read_Remote_Features_Complete_Event(): def test_HCI_LE_Read_Remote_Features_Complete_Event():
event = HCI_LE_Read_Remote_Features_Complete_Event( event = hci.HCI_LE_Read_Remote_Features_Complete_Event(
status=HCI_SUCCESS, status=hci.HCI_SUCCESS,
connection_handle=0x007, connection_handle=0x007,
le_features=bytes.fromhex('0011223344556677'), le_features=bytes.fromhex('0011223344556677'),
) )
@@ -148,8 +116,8 @@ def test_HCI_LE_Read_Remote_Features_Complete_Event():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Update_Complete_Event(): def test_HCI_LE_Connection_Update_Complete_Event():
event = HCI_LE_Connection_Update_Complete_Event( event = hci.HCI_LE_Connection_Update_Complete_Event(
status=HCI_SUCCESS, status=hci.HCI_SUCCESS,
connection_handle=0x007, connection_handle=0x007,
connection_interval=10, connection_interval=10,
peripheral_latency=3, peripheral_latency=3,
@@ -160,7 +128,7 @@ def test_HCI_LE_Connection_Update_Complete_Event():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Channel_Selection_Algorithm_Event(): def test_HCI_LE_Channel_Selection_Algorithm_Event():
event = HCI_LE_Channel_Selection_Algorithm_Event( event = hci.HCI_LE_Channel_Selection_Algorithm_Event(
connection_handle=7, channel_selection_algorithm=1 connection_handle=7, channel_selection_algorithm=1
) )
basic_check(event) basic_check(event)
@@ -169,10 +137,10 @@ def test_HCI_LE_Channel_Selection_Algorithm_Event():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_Command_Complete_Event(): def test_HCI_Command_Complete_Event():
# With a serializable object # With a serializable object
event = HCI_Command_Complete_Event( event = hci.HCI_Command_Complete_Event(
num_hci_command_packets=34, num_hci_command_packets=34,
command_opcode=HCI_LE_READ_BUFFER_SIZE_COMMAND, command_opcode=hci.HCI_LE_READ_BUFFER_SIZE_COMMAND,
return_parameters=HCI_LE_Read_Buffer_Size_Command.create_return_parameters( return_parameters=hci.HCI_LE_Read_Buffer_Size_Command.create_return_parameters(
status=0, status=0,
le_acl_data_packet_length=1234, le_acl_data_packet_length=1234,
total_num_le_acl_data_packets=56, total_num_le_acl_data_packets=56,
@@ -181,26 +149,28 @@ def test_HCI_Command_Complete_Event():
basic_check(event) basic_check(event)
# With an arbitrary byte array # With an arbitrary byte array
event = HCI_Command_Complete_Event( event = hci.HCI_Command_Complete_Event(
num_hci_command_packets=1, num_hci_command_packets=1,
command_opcode=HCI_RESET_COMMAND, command_opcode=hci.HCI_RESET_COMMAND,
return_parameters=bytes([1, 2, 3, 4]), return_parameters=bytes([1, 2, 3, 4]),
) )
basic_check(event) basic_check(event)
# With a simple status as a 1-byte array # With a simple status as a 1-byte array
event = HCI_Command_Complete_Event( event = hci.HCI_Command_Complete_Event(
num_hci_command_packets=1, num_hci_command_packets=1,
command_opcode=HCI_RESET_COMMAND, command_opcode=hci.HCI_RESET_COMMAND,
return_parameters=bytes([7]), return_parameters=bytes([7]),
) )
basic_check(event) basic_check(event)
event = HCI_Packet.from_bytes(bytes(event)) event = hci.HCI_Packet.from_bytes(bytes(event))
assert event.return_parameters == 7 assert event.return_parameters == 7
# With a simple status as an integer status # With a simple status as an integer status
event = HCI_Command_Complete_Event( event = hci.HCI_Command_Complete_Event(
num_hci_command_packets=1, command_opcode=HCI_RESET_COMMAND, return_parameters=9 num_hci_command_packets=1,
command_opcode=hci.HCI_RESET_COMMAND,
return_parameters=9,
) )
basic_check(event) basic_check(event)
assert event.return_parameters == 9 assert event.return_parameters == 9
@@ -208,15 +178,15 @@ def test_HCI_Command_Complete_Event():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_Command_Status_Event(): def test_HCI_Command_Status_Event():
event = HCI_Command_Status_Event( event = hci.HCI_Command_Status_Event(
status=0, num_hci_command_packets=37, command_opcode=HCI_DISCONNECT_COMMAND status=0, num_hci_command_packets=37, command_opcode=hci.HCI_DISCONNECT_COMMAND
) )
basic_check(event) basic_check(event)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_Number_Of_Completed_Packets_Event(): def test_HCI_Number_Of_Completed_Packets_Event():
event = HCI_Number_Of_Completed_Packets_Event( event = hci.HCI_Number_Of_Completed_Packets_Event(
connection_handles=(1, 2), connection_handles=(1, 2),
num_completed_packets=(3, 4), num_completed_packets=(3, 4),
) )
@@ -226,16 +196,16 @@ def test_HCI_Number_Of_Completed_Packets_Event():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_Vendor_Event(): def test_HCI_Vendor_Event():
data = bytes.fromhex('01020304') data = bytes.fromhex('01020304')
event = HCI_Vendor_Event(data=data) event = hci.HCI_Vendor_Event(data=data)
event_bytes = bytes(event) event_bytes = bytes(event)
parsed = HCI_Packet.from_bytes(event_bytes) parsed = hci.HCI_Packet.from_bytes(event_bytes)
assert isinstance(parsed, HCI_Vendor_Event) assert isinstance(parsed, hci.HCI_Vendor_Event)
assert parsed.data == data assert parsed.data == data
class HCI_Custom_Event(HCI_Event): class HCI_Custom_Event(hci.HCI_Event):
def __init__(self, blabla): def __init__(self, blabla):
super().__init__( super().__init__(
event_code=HCI_VENDOR_EVENT, parameters=struct.pack("<I", blabla) event_code=hci.HCI_VENDOR_EVENT, parameters=struct.pack("<I", blabla)
) )
self.name = 'HCI_CUSTOM_EVENT' self.name = 'HCI_CUSTOM_EVENT'
self.blabla = blabla self.blabla = blabla
@@ -245,27 +215,27 @@ def test_HCI_Vendor_Event():
return HCI_Custom_Event(blabla=struct.unpack('<I', payload)[0]) return HCI_Custom_Event(blabla=struct.unpack('<I', payload)[0])
return None return None
HCI_Event.add_vendor_factory(create_event) hci.HCI_Event.add_vendor_factory(create_event)
parsed = HCI_Packet.from_bytes(event_bytes) parsed = hci.HCI_Packet.from_bytes(event_bytes)
assert isinstance(parsed, HCI_Custom_Event) assert isinstance(parsed, HCI_Custom_Event)
assert parsed.blabla == 0x04030201 assert parsed.blabla == 0x04030201
event_bytes2 = event_bytes[:3] + bytes([7]) + event_bytes[4:] event_bytes2 = event_bytes[:3] + bytes([7]) + event_bytes[4:]
parsed = HCI_Packet.from_bytes(event_bytes2) parsed = hci.HCI_Packet.from_bytes(event_bytes2)
assert not isinstance(parsed, HCI_Custom_Event) assert not isinstance(parsed, HCI_Custom_Event)
assert isinstance(parsed, HCI_Vendor_Event) assert isinstance(parsed, hci.HCI_Vendor_Event)
HCI_Event.remove_vendor_factory(create_event) hci.HCI_Event.remove_vendor_factory(create_event)
parsed = HCI_Packet.from_bytes(event_bytes) parsed = hci.HCI_Packet.from_bytes(event_bytes)
assert not isinstance(parsed, HCI_Custom_Event) assert not isinstance(parsed, HCI_Custom_Event)
assert isinstance(parsed, HCI_Vendor_Event) assert isinstance(parsed, hci.HCI_Vendor_Event)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_Command(): def test_HCI_Command():
command = HCI_Command(op_code=0x5566) command = hci.HCI_Command(op_code=0x5566)
basic_check(command) basic_check(command)
command = HCI_Command(op_code=0x5566, parameters=bytes.fromhex('AABBCC')) command = hci.HCI_Command(op_code=0x5566, parameters=bytes.fromhex('AABBCC'))
basic_check(command) basic_check(command)
@@ -324,12 +294,12 @@ def test_HCI_PIN_Code_Request_Reply_Command():
pin_code = b'1234' pin_code = b'1234'
pin_code_length = len(pin_code) pin_code_length = len(pin_code)
# here to make the test pass, we need to # here to make the test pass, we need to
# pad pin_code, as HCI_Object.format_fields # pad pin_code, as hci.HCI_Object.format_fields
# does not do it for us # does not do it for us
padded_pin_code = pin_code + bytes(16 - pin_code_length) padded_pin_code = pin_code + bytes(16 - pin_code_length)
command = HCI_PIN_Code_Request_Reply_Command( command = hci.HCI_PIN_Code_Request_Reply_Command(
bd_addr=Address( bd_addr=hci.Address(
'00:11:22:33:44:55', address_type=Address.PUBLIC_DEVICE_ADDRESS '00:11:22:33:44:55', address_type=hci.Address.PUBLIC_DEVICE_ADDRESS
), ),
pin_code_length=pin_code_length, pin_code_length=pin_code_length,
pin_code=padded_pin_code, pin_code=padded_pin_code,
@@ -338,47 +308,49 @@ def test_HCI_PIN_Code_Request_Reply_Command():
def test_HCI_Reset_Command(): def test_HCI_Reset_Command():
command = HCI_Reset_Command() command = hci.HCI_Reset_Command()
basic_check(command) basic_check(command)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_Read_Local_Version_Information_Command(): def test_HCI_Read_Local_Version_Information_Command():
command = HCI_Read_Local_Version_Information_Command() command = hci.HCI_Read_Local_Version_Information_Command()
basic_check(command) basic_check(command)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_Read_Local_Supported_Commands_Command(): def test_HCI_Read_Local_Supported_Commands_Command():
command = HCI_Read_Local_Supported_Commands_Command() command = hci.HCI_Read_Local_Supported_Commands_Command()
basic_check(command) basic_check(command)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_Read_Local_Supported_Features_Command(): def test_HCI_Read_Local_Supported_Features_Command():
command = HCI_Read_Local_Supported_Features_Command() command = hci.HCI_Read_Local_Supported_Features_Command()
basic_check(command) basic_check(command)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_Disconnect_Command(): def test_HCI_Disconnect_Command():
command = HCI_Disconnect_Command(connection_handle=123, reason=0x11) command = hci.HCI_Disconnect_Command(connection_handle=123, reason=0x11)
basic_check(command) basic_check(command)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_Set_Event_Mask_Command(): def test_HCI_Set_Event_Mask_Command():
command = HCI_Set_Event_Mask_Command(event_mask=bytes.fromhex('0011223344556677')) command = hci.HCI_Set_Event_Mask_Command(
event_mask=bytes.fromhex('0011223344556677')
)
basic_check(command) basic_check(command)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Set_Event_Mask_Command(): def test_HCI_LE_Set_Event_Mask_Command():
command = HCI_LE_Set_Event_Mask_Command( command = hci.HCI_LE_Set_Event_Mask_Command(
le_event_mask=HCI_LE_Set_Event_Mask_Command.mask( le_event_mask=hci.HCI_LE_Set_Event_Mask_Command.mask(
[ [
HCI_LE_CONNECTION_COMPLETE_EVENT, hci.HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT, hci.HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
] ]
) )
) )
@@ -388,21 +360,21 @@ def test_HCI_LE_Set_Event_Mask_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Set_Random_Address_Command(): def test_HCI_LE_Set_Random_Address_Command():
command = HCI_LE_Set_Random_Address_Command( command = hci.HCI_LE_Set_Random_Address_Command(
random_address=Address('00:11:22:33:44:55') random_address=hci.Address('00:11:22:33:44:55')
) )
basic_check(command) basic_check(command)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Set_Advertising_Parameters_Command(): def test_HCI_LE_Set_Advertising_Parameters_Command():
command = HCI_LE_Set_Advertising_Parameters_Command( command = hci.HCI_LE_Set_Advertising_Parameters_Command(
advertising_interval_min=20, advertising_interval_min=20,
advertising_interval_max=30, advertising_interval_max=30,
advertising_type=HCI_LE_Set_Advertising_Parameters_Command.ADV_NONCONN_IND, advertising_type=hci.HCI_LE_Set_Advertising_Parameters_Command.ADV_NONCONN_IND,
own_address_type=Address.PUBLIC_DEVICE_ADDRESS, own_address_type=hci.Address.PUBLIC_DEVICE_ADDRESS,
peer_address_type=Address.RANDOM_DEVICE_ADDRESS, peer_address_type=hci.Address.RANDOM_DEVICE_ADDRESS,
peer_address=Address('00:11:22:33:44:55'), peer_address=hci.Address('00:11:22:33:44:55'),
advertising_channel_map=0x03, advertising_channel_map=0x03,
advertising_filter_policy=1, advertising_filter_policy=1,
) )
@@ -411,7 +383,7 @@ def test_HCI_LE_Set_Advertising_Parameters_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Set_Advertising_Data_Command(): def test_HCI_LE_Set_Advertising_Data_Command():
command = HCI_LE_Set_Advertising_Data_Command( command = hci.HCI_LE_Set_Advertising_Data_Command(
advertising_data=bytes.fromhex('AABBCC') advertising_data=bytes.fromhex('AABBCC')
) )
basic_check(command) basic_check(command)
@@ -419,7 +391,7 @@ def test_HCI_LE_Set_Advertising_Data_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Set_Scan_Parameters_Command(): def test_HCI_LE_Set_Scan_Parameters_Command():
command = HCI_LE_Set_Scan_Parameters_Command( command = hci.HCI_LE_Set_Scan_Parameters_Command(
le_scan_type=1, le_scan_type=1,
le_scan_interval=20, le_scan_interval=20,
le_scan_window=10, le_scan_window=10,
@@ -431,18 +403,18 @@ def test_HCI_LE_Set_Scan_Parameters_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Set_Scan_Enable_Command(): def test_HCI_LE_Set_Scan_Enable_Command():
command = HCI_LE_Set_Scan_Enable_Command(le_scan_enable=1, filter_duplicates=0) command = hci.HCI_LE_Set_Scan_Enable_Command(le_scan_enable=1, filter_duplicates=0)
basic_check(command) basic_check(command)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Create_Connection_Command(): def test_HCI_LE_Create_Connection_Command():
command = HCI_LE_Create_Connection_Command( command = hci.HCI_LE_Create_Connection_Command(
le_scan_interval=4, le_scan_interval=4,
le_scan_window=5, le_scan_window=5,
initiator_filter_policy=1, initiator_filter_policy=1,
peer_address_type=1, peer_address_type=1,
peer_address=Address('00:11:22:33:44:55'), peer_address=hci.Address('00:11:22:33:44:55'),
own_address_type=2, own_address_type=2,
connection_interval_min=7, connection_interval_min=7,
connection_interval_max=8, connection_interval_max=8,
@@ -456,11 +428,11 @@ def test_HCI_LE_Create_Connection_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Extended_Create_Connection_Command(): def test_HCI_LE_Extended_Create_Connection_Command():
command = HCI_LE_Extended_Create_Connection_Command( command = hci.HCI_LE_Extended_Create_Connection_Command(
initiator_filter_policy=0, initiator_filter_policy=0,
own_address_type=0, own_address_type=0,
peer_address_type=1, peer_address_type=1,
peer_address=Address('00:11:22:33:44:55'), peer_address=hci.Address('00:11:22:33:44:55'),
initiating_phys=3, initiating_phys=3,
scan_intervals=(10, 11), scan_intervals=(10, 11),
scan_windows=(12, 13), scan_windows=(12, 13),
@@ -476,23 +448,23 @@ def test_HCI_LE_Extended_Create_Connection_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Add_Device_To_Filter_Accept_List_Command(): def test_HCI_LE_Add_Device_To_Filter_Accept_List_Command():
command = HCI_LE_Add_Device_To_Filter_Accept_List_Command( command = hci.HCI_LE_Add_Device_To_Filter_Accept_List_Command(
address_type=1, address=Address('00:11:22:33:44:55') address_type=1, address=hci.Address('00:11:22:33:44:55')
) )
basic_check(command) basic_check(command)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command(): def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command():
command = HCI_LE_Remove_Device_From_Filter_Accept_List_Command( command = hci.HCI_LE_Remove_Device_From_Filter_Accept_List_Command(
address_type=1, address=Address('00:11:22:33:44:55') address_type=1, address=hci.Address('00:11:22:33:44:55')
) )
basic_check(command) basic_check(command)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Update_Command(): def test_HCI_LE_Connection_Update_Command():
command = HCI_LE_Connection_Update_Command( command = hci.HCI_LE_Connection_Update_Command(
connection_handle=0x0002, connection_handle=0x0002,
connection_interval_min=10, connection_interval_min=10,
connection_interval_max=20, connection_interval_max=20,
@@ -506,27 +478,29 @@ def test_HCI_LE_Connection_Update_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Read_Remote_Features_Command(): def test_HCI_LE_Read_Remote_Features_Command():
command = HCI_LE_Read_Remote_Features_Command(connection_handle=0x0002) command = hci.HCI_LE_Read_Remote_Features_Command(connection_handle=0x0002)
basic_check(command) basic_check(command)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Set_Default_PHY_Command(): def test_HCI_LE_Set_Default_PHY_Command():
command = HCI_LE_Set_Default_PHY_Command(all_phys=0, tx_phys=1, rx_phys=1) command = hci.HCI_LE_Set_Default_PHY_Command(all_phys=0, tx_phys=1, rx_phys=1)
basic_check(command) basic_check(command)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Set_Extended_Scan_Parameters_Command(): def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
command = HCI_LE_Set_Extended_Scan_Parameters_Command( command = hci.HCI_LE_Set_Extended_Scan_Parameters_Command(
own_address_type=Address.RANDOM_DEVICE_ADDRESS, own_address_type=hci.Address.RANDOM_DEVICE_ADDRESS,
# pylint: disable-next=line-too-long # pylint: disable-next=line-too-long
scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY, scanning_filter_policy=hci.HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY,
scanning_phys=(1 << HCI_LE_1M_PHY_BIT | 1 << HCI_LE_CODED_PHY_BIT | 1 << 4), scanning_phys=(
1 << hci.HCI_LE_1M_PHY_BIT | 1 << hci.HCI_LE_CODED_PHY_BIT | 1 << 4
),
scan_types=[ scan_types=[
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING, hci.HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING, hci.HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING, hci.HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING,
], ],
scan_intervals=[1, 2, 3], scan_intervals=[1, 2, 3],
scan_windows=[4, 5, 6], scan_windows=[4, 5, 6],
@@ -536,7 +510,7 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Set_Extended_Advertising_Enable_Command(): def test_HCI_LE_Set_Extended_Advertising_Enable_Command():
command = HCI_Packet.from_bytes( command = hci.HCI_Packet.from_bytes(
bytes.fromhex('0139200e010301050008020600090307000a') bytes.fromhex('0139200e010301050008020600090307000a')
) )
assert command.enable == 1 assert command.enable == 1
@@ -544,7 +518,7 @@ def test_HCI_LE_Set_Extended_Advertising_Enable_Command():
assert command.durations == [5, 6, 7] assert command.durations == [5, 6, 7]
assert command.max_extended_advertising_events == [8, 9, 10] assert command.max_extended_advertising_events == [8, 9, 10]
command = HCI_LE_Set_Extended_Advertising_Enable_Command( command = hci.HCI_LE_Set_Extended_Advertising_Enable_Command(
enable=1, enable=1,
advertising_handles=[1, 2, 3], advertising_handles=[1, 2, 3],
durations=[5, 6, 7], durations=[5, 6, 7],
@@ -555,20 +529,22 @@ def test_HCI_LE_Set_Extended_Advertising_Enable_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Setup_ISO_Data_Path_Command(): def test_HCI_LE_Setup_ISO_Data_Path_Command():
command = HCI_Packet.from_bytes(bytes.fromhex('016e200d60000001030000000000000000')) command = hci.HCI_Packet.from_bytes(
bytes.fromhex('016e200d60000001030000000000000000')
)
assert command.connection_handle == 0x0060 assert command.connection_handle == 0x0060
assert command.data_path_direction == 0x00 assert command.data_path_direction == 0x00
assert command.data_path_id == 0x01 assert command.data_path_id == 0x01
assert command.codec_id == CodingFormat(CodecID.TRANSPARENT) assert command.codec_id == hci.CodingFormat(hci.CodecID.TRANSPARENT)
assert command.controller_delay == 0 assert command.controller_delay == 0
assert command.codec_configuration == b'' assert command.codec_configuration == b''
command = HCI_LE_Setup_ISO_Data_Path_Command( command = hci.HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=0x0060, connection_handle=0x0060,
data_path_direction=0x00, data_path_direction=0x00,
data_path_id=0x01, data_path_id=0x01,
codec_id=CodingFormat(CodecID.TRANSPARENT), codec_id=hci.CodingFormat(hci.CodecID.TRANSPARENT),
controller_delay=0x00, controller_delay=0x00,
codec_configuration=b'', codec_configuration=b'',
) )
@@ -578,54 +554,63 @@ def test_HCI_LE_Setup_ISO_Data_Path_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_Read_Local_Supported_Codecs_Command_Complete(): def test_HCI_Read_Local_Supported_Codecs_Command_Complete():
returned_parameters = ( returned_parameters = (
HCI_Read_Local_Supported_Codecs_Command.parse_return_parameters( hci.HCI_Read_Local_Supported_Codecs_Command.parse_return_parameters(
bytes([HCI_SUCCESS, 3, CodecID.A_LOG, CodecID.CVSD, CodecID.LINEAR_PCM, 0])
)
)
assert returned_parameters.standard_codec_ids == [
CodecID.A_LOG,
CodecID.CVSD,
CodecID.LINEAR_PCM,
]
# -----------------------------------------------------------------------------
def test_HCI_Read_Local_Supported_Codecs_V2_Command_Complete():
returned_parameters = (
HCI_Read_Local_Supported_Codecs_V2_Command.parse_return_parameters(
bytes( bytes(
[ [
HCI_SUCCESS, hci.HCI_SUCCESS,
3, 3,
CodecID.A_LOG, hci.CodecID.A_LOG,
HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_ACL, hci.CodecID.CVSD,
CodecID.CVSD, hci.CodecID.LINEAR_PCM,
HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_SCO,
CodecID.LINEAR_PCM,
HCI_Read_Local_Supported_Codecs_V2_Command.Transport.LE_CIS,
0, 0,
] ]
) )
) )
) )
assert returned_parameters.standard_codec_ids == [ assert returned_parameters.standard_codec_ids == [
CodecID.A_LOG, hci.CodecID.A_LOG,
CodecID.CVSD, hci.CodecID.CVSD,
CodecID.LINEAR_PCM, hci.CodecID.LINEAR_PCM,
]
# -----------------------------------------------------------------------------
def test_HCI_Read_Local_Supported_Codecs_V2_Command_Complete():
returned_parameters = (
hci.HCI_Read_Local_Supported_Codecs_V2_Command.parse_return_parameters(
bytes(
[
hci.HCI_SUCCESS,
3,
hci.CodecID.A_LOG,
hci.HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_ACL,
hci.CodecID.CVSD,
hci.HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_SCO,
hci.CodecID.LINEAR_PCM,
hci.HCI_Read_Local_Supported_Codecs_V2_Command.Transport.LE_CIS,
0,
]
)
)
)
assert returned_parameters.standard_codec_ids == [
hci.CodecID.A_LOG,
hci.CodecID.CVSD,
hci.CodecID.LINEAR_PCM,
] ]
assert returned_parameters.standard_codec_transports == [ assert returned_parameters.standard_codec_transports == [
HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_ACL, hci.HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_ACL,
HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_SCO, hci.HCI_Read_Local_Supported_Codecs_V2_Command.Transport.BR_EDR_SCO,
HCI_Read_Local_Supported_Codecs_V2_Command.Transport.LE_CIS, hci.HCI_Read_Local_Supported_Codecs_V2_Command.Transport.LE_CIS,
] ]
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_address(): def test_address():
a = Address('C4:F2:17:1A:1D:BB') a = hci.Address('C4:F2:17:1A:1D:BB')
assert not a.is_public assert not a.is_public
assert a.is_random assert a.is_random
assert a.address_type == Address.RANDOM_DEVICE_ADDRESS assert a.address_type == hci.Address.RANDOM_DEVICE_ADDRESS
assert not a.is_resolvable assert not a.is_resolvable
assert not a.is_resolved assert not a.is_resolved
assert a.is_static assert a.is_static
@@ -634,7 +619,7 @@ def test_address():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_custom(): def test_custom():
data = bytes([0x77, 0x02, 0x01, 0x03]) data = bytes([0x77, 0x02, 0x01, 0x03])
packet = HCI_CustomPacket(data) packet = hci.HCI_CustomPacket(data)
assert packet.hci_packet_type == 0x77 assert packet.hci_packet_type == 0x77
assert packet.payload == data assert packet.payload == data
@@ -645,7 +630,7 @@ def test_iso_data_packet():
'05616044002ac9f0a193003c00e83b477b00eba8d41dc018bf1a980f0290afe1e7c37652096697' '05616044002ac9f0a193003c00e83b477b00eba8d41dc018bf1a980f0290afe1e7c37652096697'
'52b6a535a8df61e22931ef5a36281bc77ed6a3206d984bcdabee6be831c699cb50e2' '52b6a535a8df61e22931ef5a36281bc77ed6a3206d984bcdabee6be831c699cb50e2'
) )
packet = HCI_IsoDataPacket.from_bytes(data) packet = hci.HCI_IsoDataPacket.from_bytes(data)
assert packet.connection_handle == 0x0061 assert packet.connection_handle == 0x0061
assert packet.packet_status_flag == 0 assert packet.packet_status_flag == 0
assert packet.pb_flag == 0x02 assert packet.pb_flag == 0x02