diff --git a/apps/show.py b/apps/show.py index bf01eada..5cd2309e 100644 --- a/apps/show.py +++ b/apps/show.py @@ -102,9 +102,19 @@ class SnoopPacketReader: default='h4', help='Format of the input file', ) +@click.option( + '--vendors', + type=click.Choice(['android']), + multiple=True, + help='Support vendor-specific commands (list one or more)', +) @click.argument('filename') # pylint: disable=redefined-builtin -def main(format, filename): +def main(format, vendors, filename): + for vendor in vendors: + if vendor == 'android': + import bumble.vendor.android.hci + input = open(filename, 'rb') if format == 'h4': packet_reader = PacketReader(input) @@ -124,7 +134,6 @@ def main(format, filename): if packet is None: break tracer.trace(hci.HCI_Packet.from_bytes(packet), direction) - except Exception as error: print(color(f'!!! {error}', 'red')) diff --git a/bumble/drivers/rtk.py b/bumble/drivers/rtk.py index 0bce67d9..f78a14d3 100644 --- a/bumble/drivers/rtk.py +++ b/bumble/drivers/rtk.py @@ -34,10 +34,9 @@ import weakref from bumble.hci import ( - hci_command_op_code, + hci_vendor_command_op_code, STATUS_SPEC, HCI_SUCCESS, - HCI_COMMAND_NAMES, HCI_Command, HCI_Reset_Command, HCI_Read_Local_Version_Information_Command, @@ -179,8 +178,10 @@ RTK_USB_PRODUCTS = { # ----------------------------------------------------------------------------- # HCI Commands # ----------------------------------------------------------------------------- -HCI_RTK_READ_ROM_VERSION_COMMAND = hci_command_op_code(0x3F, 0x6D) -HCI_COMMAND_NAMES[HCI_RTK_READ_ROM_VERSION_COMMAND] = "HCI_RTK_READ_ROM_VERSION_COMMAND" +HCI_RTK_READ_ROM_VERSION_COMMAND = hci_vendor_command_op_code(0x6D) +HCI_RTK_DOWNLOAD_COMMAND = hci_vendor_command_op_code(0x20) +HCI_RTK_DROP_FIRMWARE_COMMAND = hci_vendor_command_op_code(0x66) +HCI_Command.register_commands(globals()) @HCI_Command.command(return_parameters_fields=[("status", STATUS_SPEC), ("version", 1)]) @@ -188,10 +189,6 @@ class HCI_RTK_Read_ROM_Version_Command(HCI_Command): pass -HCI_RTK_DOWNLOAD_COMMAND = hci_command_op_code(0x3F, 0x20) -HCI_COMMAND_NAMES[HCI_RTK_DOWNLOAD_COMMAND] = "HCI_RTK_DOWNLOAD_COMMAND" - - @HCI_Command.command( fields=[("index", 1), ("payload", RTK_FRAGMENT_LENGTH)], return_parameters_fields=[("status", STATUS_SPEC), ("index", 1)], @@ -200,10 +197,6 @@ class HCI_RTK_Download_Command(HCI_Command): pass -HCI_RTK_DROP_FIRMWARE_COMMAND = hci_command_op_code(0x3F, 0x66) -HCI_COMMAND_NAMES[HCI_RTK_DROP_FIRMWARE_COMMAND] = "HCI_RTK_DROP_FIRMWARE_COMMAND" - - @HCI_Command.command() class HCI_RTK_Drop_Firmware_Command(HCI_Command): pass diff --git a/bumble/hci.py b/bumble/hci.py index 6bf97d9f..b7014ddc 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -16,11 +16,11 @@ # Imports # ----------------------------------------------------------------------------- from __future__ import annotations -import struct import collections -import logging import functools -from typing import Dict, Type, Union, Callable, Any, Optional +import logging +import struct +from typing import Any, Dict, Callable, Optional, Type, Union from .colors import color from .core import ( @@ -47,6 +47,10 @@ def hci_command_op_code(ogf, ocf): return ogf << 10 | ocf +def hci_vendor_command_op_code(ocf): + return hci_command_op_code(HCI_VENDOR_OGF, ocf) + + def key_with_value(dictionary, target_value): for key, value in dictionary.items(): if value == target_value: @@ -101,6 +105,8 @@ def phy_list_to_bits(phys): # fmt: off # pylint: disable=line-too-long +HCI_VENDOR_OGF = 0x3F + # HCI Version HCI_VERSION_BLUETOOTH_CORE_1_0B = 0 HCI_VERSION_BLUETOOTH_CORE_1_1 = 1 @@ -206,10 +212,8 @@ HCI_INQUIRY_RESPONSE_NOTIFICATION_EVENT = 0X56 HCI_AUTHENTICATED_PAYLOAD_TIMEOUT_EXPIRED_EVENT = 0X57 HCI_SAM_STATUS_CHANGE_EVENT = 0X58 -HCI_EVENT_NAMES = { - event_code: event_name for (event_name, event_code) in globals().items() - if event_name.startswith('HCI_') and event_name.endswith('_EVENT') -} +HCI_VENDOR_EVENT = 0xFF + # HCI Subevent Codes HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01 @@ -248,10 +252,6 @@ HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21 HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22 HCI_LE_SUBRATE_CHANGE_EVENT = 0X23 -HCI_SUBEVENT_NAMES = { - event_code: event_name for (event_name, event_code) in globals().items() - if event_name.startswith('HCI_LE_') and event_name.endswith('_EVENT') and event_code != HCI_LE_META_EVENT -} # HCI Command HCI_INQUIRY_COMMAND = hci_command_op_code(0x01, 0x0001) @@ -557,10 +557,6 @@ HCI_LE_SET_DATA_RELATED_ADDRESS_CHANGES_COMMAND = hci_c HCI_LE_SET_DEFAULT_SUBRATE_COMMAND = hci_command_op_code(0x08, 0x007D) HCI_LE_SUBRATE_REQUEST_COMMAND = hci_command_op_code(0x08, 0x007E) -HCI_COMMAND_NAMES = { - command_code: command_name for (command_name, command_code) in globals().items() - if command_name.startswith('HCI_') and command_name.endswith('_COMMAND') -} # HCI Error Codes # See Bluetooth spec Vol 2, Part D - 1.3 LIST OF ERROR CODES @@ -1960,6 +1956,7 @@ class HCI_Command(HCI_Packet): ''' hci_packet_type = HCI_COMMAND_PACKET + command_names: Dict[int, str] = {} command_classes: Dict[int, Type[HCI_Command]] = {} @staticmethod @@ -1970,9 +1967,9 @@ class HCI_Command(HCI_Packet): def inner(cls): cls.name = cls.__name__.upper() - cls.op_code = key_with_value(HCI_COMMAND_NAMES, cls.name) + cls.op_code = key_with_value(cls.command_names, cls.name) if cls.op_code is None: - raise KeyError(f'command {cls.name} not found in HCI_COMMAND_NAMES') + raise KeyError(f'command {cls.name} not found in command_names') cls.fields = fields cls.return_parameters_fields = return_parameters_fields @@ -1991,6 +1988,18 @@ class HCI_Command(HCI_Packet): return inner + @staticmethod + def command_map(symbols: Dict[str, Any]) -> Dict[int, str]: + return { + command_code: command_name + for (command_name, command_code) in symbols.items() + if command_name.startswith('HCI_') and command_name.endswith('_COMMAND') + } + + @classmethod + def register_commands(cls, symbols: Dict[str, Any]) -> None: + cls.command_names.update(cls.command_map(symbols)) + @staticmethod def from_bytes(packet: bytes) -> HCI_Command: op_code, length = struct.unpack_from('> 10:02x}, OCF=0x{op_code & 0x3FF:04x}]' @@ -2024,6 +2033,16 @@ class HCI_Command(HCI_Packet): def create_return_parameters(cls, **kwargs): return HCI_Object(cls.return_parameters_fields, **kwargs) + @classmethod + def parse_return_parameters(cls, parameters): + if not cls.return_parameters_fields: + return None + return_parameters = HCI_Object.from_bytes( + parameters, 0, cls.return_parameters_fields + ) + return_parameters.fields = cls.return_parameters_fields + return return_parameters + def __init__(self, op_code, parameters=None, **kwargs): super().__init__(HCI_Command.command_name(op_code)) if (fields := getattr(self, 'fields', None)) and kwargs: @@ -2053,6 +2072,9 @@ class HCI_Command(HCI_Packet): return result +HCI_Command.register_commands(globals()) + + # ----------------------------------------------------------------------------- @HCI_Command.command( [ @@ -4308,8 +4330,8 @@ class HCI_Event(HCI_Packet): ''' hci_packet_type = HCI_EVENT_PACKET + event_names: Dict[int, str] = {} event_classes: Dict[int, Type[HCI_Event]] = {} - meta_event_classes: Dict[int, Type[HCI_LE_Meta_Event]] = {} @staticmethod def event(fields=()): @@ -4319,9 +4341,9 @@ class HCI_Event(HCI_Packet): def inner(cls): cls.name = cls.__name__.upper() - cls.event_code = key_with_value(HCI_EVENT_NAMES, cls.name) + cls.event_code = key_with_value(cls.event_names, cls.name) if cls.event_code is None: - raise KeyError('event not found in HCI_EVENT_NAMES') + raise KeyError(f'event {cls.name} not found in event_names') cls.fields = fields # Patch the __init__ method to fix the event_code @@ -4337,12 +4359,30 @@ class HCI_Event(HCI_Packet): return inner + @staticmethod + def event_map(symbols: Dict[str, Any]) -> Dict[int, str]: + return { + event_code: event_name + for (event_name, event_code) in symbols.items() + if event_name.startswith('HCI_') + and not event_name.startswith('HCI_LE_') + and event_name.endswith('_EVENT') + } + + @staticmethod + def event_name(event_code): + return name_or_number(HCI_Event.event_names, event_code) + + @staticmethod + def register_events(symbols: Dict[str, Any]) -> None: + HCI_Event.event_names.update(HCI_Event.event_map(symbols)) + @staticmethod def registered(event_class): event_class.name = event_class.__name__.upper() - event_class.event_code = key_with_value(HCI_EVENT_NAMES, event_class.name) + event_class.event_code = key_with_value(HCI_Event.event_names, event_class.name) if event_class.event_code is None: - raise KeyError('event not found in HCI_EVENT_NAMES') + raise KeyError(f'event {event_class.name} not found in event_names') # Register a factory for this class HCI_Event.event_classes[event_class.event_code] = event_class @@ -4362,11 +4402,16 @@ class HCI_Event(HCI_Packet): # We do this dispatch here and not in the subclass in order to avoid call # loops subevent_code = parameters[0] - cls = HCI_Event.meta_event_classes.get(subevent_code) + cls = HCI_LE_Meta_Event.subevent_classes.get(subevent_code) if cls is None: # No class registered, just use a generic class instance return HCI_LE_Meta_Event(subevent_code, parameters) - + elif event_code == HCI_VENDOR_EVENT: + subevent_code = parameters[0] + cls = HCI_Vendor_Event.subevent_classes.get(subevent_code) + if cls is None: + # No class registered, just use a generic class instance + return HCI_Vendor_Event(subevent_code, parameters) else: cls = HCI_Event.event_classes.get(event_code) if cls is None: @@ -4384,10 +4429,6 @@ class HCI_Event(HCI_Packet): HCI_Object.init_from_bytes(self, parameters, 0, fields) return self - @staticmethod - def event_name(event_code): - return name_or_number(HCI_EVENT_NAMES, event_code) - def __init__(self, event_code, parameters=None, **kwargs): super().__init__(HCI_Event.event_name(event_code)) if (fields := getattr(self, 'fields', None)) and kwargs: @@ -4414,71 +4455,111 @@ class HCI_Event(HCI_Packet): return result +HCI_Event.register_events(globals()) + + # ----------------------------------------------------------------------------- -class HCI_LE_Meta_Event(HCI_Event): +class HCI_Extended_Event(HCI_Event): ''' - See Bluetooth spec @ 7.7.65 LE Meta Event + HCI_Event subclass for events that has a subevent code. ''' - @staticmethod - def event(fields=()): + subevent_names: Dict[int, str] = {} + subevent_classes: Dict[int, Type[HCI_Extended_Event]] + + @classmethod + def event(cls, fields=()): ''' Decorator used to declare and register subclasses ''' def inner(cls): cls.name = cls.__name__.upper() - cls.subevent_code = key_with_value(HCI_SUBEVENT_NAMES, cls.name) + cls.subevent_code = key_with_value(cls.subevent_names, cls.name) if cls.subevent_code is None: - raise KeyError('subevent not found in HCI_SUBEVENT_NAMES') + raise KeyError(f'subevent {cls.name} not found in subevent_names') cls.fields = fields # Patch the __init__ method to fix the subevent_code + original_init = cls.__init__ + def init(self, parameters=None, **kwargs): - return HCI_LE_Meta_Event.__init__( - self, cls.subevent_code, parameters, **kwargs - ) + return original_init(self, cls.subevent_code, parameters, **kwargs) cls.__init__ = init # Register a factory for this class - HCI_Event.meta_event_classes[cls.subevent_code] = cls + cls.subevent_classes[cls.subevent_code] = cls return cls return inner + @classmethod + def subevent_name(cls, subevent_code): + subevent_name = cls.subevent_names.get(subevent_code) + if subevent_name is not None: + return subevent_name + + return f'{cls.__name__.upper()}[0x{subevent_code:02X}]' + + @staticmethod + def subevent_map(symbols: Dict[str, Any]) -> Dict[int, str]: + return { + subevent_code: subevent_name + for (subevent_name, subevent_code) in symbols.items() + if subevent_name.startswith('HCI_') and subevent_name.endswith('_EVENT') + } + + @classmethod + def register_subevents(cls, symbols: Dict[str, Any]) -> None: + cls.subevent_names.update(cls.subevent_map(symbols)) + @classmethod def from_parameters(cls, parameters): self = cls.__new__(cls) - HCI_LE_Meta_Event.__init__(self, self.subevent_code, parameters) + HCI_Extended_Event.__init__(self, self.subevent_code, parameters) if fields := getattr(self, 'fields', None): HCI_Object.init_from_bytes(self, parameters, 1, fields) return self - @staticmethod - def subevent_name(subevent_code): - return name_or_number(HCI_SUBEVENT_NAMES, subevent_code) - def __init__(self, subevent_code, parameters, **kwargs): self.subevent_code = subevent_code if parameters is None and (fields := getattr(self, 'fields', None)) and kwargs: parameters = bytes([subevent_code]) + HCI_Object.dict_to_bytes( kwargs, fields ) - super().__init__(HCI_LE_META_EVENT, parameters, **kwargs) + super().__init__(self.event_code, parameters, **kwargs) # Override the name in order to adopt the subevent name instead self.name = self.subevent_name(subevent_code) - def __str__(self): - result = color(self.subevent_name(self.subevent_code), 'magenta') - if fields := getattr(self, 'fields', None): - result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ') - else: - if self.parameters: - result += f': {self.parameters.hex()}' - return result + +# ----------------------------------------------------------------------------- +class HCI_LE_Meta_Event(HCI_Extended_Event): + ''' + See Bluetooth spec @ 7.7.65 LE Meta Event + ''' + + event_code: int = HCI_LE_META_EVENT + subevent_classes = {} + + @staticmethod + def subevent_map(symbols: Dict[str, Any]) -> Dict[int, str]: + return { + subevent_code: subevent_name + for (subevent_name, subevent_code) in symbols.items() + if subevent_name.startswith('HCI_LE_') and subevent_name.endswith('_EVENT') + } + + +HCI_LE_Meta_Event.register_subevents(globals()) + + +# ----------------------------------------------------------------------------- +class HCI_Vendor_Event(HCI_Extended_Event): + event_code: int = HCI_VENDOR_EVENT + subevent_classes = {} # ----------------------------------------------------------------------------- @@ -4592,7 +4673,7 @@ class HCI_LE_Advertising_Report_Event(HCI_LE_Meta_Event): return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}' -HCI_Event.meta_event_classes[ +HCI_LE_Meta_Event.subevent_classes[ HCI_LE_ADVERTISING_REPORT_EVENT ] = HCI_LE_Advertising_Report_Event @@ -4846,7 +4927,7 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event): return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}' -HCI_Event.meta_event_classes[ +HCI_LE_Meta_Event.subevent_classes[ HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT ] = HCI_LE_Extended_Advertising_Report_Event @@ -5120,11 +5201,11 @@ class HCI_Command_Complete_Event(HCI_Event): self.return_parameters = self.return_parameters[0] else: cls = HCI_Command.command_classes.get(self.command_opcode) - if cls and cls.return_parameters_fields: - self.return_parameters = HCI_Object.from_bytes( - self.return_parameters, 0, cls.return_parameters_fields - ) - self.return_parameters.fields = cls.return_parameters_fields + if cls: + # Try to parse the return parameters bytes into an object. + return_parameters = cls.parse_return_parameters(self.return_parameters) + if return_parameters is not None: + self.return_parameters = return_parameters return self diff --git a/bumble/vendor/__init__.py b/bumble/vendor/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/bumble/vendor/android/__init__.py b/bumble/vendor/android/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/bumble/vendor/android/hci.py b/bumble/vendor/android/hci.py new file mode 100644 index 00000000..c411ecf3 --- /dev/null +++ b/bumble/vendor/android/hci.py @@ -0,0 +1,318 @@ +# Copyright 2021-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import struct + +from bumble.hci import ( + name_or_number, + hci_vendor_command_op_code, + Address, + HCI_Constant, + HCI_Object, + HCI_Command, + HCI_Vendor_Event, + STATUS_SPEC, +) + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- + +# Android Vendor Specific Commands and Events. +# Only a subset of the commands are implemented here currently. +# +# pylint: disable-next=line-too-long +# See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#chip-capabilities-and-configuration +HCI_LE_GET_VENDOR_CAPABILITIES_COMMAND = hci_vendor_command_op_code(0x153) +HCI_LE_APCF_COMMAND = hci_vendor_command_op_code(0x157) +HCI_GET_CONTROLLER_ACTIVITY_ENERGY_INFO_COMMAND = hci_vendor_command_op_code(0x159) +HCI_A2DP_HARDWARE_OFFLOAD_COMMAND = hci_vendor_command_op_code(0x15D) +HCI_BLUETOOTH_QUALITY_REPORT_COMMAND = hci_vendor_command_op_code(0x15E) +HCI_DYNAMIC_AUDIO_BUFFER_COMMAND = hci_vendor_command_op_code(0x15F) + +HCI_BLUETOOTH_QUALITY_REPORT_EVENT = 0x58 + +HCI_Command.register_commands(globals()) +HCI_Vendor_Event.register_subevents(globals()) + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('max_advt_instances', 1), + ('offloaded_resolution_of_private_address', 1), + ('total_scan_results_storage', 2), + ('max_irk_list_sz', 1), + ('filtering_support', 1), + ('max_filter', 1), + ('activity_energy_info_support', 1), + ('version_supported', 2), + ('total_num_of_advt_tracked', 2), + ('extended_scan_support', 1), + ('debug_logging_supported', 1), + ('le_address_generation_offloading_support', 1), + ('a2dp_source_offload_capability_mask', 4), + ('bluetooth_quality_report_support', 1), + ('dynamic_audio_buffer_support', 4), + ] +) +class HCI_LE_Get_Vendor_Capabilities_Command(HCI_Command): + # pylint: disable=line-too-long + ''' + See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#vendor-specific-capabilities + ''' + + @classmethod + def parse_return_parameters(cls, parameters): + # There are many versions of this data structure, so we need to parse until + # there are no more bytes to parse, and leave un-signal parameters set to + # None (older versions) + nones = {field: None for field, _ in cls.return_parameters_fields} + return_parameters = HCI_Object(cls.return_parameters_fields, **nones) + + try: + offset = 0 + for field in cls.return_parameters_fields: + field_name, field_type = field + field_value, field_size = HCI_Object.parse_field( + parameters, offset, field_type + ) + setattr(return_parameters, field_name, field_value) + offset += field_size + except struct.error: + pass + + return return_parameters + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + ( + 'opcode', + { + 'size': 1, + 'mapper': lambda x: HCI_LE_APCF_Command.opcode_name(x), + }, + ), + ('payload', '*'), + ], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ( + 'opcode', + { + 'size': 1, + 'mapper': lambda x: HCI_LE_APCF_Command.opcode_name(x), + }, + ), + ('payload', '*'), + ], +) +class HCI_LE_APCF_Command(HCI_Command): + # pylint: disable=line-too-long + ''' + See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#le_apcf_command + + NOTE: the subcommand-specific payloads are left as opaque byte arrays in this + implementation. A future enhancement may define subcommand-specific data structures. + ''' + + # APCF Subcommands + # TODO: use the OpenIntEnum class (when upcoming PR is merged) + APCF_ENABLE = 0x00 + APCF_SET_FILTERING_PARAMETERS = 0x01 + APCF_BROADCASTER_ADDRESS = 0x02 + APCF_SERVICE_UUID = 0x03 + APCF_SERVICE_SOLICITATION_UUID = 0x04 + APCF_LOCAL_NAME = 0x05 + APCF_MANUFACTURER_DATA = 0x06 + APCF_SERVICE_DATA = 0x07 + APCF_TRANSPORT_DISCOVERY_SERVICE = 0x08 + APCF_AD_TYPE_FILTER = 0x09 + APCF_READ_EXTENDED_FEATURES = 0xFF + + OPCODE_NAMES = { + APCF_ENABLE: 'APCF_ENABLE', + APCF_SET_FILTERING_PARAMETERS: 'APCF_SET_FILTERING_PARAMETERS', + APCF_BROADCASTER_ADDRESS: 'APCF_BROADCASTER_ADDRESS', + APCF_SERVICE_UUID: 'APCF_SERVICE_UUID', + APCF_SERVICE_SOLICITATION_UUID: 'APCF_SERVICE_SOLICITATION_UUID', + APCF_LOCAL_NAME: 'APCF_LOCAL_NAME', + APCF_MANUFACTURER_DATA: 'APCF_MANUFACTURER_DATA', + APCF_SERVICE_DATA: 'APCF_SERVICE_DATA', + APCF_TRANSPORT_DISCOVERY_SERVICE: 'APCF_TRANSPORT_DISCOVERY_SERVICE', + APCF_AD_TYPE_FILTER: 'APCF_AD_TYPE_FILTER', + APCF_READ_EXTENDED_FEATURES: 'APCF_READ_EXTENDED_FEATURES', + } + + @classmethod + def opcode_name(cls, opcode): + return name_or_number(cls.OPCODE_NAMES, opcode) + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('total_tx_time_ms', 4), + ('total_rx_time_ms', 4), + ('total_idle_time_ms', 4), + ('total_energy_used', 4), + ], +) +class HCI_Get_Controller_Activity_Energy_Info_Command(HCI_Command): + # pylint: disable=line-too-long + ''' + See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#le_get_controller_activity_energy_info + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + ( + 'opcode', + { + 'size': 1, + 'mapper': lambda x: HCI_A2DP_Hardware_Offload_Command.opcode_name(x), + }, + ), + ('payload', '*'), + ], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ( + 'opcode', + { + 'size': 1, + 'mapper': lambda x: HCI_A2DP_Hardware_Offload_Command.opcode_name(x), + }, + ), + ('payload', '*'), + ], +) +class HCI_A2DP_Hardware_Offload_Command(HCI_Command): + # pylint: disable=line-too-long + ''' + See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#a2dp-hardware-offload-support + + NOTE: the subcommand-specific payloads are left as opaque byte arrays in this + implementation. A future enhancement may define subcommand-specific data structures. + ''' + + # A2DP Hardware Offload Subcommands + # TODO: use the OpenIntEnum class (when upcoming PR is merged) + START_A2DP_OFFLOAD = 0x01 + STOP_A2DP_OFFLOAD = 0x02 + + OPCODE_NAMES = { + START_A2DP_OFFLOAD: 'START_A2DP_OFFLOAD', + STOP_A2DP_OFFLOAD: 'STOP_A2DP_OFFLOAD', + } + + @classmethod + def opcode_name(cls, opcode): + return name_or_number(cls.OPCODE_NAMES, opcode) + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + ( + 'opcode', + { + 'size': 1, + 'mapper': lambda x: HCI_Dynamic_Audio_Buffer_Command.opcode_name(x), + }, + ), + ('payload', '*'), + ], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ( + 'opcode', + { + 'size': 1, + 'mapper': lambda x: HCI_Dynamic_Audio_Buffer_Command.opcode_name(x), + }, + ), + ('payload', '*'), + ], +) +class HCI_Dynamic_Audio_Buffer_Command(HCI_Command): + # pylint: disable=line-too-long + ''' + See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#dynamic-audio-buffer-command + + NOTE: the subcommand-specific payloads are left as opaque byte arrays in this + implementation. A future enhancement may define subcommand-specific data structures. + ''' + + # Dynamic Audio Buffer Subcommands + # TODO: use the OpenIntEnum class (when upcoming PR is merged) + GET_AUDIO_BUFFER_TIME_CAPABILITY = 0x01 + + OPCODE_NAMES = { + GET_AUDIO_BUFFER_TIME_CAPABILITY: 'GET_AUDIO_BUFFER_TIME_CAPABILITY', + } + + @classmethod + def opcode_name(cls, opcode): + return name_or_number(cls.OPCODE_NAMES, opcode) + + +# ----------------------------------------------------------------------------- +@HCI_Vendor_Event.event( + fields=[ + ('quality_report_id', 1), + ('packet_types', 1), + ('connection_handle', 2), + ('connection_role', {'size': 1, 'mapper': HCI_Constant.role_name}), + ('tx_power_level', -1), + ('rssi', -1), + ('snr', 1), + ('unused_afh_channel_count', 1), + ('afh_select_unideal_channel_count', 1), + ('lsto', 2), + ('connection_piconet_clock', 4), + ('retransmission_count', 4), + ('no_rx_count', 4), + ('nak_count', 4), + ('last_tx_ack_timestamp', 4), + ('flow_off_count', 4), + ('last_flow_on_timestamp', 4), + ('buffer_overflow_bytes', 4), + ('buffer_underflow_bytes', 4), + ('bdaddr', Address.parse_address), + ('cal_failed_item_count', 1), + ('tx_total_packets', 4), + ('tx_unacked_packets', 4), + ('tx_flushed_packets', 4), + ('tx_last_subevent_packets', 4), + ('crc_error_packets', 4), + ('rx_duplicate_packets', 4), + ('vendor_specific_parameters', '*'), + ] +) +class HCI_Bluetooth_Quality_Report_Event(HCI_Vendor_Event): + # pylint: disable=line-too-long + ''' + See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#bluetooth-quality-report-sub-event + '''