From a0498af626c3dac63707fc4e6358a2c3e022e9cf Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Fri, 13 Jun 2025 16:32:07 +0800 Subject: [PATCH] Dataclass-based HCI packets --- bumble/controller.py | 8 +- bumble/device.py | 50 +++--- bumble/hci.py | 399 ++++++++++++++++++------------------------- bumble/host.py | 12 +- tests/hci_test.py | 341 ++++++++++++++++++------------------ 5 files changed, 369 insertions(+), 441 deletions(-) diff --git a/bumble/controller.py b/bumble/controller.py index 39b330cb..9f12a403 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -544,15 +544,14 @@ class Controller: acl_packet = HCI_AclDataPacket(connection.handle, 2, 0, len(data), data) self.send_hci_packet(acl_packet) - def on_link_advertising_data(self, sender_address, data): + def on_link_advertising_data(self, sender_address: Address, data: bytes): # Ignore if we're not scanning if self.le_scan_enable == 0: return # Send a scan report report = HCI_LE_Advertising_Report_Event.Report( - HCI_LE_Advertising_Report_Event.Report.FIELDS, - event_type=HCI_LE_Advertising_Report_Event.ADV_IND, + event_type=HCI_LE_Advertising_Report_Event.EventType.ADV_IND, address_type=sender_address.address_type, address=sender_address, data=data, @@ -562,8 +561,7 @@ class Controller: # Simulate a scan response report = HCI_LE_Advertising_Report_Event.Report( - HCI_LE_Advertising_Report_Event.Report.FIELDS, - event_type=HCI_LE_Advertising_Report_Event.SCAN_RSP, + event_type=HCI_LE_Advertising_Report_Event.EventType.SCAN_RSP, address_type=sender_address.address_type, address=sender_address, data=data, diff --git a/bumble/device.py b/bumble/device.py index f935af42..24732a4b 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -201,25 +201,35 @@ class Advertisement: # ----------------------------------------------------------------------------- class LegacyAdvertisement(Advertisement): @classmethod - def from_advertising_report(cls, report): + def from_advertising_report( + cls, report: hci.HCI_LE_Advertising_Report_Event.Report + ) -> Self: return cls( address=report.address, rssi=report.rssi, is_legacy=True, - is_connectable=report.event_type - in ( - hci.HCI_LE_Advertising_Report_Event.ADV_IND, - hci.HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, + is_connectable=( + report.event_type + in ( + hci.HCI_LE_Advertising_Report_Event.EventType.ADV_IND, + hci.HCI_LE_Advertising_Report_Event.EventType.ADV_DIRECT_IND, + ) ), - is_directed=report.event_type - == hci.HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, - is_scannable=report.event_type - in ( - hci.HCI_LE_Advertising_Report_Event.ADV_IND, - hci.HCI_LE_Advertising_Report_Event.ADV_SCAN_IND, + is_directed=( + report.event_type + == hci.HCI_LE_Advertising_Report_Event.EventType.ADV_DIRECT_IND + ), + is_scannable=( + report.event_type + in ( + hci.HCI_LE_Advertising_Report_Event.EventType.ADV_IND, + hci.HCI_LE_Advertising_Report_Event.EventType.ADV_SCAN_IND, + ) + ), + is_scan_response=( + report.event_type + == hci.HCI_LE_Advertising_Report_Event.EventType.SCAN_RSP ), - is_scan_response=report.event_type - == hci.HCI_LE_Advertising_Report_Event.SCAN_RSP, data_bytes=report.data, ) @@ -227,18 +237,20 @@ class LegacyAdvertisement(Advertisement): # ----------------------------------------------------------------------------- class ExtendedAdvertisement(Advertisement): @classmethod - def from_advertising_report(cls, report): + def from_advertising_report( + cls, report: hci.HCI_LE_Extended_Advertising_Report_Event.Report + ) -> Self: # fmt: off # pylint: disable=line-too-long return cls( address = report.address, rssi = report.rssi, - is_legacy = report.event_type & (1 << hci.HCI_LE_Extended_Advertising_Report_Event.LEGACY_ADVERTISING_PDU_USED) != 0, + is_legacy = (report.event_type & hci.HCI_LE_Extended_Advertising_Report_Event.EventType.LEGACY_ADVERTISING_PDU_USED) != 0, is_anonymous = report.address.address_type == hci.HCI_LE_Extended_Advertising_Report_Event.ANONYMOUS_ADDRESS_TYPE, - is_connectable = report.event_type & (1 << hci.HCI_LE_Extended_Advertising_Report_Event.CONNECTABLE_ADVERTISING) != 0, - is_directed = report.event_type & (1 << hci.HCI_LE_Extended_Advertising_Report_Event.DIRECTED_ADVERTISING) != 0, - is_scannable = report.event_type & (1 << hci.HCI_LE_Extended_Advertising_Report_Event.SCANNABLE_ADVERTISING) != 0, - is_scan_response = report.event_type & (1 << hci.HCI_LE_Extended_Advertising_Report_Event.SCAN_RESPONSE) != 0, + is_connectable = (report.event_type & hci.HCI_LE_Extended_Advertising_Report_Event.EventType.CONNECTABLE_ADVERTISING) != 0, + is_directed = (report.event_type & hci.HCI_LE_Extended_Advertising_Report_Event.EventType.DIRECTED_ADVERTISING) != 0, + is_scannable = (report.event_type & hci.HCI_LE_Extended_Advertising_Report_Event.EventType.SCANNABLE_ADVERTISING) != 0, + is_scan_response = (report.event_type & hci.HCI_LE_Extended_Advertising_Report_Event.EventType.SCAN_RESPONSE) != 0, is_complete = (report.event_type >> 5 & 3) == hci.HCI_LE_Extended_Advertising_Report_Event.DATA_COMPLETE, is_truncated = (report.event_type >> 5 & 3) == hci.HCI_LE_Extended_Advertising_Report_Event.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME, primary_phy = report.primary_phy, diff --git a/bumble/hci.py b/bumble/hci.py index 7ee64b0b..3f847b7b 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -18,19 +18,19 @@ from __future__ import annotations import collections import dataclasses +from dataclasses import field import enum import functools import logging import secrets import struct from collections.abc import Sequence -from typing import Any, Callable, Iterable, Optional, Union, TypeVar, ClassVar +from typing import Any, Callable, Iterable, Optional, Union, TypeVar, ClassVar, cast from typing_extensions import Self from bumble import crypto from bumble.colors import color from bumble.core import ( - AdvertisingData, DeviceClass, InvalidArgumentError, InvalidPacketError, @@ -1744,9 +1744,11 @@ class HCI_Object: raise InvalidArgumentError(f'unknown field type {field_type}') - @staticmethod - def dict_from_bytes(data, offset, fields): - result = collections.OrderedDict() + @classmethod + def dict_and_offset_from_bytes( + cls, data: bytes, offset: int, fields: Fields + ) -> tuple[int, collections.OrderedDict[str, Any]]: + result = collections.OrderedDict[str, Any]() for field in fields: if isinstance(field, list): # This is an array field, starting with a 1-byte item count. @@ -1766,11 +1768,18 @@ class HCI_Object: continue field_name, field_type = field - field_value, field_size = HCI_Object.parse_field(data, offset, field_type) + assert isinstance(field_name, str) + field_value, field_size = HCI_Object.parse_field( + data, offset, cast(FieldSpec, field_type) + ) result[field_name] = field_value offset += field_size - return result + return (offset, result) + + @staticmethod + def dict_from_bytes(data, offset, fields): + return HCI_Object.dict_and_offset_from_bytes(data, offset, fields)[1] @staticmethod def serialize_field(field_value: Any, field_type: FieldSpec) -> bytes: @@ -1995,6 +2004,19 @@ class HCI_Object: return self.to_string() +# ----------------------------------------------------------------------------- +@dataclasses.dataclass +class HCI_Dataclass_Object(HCI_Object): + def __post_init__(self) -> None: + self.fields = HCI_Object.fields_from_dataclass(self) + + @classmethod + def parse_from_bytes(cls, data: bytes, offset: int) -> tuple[int, Self]: + fields = HCI_Object.fields_from_dataclass(cls) + offset, kwargs = HCI_Object.dict_and_offset_from_bytes(data, offset, fields) + return offset, cls(**kwargs) + + # ----------------------------------------------------------------------------- # Bluetooth Address # ----------------------------------------------------------------------------- @@ -2276,6 +2298,7 @@ class HCI_Command(HCI_Packet): op_code: int = -1 fields: Fields = () return_parameters_fields: Fields = () + parameters: bytes = b'' @staticmethod def command( @@ -2302,6 +2325,15 @@ class HCI_Command(HCI_Packet): return inner + @classmethod + def dataclass_command(cls, subclass): + # TODO: Move this to __post_init__ when all packets become dataclasses. + subclass.parameters = functools.cached_property( + lambda self: HCI_Object.dict_to_bytes(self.__dict__, self.fields) + ) + subclass.parameters.__set_name__(subclass, 'parameters') + return HCI_Command.command(HCI_Object.fields_from_dataclass(subclass))(subclass) + @staticmethod def command_map(symbols: dict[str, Any]) -> dict[int, str]: return { @@ -2331,9 +2363,9 @@ class HCI_Command(HCI_Packet): @classmethod def from_parameters(cls, parameters: bytes) -> HCI_Command: - command = cls(parameters) - HCI_Object.init_from_bytes(command, parameters, 0, cls.fields) - return command + if dataclasses.is_dataclass(cls): + return cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields)) + return cls(parameters, **HCI_Object.dict_from_bytes(parameters, 0, cls.fields)) @staticmethod def command_name(op_code): @@ -2373,7 +2405,7 @@ class HCI_Command(HCI_Packet): parameters = HCI_Object.dict_to_bytes(kwargs, self.fields) self.parameters = parameters or b'' - def __bytes__(self): + def __bytes__(self) -> bytes: parameters = b'' if self.parameters is None else self.parameters return ( struct.pack(' dict[int, str]: return { @@ -5701,10 +5744,9 @@ class HCI_Event(HCI_Packet): @classmethod def from_parameters(cls, parameters: bytes) -> Self: - self = cls(parameters) - if self.fields: - HCI_Object.init_from_bytes(self, parameters, 0, self.fields) - return self + if dataclasses.is_dataclass(cls): + return cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields)) + return cls(parameters, **HCI_Object.dict_from_bytes(parameters, 0, cls.fields)) def __init__( self, @@ -5722,7 +5764,7 @@ class HCI_Event(HCI_Packet): parameters = HCI_Object.dict_to_bytes(kwargs, self.fields) self.parameters = parameters or b'' - def __bytes__(self): + def __bytes__(self) -> bytes: parameters = b'' if self.parameters is None else self.parameters return bytes([HCI_EVENT_PACKET, self.event_code, len(parameters)]) + parameters @@ -5757,21 +5799,35 @@ class HCI_Extended_Event(HCI_Event): ExtendedEvent = TypeVar("ExtendedEvent", bound=HCI_Extended_Event) - def inner(cls: type[ExtendedEvent]) -> type[ExtendedEvent]: - cls.name = cls.__name__.upper() - cls.subevent_code = key_with_value(cls.subevent_names, cls.name) - if cls.subevent_code is None: - raise KeyError(f'subevent {cls.name} not found in subevent_names') + def inner(subclass: type[ExtendedEvent]) -> type[ExtendedEvent]: + subclass.name = subclass.__name__.upper() + subclass.subevent_code = key_with_value( + subclass.subevent_names, subclass.name + ) + if subclass.subevent_code is None: + raise KeyError(f'subevent {subclass.name} not found in subevent_names') if fields is not None: - cls.fields = fields + subclass.fields = fields # Register a factory for this class - cls.subevent_classes[cls.subevent_code] = cls + cls.subevent_classes[subclass.subevent_code] = subclass - return cls + return subclass return inner + @classmethod + def dataclass_event(cls, subclass): + # TODO: Move this to __post_init__ when all packets become dataclasses. + subclass.parameters = functools.cached_property( + lambda self: ( + bytes([self.subevent_code]) + + HCI_Object.dict_to_bytes(self.__dict__, self.fields) + ) + ) + subclass.parameters.__set_name__(subclass, 'parameters') + return cls.event(HCI_Object.fields_from_dataclass(subclass))(subclass) + @classmethod def subevent_name(cls, subevent_code): subevent_name = cls.subevent_names.get(subevent_code) @@ -5809,10 +5865,9 @@ class HCI_Extended_Event(HCI_Event): @classmethod def from_parameters(cls, parameters: bytes) -> HCI_Extended_Event: """Factory method for subclasses (the subevent code has already been parsed)""" - event = cls(parameters) - if event.fields: - HCI_Object.init_from_bytes(event, parameters, 1, event.fields) - return event + if dataclasses.is_dataclass(cls): + return cls(**HCI_Object.dict_from_bytes(parameters, 1, cls.fields)) + return cls(parameters, **HCI_Object.dict_from_bytes(parameters, 1, cls.fields)) def __init__( self, @@ -5878,97 +5933,42 @@ class HCI_LE_Connection_Complete_Event(HCI_LE_Meta_Event): # ----------------------------------------------------------------------------- +@HCI_LE_Meta_Event.dataclass_event +@dataclasses.dataclass class HCI_LE_Advertising_Report_Event(HCI_LE_Meta_Event): ''' See Bluetooth spec @ 7.7.65.2 LE Advertising Report Event ''' - subevent_code = HCI_LE_ADVERTISING_REPORT_EVENT - name = 'HCI_LE_ADVERTISING_REPORT_EVENT' + class EventType(utils.OpenIntEnum): + ADV_IND = 0x00 + ADV_DIRECT_IND = 0x01 + ADV_SCAN_IND = 0x02 + ADV_NONCONN_IND = 0x03 + SCAN_RSP = 0x04 - # Event Types - ADV_IND = 0x00 - ADV_DIRECT_IND = 0x01 - ADV_SCAN_IND = 0x02 - ADV_NONCONN_IND = 0x03 - SCAN_RSP = 0x04 - - EVENT_TYPE_NAMES = { - ADV_IND: 'ADV_IND', # Connectable and scannable undirected advertising - ADV_DIRECT_IND: 'ADV_DIRECT_IND', # Connectable directed advertising - ADV_SCAN_IND: 'ADV_SCAN_IND', # Scannable undirected advertising - ADV_NONCONN_IND: 'ADV_NONCONN_IND', # Non connectable undirected advertising - SCAN_RSP: 'SCAN_RSP', # Scan Response - } - - class Report(HCI_Object): - FIELDS = [ - ('event_type', 1), - ('address_type', Address.ADDRESS_TYPE_SPEC), - ('address', Address.parse_address_preceded_by_type), - ('data', 'v'), - ('rssi', -1), - ] - - @classmethod - def from_parameters(cls, parameters, offset): - return cls.from_bytes(parameters, offset, cls.FIELDS) - - def event_type_string(self): - return HCI_LE_Advertising_Report_Event.event_type_name(self.event_type) - - def to_string(self, indentation='', _=None): - def data_to_str(data): - try: - return data.hex() + ': ' + str(AdvertisingData.from_bytes(data)) - except Exception: - return data.hex() - - return super().to_string( - indentation, + @dataclasses.dataclass + class Report(HCI_Dataclass_Object): + event_type: int = field( + metadata=metadata( { - 'event_type': HCI_LE_Advertising_Report_Event.event_type_name, - 'address_type': Address.address_type_name, - 'data': data_to_str, - }, + 'size': 1, + 'mapper': lambda x: HCI_LE_Advertising_Report_Event.EventType( + x + ).name, + } ) - - @classmethod - def event_type_name(cls, event_type): - return name_or_number(cls.EVENT_TYPE_NAMES, event_type) - - @classmethod - def from_parameters(cls, parameters: bytes) -> Self: - num_reports = parameters[1] - reports = [] - offset = 2 - for _ in range(num_reports): - report = cls.Report.from_parameters(parameters, offset) - offset += 10 + len(report.data) - reports.append(report) - - return cls(reports) - - def __init__(self, reports): - self.reports = reports[:] - - # Serialize the fields - parameters = bytes([HCI_LE_ADVERTISING_REPORT_EVENT, len(reports)]) + b''.join( - [bytes(report) for report in reports] ) - - super().__init__(parameters) - - def __str__(self): - reports = '\n'.join( - [f'{i}:\n{report.to_string(" ")}' for i, report in enumerate(self.reports)] + address_type: int = field(metadata=metadata(Address.ADDRESS_TYPE_SPEC)) + address: Address = field( + metadata=metadata(Address.parse_address_preceded_by_type) ) - return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}' + data: bytes = field(metadata=metadata('v')) + rssi: int = field(metadata=metadata(-1)) - -HCI_LE_Meta_Event.subevent_classes[HCI_LE_ADVERTISING_REPORT_EVENT] = ( - HCI_LE_Advertising_Report_Event -) + reports: Sequence[Report] = field( + metadata=metadata(Report.parse_from_bytes, list_begin=True, list_end=True) + ) # ----------------------------------------------------------------------------- @@ -6107,40 +6107,31 @@ class HCI_LE_PHY_Update_Complete_Event(HCI_LE_Meta_Event): # ----------------------------------------------------------------------------- +@HCI_LE_Meta_Event.dataclass_event +@dataclasses.dataclass class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event): ''' See Bluetooth spec @ 7.7.65.13 LE Extended Advertising Report Event ''' - subevent_code = HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT - name = 'HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT' - - # Event types flags - CONNECTABLE_ADVERTISING = 0 - SCANNABLE_ADVERTISING = 1 - DIRECTED_ADVERTISING = 2 - SCAN_RESPONSE = 3 - LEGACY_ADVERTISING_PDU_USED = 4 + class EventType(enum.IntFlag): + CONNECTABLE_ADVERTISING = 1 << 0 + SCANNABLE_ADVERTISING = 1 << 1 + DIRECTED_ADVERTISING = 1 << 2 + SCAN_RESPONSE = 1 << 3 + LEGACY_ADVERTISING_PDU_USED = 1 << 4 DATA_COMPLETE = 0x00 DATA_INCOMPLETE_MORE_TO_COME = 0x01 DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME = 0x02 - EVENT_TYPE_FLAG_NAMES = ( - 'CONNECTABLE_ADVERTISING', - 'SCANNABLE_ADVERTISING', - 'DIRECTED_ADVERTISING', - 'SCAN_RESPONSE', - 'LEGACY_ADVERTISING_PDU_USED', - ) - LEGACY_PDU_TYPE_MAP = { - 0b0011: HCI_LE_Advertising_Report_Event.ADV_IND, - 0b0101: HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, - 0b0010: HCI_LE_Advertising_Report_Event.ADV_SCAN_IND, - 0b0000: HCI_LE_Advertising_Report_Event.ADV_NONCONN_IND, - 0b1011: HCI_LE_Advertising_Report_Event.SCAN_RSP, - 0b1010: HCI_LE_Advertising_Report_Event.SCAN_RSP, + 0b0011: HCI_LE_Advertising_Report_Event.EventType.ADV_IND, + 0b0101: HCI_LE_Advertising_Report_Event.EventType.ADV_DIRECT_IND, + 0b0010: HCI_LE_Advertising_Report_Event.EventType.ADV_SCAN_IND, + 0b0000: HCI_LE_Advertising_Report_Event.EventType.ADV_NONCONN_IND, + 0b1011: HCI_LE_Advertising_Report_Event.EventType.SCAN_RSP, + 0b1010: HCI_LE_Advertising_Report_Event.EventType.SCAN_RSP, } NO_ADI_FIELD_PROVIDED = 0xFF @@ -6149,110 +6140,41 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event): ANONYMOUS_ADDRESS_TYPE = 0xFF UNRESOLVED_RESOLVABLE_ADDRESS_TYPE = 0xFE - class Report(HCI_Object): - FIELDS = [ - ('event_type', 2), - ('address_type', Address.ADDRESS_TYPE_SPEC), - ('address', Address.parse_address_preceded_by_type), - ('primary_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}), - ('secondary_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}), - ('advertising_sid', 1), - ('tx_power', 1), - ('rssi', -1), - ('periodic_advertising_interval', 2), - ('direct_address_type', Address.ADDRESS_TYPE_SPEC), - ('direct_address', Address.parse_address_preceded_by_type), - ('data', 'v'), - ] - - @classmethod - def from_parameters(cls, parameters, offset): - return cls.from_bytes(parameters, offset, cls.FIELDS) - - def event_type_string(self): - return HCI_LE_Extended_Advertising_Report_Event.event_type_string( - self.event_type - ) - - def to_string(self, indentation='', _=None): - # pylint: disable=line-too-long - def data_to_str(data): - try: - return data.hex() + ': ' + str(AdvertisingData.from_bytes(data)) - except Exception: - return data.hex() - - return super().to_string( - indentation, + @dataclasses.dataclass + class Report(HCI_Dataclass_Object): + event_type: int = field( + metadata=metadata( { - 'event_type': HCI_LE_Extended_Advertising_Report_Event.event_type_string, - 'address_type': Address.address_type_name, - 'data': data_to_str, - }, + 'size': 2, + 'mapper': lambda x: HCI_LE_Extended_Advertising_Report_Event.EventType( + x + ).name, + } ) - - @staticmethod - def event_type_string(event_type): - event_type_flags = bit_flags_to_strings( - event_type & 0x1F, - HCI_LE_Extended_Advertising_Report_Event.EVENT_TYPE_FLAG_NAMES, ) - event_type_flags.append( - ('COMPLETE', 'INCOMPLETE+', 'INCOMPLETE#', '?')[(event_type >> 5) & 3] + address_type: int = field(metadata=metadata(Address.ADDRESS_TYPE_SPEC)) + address: Address = field( + metadata=metadata(Address.parse_address_preceded_by_type) ) - - if event_type & ( - 1 << HCI_LE_Extended_Advertising_Report_Event.LEGACY_ADVERTISING_PDU_USED - ): - legacy_pdu_type = ( - HCI_LE_Extended_Advertising_Report_Event.LEGACY_PDU_TYPE_MAP.get( - event_type & 0x0F - ) - ) - if legacy_pdu_type is not None: - # pylint: disable=line-too-long - legacy_info_string = f'({HCI_LE_Advertising_Report_Event.event_type_name(legacy_pdu_type)})' - else: - legacy_info_string = '' - else: - legacy_info_string = '' - - return f'0x{event_type:04X} [{",".join(event_type_flags)}]{legacy_info_string}' - - @classmethod - def from_parameters(cls, parameters: bytes) -> Self: - num_reports = parameters[1] - reports: list[HCI_LE_Extended_Advertising_Report_Event.Report] = [] - offset = 2 - for _ in range(num_reports): - report = cls.Report.from_parameters(parameters, offset) - offset += 24 + len(report.data) - reports.append(report) - - return cls(reports) - - def __init__( - self, reports: Sequence[HCI_LE_Extended_Advertising_Report_Event.Report] - ): - self.reports = reports[:] - - # Serialize the fields - parameters = bytes( - [HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT, len(reports)] - ) + b''.join([bytes(report) for report in reports]) - - super().__init__(parameters) - - def __str__(self): - reports = '\n'.join( - [f'{i}:\n{report.to_string(" ")}' for i, report in enumerate(self.reports)] + primary_phy: int = field( + metadata=metadata({'size': 1, 'mapper': HCI_Constant.le_phy_name}) ) - return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}' + secondary_phy: int = field( + metadata=metadata({'size': 1, 'mapper': HCI_Constant.le_phy_name}) + ) + advertising_sid: int = field(metadata=metadata(1)) + tx_power: int = field(metadata=metadata(1)) + rssi: int = field(metadata=metadata(-1)) + periodic_advertising_interval: int = field(metadata=metadata(2)) + direct_address_type: int = field(metadata=metadata(Address.ADDRESS_TYPE_SPEC)) + direct_address: int = field( + metadata=metadata(Address.parse_address_preceded_by_type) + ) + data: bytes = field(metadata=metadata('v')) - -HCI_LE_Meta_Event.subevent_classes[HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT] = ( - HCI_LE_Extended_Advertising_Report_Event -) + reports: Sequence[Report] = field( + metadata=metadata(Report.parse_from_bytes, list_begin=True, list_end=True) + ) # ----------------------------------------------------------------------------- @@ -6876,12 +6798,15 @@ class HCI_LE_CS_Test_End_Complete_Event(HCI_LE_Meta_Event): # ----------------------------------------------------------------------------- -@HCI_Event.event([('status', STATUS_SPEC)]) +@HCI_Event.dataclass_event +@dataclasses.dataclass class HCI_Inquiry_Complete_Event(HCI_Event): ''' See Bluetooth spec @ 7.7.1 Inquiry Complete Event ''' + status: int = field(metadata=metadata(STATUS_SPEC)) + # ----------------------------------------------------------------------------- @HCI_Event.event( diff --git a/bumble/host.py b/bumble/host.py index 2f2338d3..a1f97194 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -1126,11 +1126,19 @@ class Host(utils.EventEmitter): else: self.emit('connection_phy_update_failure', connection.handle, event.status) - def on_hci_le_advertising_report_event(self, event): + def on_hci_le_advertising_report_event( + self, + event: ( + hci.HCI_LE_Advertising_Report_Event + | hci.HCI_LE_Extended_Advertising_Report_Event + ), + ): for report in event.reports: self.emit('advertising_report', report) - def on_hci_le_extended_advertising_report_event(self, event): + def on_hci_le_extended_advertising_report_event( + self, event: hci.HCI_LE_Extended_Advertising_Report_Event + ): self.on_hci_le_advertising_report_event(event) def on_hci_le_advertising_set_terminated_event(self, event): diff --git a/tests/hci_test.py b/tests/hci_test.py index 412b91e9..e56bcd55 100644 --- a/tests/hci_test.py +++ b/tests/hci_test.py @@ -21,60 +21,6 @@ import struct import pytest from bumble import hci -from bumble.hci import ( - HCI_DISCONNECT_COMMAND, - HCI_LE_1M_PHY_BIT, - HCI_LE_CODED_PHY_BIT, - HCI_LE_READ_BUFFER_SIZE_COMMAND, - HCI_RESET_COMMAND, - HCI_VENDOR_EVENT, - HCI_SUCCESS, - HCI_LE_CONNECTION_COMPLETE_EVENT, - HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT, - Address, - CodingFormat, - CodecID, - HCI_Command, - HCI_Command_Complete_Event, - HCI_Command_Status_Event, - HCI_CustomPacket, - HCI_Disconnect_Command, - HCI_Event, - HCI_IsoDataPacket, - HCI_LE_Add_Device_To_Filter_Accept_List_Command, - HCI_LE_Advertising_Report_Event, - HCI_LE_Channel_Selection_Algorithm_Event, - HCI_LE_Connection_Complete_Event, - HCI_LE_Connection_Update_Command, - HCI_LE_Connection_Update_Complete_Event, - HCI_LE_Create_Connection_Command, - HCI_LE_Extended_Create_Connection_Command, - HCI_LE_Read_Buffer_Size_Command, - HCI_LE_Read_Remote_Features_Command, - HCI_LE_Read_Remote_Features_Complete_Event, - HCI_LE_Remove_Device_From_Filter_Accept_List_Command, - HCI_LE_Set_Advertising_Data_Command, - HCI_LE_Set_Advertising_Parameters_Command, - HCI_LE_Set_Default_PHY_Command, - HCI_LE_Set_Event_Mask_Command, - HCI_LE_Set_Extended_Advertising_Enable_Command, - HCI_LE_Set_Extended_Scan_Parameters_Command, - HCI_LE_Set_Random_Address_Command, - HCI_LE_Set_Scan_Enable_Command, - HCI_LE_Set_Scan_Parameters_Command, - HCI_LE_Setup_ISO_Data_Path_Command, - HCI_Number_Of_Completed_Packets_Event, - HCI_Packet, - HCI_PIN_Code_Request_Reply_Command, - HCI_Read_Local_Supported_Codecs_Command, - HCI_Read_Local_Supported_Codecs_V2_Command, - HCI_Read_Local_Supported_Commands_Command, - HCI_Read_Local_Supported_Features_Command, - HCI_Read_Local_Version_Information_Command, - HCI_Reset_Command, - HCI_Set_Event_Mask_Command, - HCI_Vendor_Event, -) # ----------------------------------------------------------------------------- @@ -84,7 +30,7 @@ from bumble.hci import ( def basic_check(x): packet = bytes(x) print(packet.hex()) - parsed = HCI_Packet.from_bytes(packet) + parsed = hci.HCI_Packet.from_bytes(packet) x_str = str(x) parsed_str = str(parsed) print(x_str) @@ -95,18 +41,18 @@ def basic_check(x): # ----------------------------------------------------------------------------- def test_HCI_Event(): - event = HCI_Event(event_code=0xF9) + event = hci.HCI_Event(event_code=0xF9) basic_check(event) - event = HCI_Event(event_code=0xF8, parameters=bytes.fromhex('AABBCC')) + event = hci.HCI_Event(event_code=0xF8, parameters=bytes.fromhex('AABBCC')) basic_check(event) # ----------------------------------------------------------------------------- def test_HCI_LE_Connection_Complete_Event(): - address = Address('00:11:22:33:44:55') - event = HCI_LE_Connection_Complete_Event( - status=HCI_SUCCESS, + address = hci.Address('00:11:22:33:44:55') + event = hci.HCI_LE_Connection_Complete_Event( + status=hci.HCI_SUCCESS, connection_handle=1, role=1, peer_address_type=1, @@ -121,25 +67,47 @@ def test_HCI_LE_Connection_Complete_Event(): # ----------------------------------------------------------------------------- def test_HCI_LE_Advertising_Report_Event(): - address = Address('00:11:22:33:44:55/P') - report = HCI_LE_Advertising_Report_Event.Report( - HCI_LE_Advertising_Report_Event.Report.FIELDS, - event_type=HCI_LE_Advertising_Report_Event.ADV_IND, - address_type=Address.PUBLIC_DEVICE_ADDRESS, + address = hci.Address('00:11:22:33:44:55/P') + report = hci.HCI_LE_Advertising_Report_Event.Report( + event_type=hci.HCI_LE_Advertising_Report_Event.EventType.ADV_IND, + address_type=hci.Address.PUBLIC_DEVICE_ADDRESS, address=address, data=bytes.fromhex( '0201061106ba5689a6fabfa2bd01467d6e00fbabad08160a181604659b03' ), rssi=100, ) - event = HCI_LE_Advertising_Report_Event([report]) + event = hci.HCI_LE_Advertising_Report_Event([report]) + basic_check(event) + + +# ----------------------------------------------------------------------------- +def test_HCI_LE_Extended_Advertising_Report_Event(): + address = hci.Address('00:11:22:33:44:55/P') + report = hci.HCI_LE_Extended_Advertising_Report_Event.Report( + event_type=hci.HCI_LE_Extended_Advertising_Report_Event.EventType.CONNECTABLE_ADVERTISING, + address_type=hci.Address.PUBLIC_DEVICE_ADDRESS, + address=address, + data=bytes.fromhex( + '0201061106ba5689a6fabfa2bd01467d6e00fbabad08160a181604659b03' + ), + rssi=100, + primary_phy=hci.HCI_LE_1M_PHY, + secondary_phy=hci.HCI_LE_CODED_PHY, + advertising_sid=0, + tx_power=10, + periodic_advertising_interval=2, + direct_address=hci.Address('00:11:22:33:44:55/P'), + direct_address_type=hci.Address.PUBLIC_DEVICE_ADDRESS, + ) + event = hci.HCI_LE_Extended_Advertising_Report_Event([report]) basic_check(event) # ----------------------------------------------------------------------------- def test_HCI_LE_Read_Remote_Features_Complete_Event(): - event = HCI_LE_Read_Remote_Features_Complete_Event( - status=HCI_SUCCESS, + event = hci.HCI_LE_Read_Remote_Features_Complete_Event( + status=hci.HCI_SUCCESS, connection_handle=0x007, le_features=bytes.fromhex('0011223344556677'), ) @@ -148,8 +116,8 @@ def test_HCI_LE_Read_Remote_Features_Complete_Event(): # ----------------------------------------------------------------------------- def test_HCI_LE_Connection_Update_Complete_Event(): - event = HCI_LE_Connection_Update_Complete_Event( - status=HCI_SUCCESS, + event = hci.HCI_LE_Connection_Update_Complete_Event( + status=hci.HCI_SUCCESS, connection_handle=0x007, connection_interval=10, peripheral_latency=3, @@ -160,7 +128,7 @@ def test_HCI_LE_Connection_Update_Complete_Event(): # ----------------------------------------------------------------------------- def test_HCI_LE_Channel_Selection_Algorithm_Event(): - event = HCI_LE_Channel_Selection_Algorithm_Event( + event = hci.HCI_LE_Channel_Selection_Algorithm_Event( connection_handle=7, channel_selection_algorithm=1 ) basic_check(event) @@ -169,10 +137,10 @@ def test_HCI_LE_Channel_Selection_Algorithm_Event(): # ----------------------------------------------------------------------------- def test_HCI_Command_Complete_Event(): # With a serializable object - event = HCI_Command_Complete_Event( + event = hci.HCI_Command_Complete_Event( num_hci_command_packets=34, - command_opcode=HCI_LE_READ_BUFFER_SIZE_COMMAND, - return_parameters=HCI_LE_Read_Buffer_Size_Command.create_return_parameters( + command_opcode=hci.HCI_LE_READ_BUFFER_SIZE_COMMAND, + return_parameters=hci.HCI_LE_Read_Buffer_Size_Command.create_return_parameters( status=0, le_acl_data_packet_length=1234, total_num_le_acl_data_packets=56, @@ -181,26 +149,28 @@ def test_HCI_Command_Complete_Event(): basic_check(event) # With an arbitrary byte array - event = HCI_Command_Complete_Event( + event = hci.HCI_Command_Complete_Event( num_hci_command_packets=1, - command_opcode=HCI_RESET_COMMAND, + command_opcode=hci.HCI_RESET_COMMAND, return_parameters=bytes([1, 2, 3, 4]), ) basic_check(event) # With a simple status as a 1-byte array - event = HCI_Command_Complete_Event( + event = hci.HCI_Command_Complete_Event( num_hci_command_packets=1, - command_opcode=HCI_RESET_COMMAND, + command_opcode=hci.HCI_RESET_COMMAND, return_parameters=bytes([7]), ) basic_check(event) - event = HCI_Packet.from_bytes(bytes(event)) + event = hci.HCI_Packet.from_bytes(bytes(event)) assert event.return_parameters == 7 # With a simple status as an integer status - event = HCI_Command_Complete_Event( - num_hci_command_packets=1, command_opcode=HCI_RESET_COMMAND, return_parameters=9 + event = hci.HCI_Command_Complete_Event( + num_hci_command_packets=1, + command_opcode=hci.HCI_RESET_COMMAND, + return_parameters=9, ) basic_check(event) assert event.return_parameters == 9 @@ -208,15 +178,15 @@ def test_HCI_Command_Complete_Event(): # ----------------------------------------------------------------------------- def test_HCI_Command_Status_Event(): - event = HCI_Command_Status_Event( - status=0, num_hci_command_packets=37, command_opcode=HCI_DISCONNECT_COMMAND + event = hci.HCI_Command_Status_Event( + status=0, num_hci_command_packets=37, command_opcode=hci.HCI_DISCONNECT_COMMAND ) basic_check(event) # ----------------------------------------------------------------------------- def test_HCI_Number_Of_Completed_Packets_Event(): - event = HCI_Number_Of_Completed_Packets_Event( + event = hci.HCI_Number_Of_Completed_Packets_Event( connection_handles=(1, 2), num_completed_packets=(3, 4), ) @@ -226,16 +196,16 @@ def test_HCI_Number_Of_Completed_Packets_Event(): # ----------------------------------------------------------------------------- def test_HCI_Vendor_Event(): data = bytes.fromhex('01020304') - event = HCI_Vendor_Event(data=data) + event = hci.HCI_Vendor_Event(data=data) event_bytes = bytes(event) - parsed = HCI_Packet.from_bytes(event_bytes) - assert isinstance(parsed, HCI_Vendor_Event) + parsed = hci.HCI_Packet.from_bytes(event_bytes) + assert isinstance(parsed, hci.HCI_Vendor_Event) assert parsed.data == data - class HCI_Custom_Event(HCI_Event): + class HCI_Custom_Event(hci.HCI_Event): def __init__(self, blabla): super().__init__( - event_code=HCI_VENDOR_EVENT, parameters=struct.pack("