add support for vendor HCI commands and events

This commit is contained in:
Gilles Boccon-Gibod
2023-08-31 16:35:21 -07:00
parent 9303f4fc5b
commit ae77e4528f
6 changed files with 475 additions and 74 deletions

View File

@@ -102,9 +102,19 @@ class SnoopPacketReader:
default='h4',
help='Format of the input file',
)
@click.option(
'--vendors',
type=click.Choice(['android']),
multiple=True,
help='Support vendor-specific commands (list one or more)',
)
@click.argument('filename')
# pylint: disable=redefined-builtin
def main(format, filename):
def main(format, vendors, filename):
for vendor in vendors:
if vendor == 'android':
import bumble.vendor.android.hci
input = open(filename, 'rb')
if format == 'h4':
packet_reader = PacketReader(input)
@@ -124,7 +134,6 @@ def main(format, filename):
if packet is None:
break
tracer.trace(hci.HCI_Packet.from_bytes(packet), direction)
except Exception as error:
print(color(f'!!! {error}', 'red'))

View File

@@ -34,10 +34,9 @@ import weakref
from bumble.hci import (
hci_command_op_code,
hci_vendor_command_op_code,
STATUS_SPEC,
HCI_SUCCESS,
HCI_COMMAND_NAMES,
HCI_Command,
HCI_Reset_Command,
HCI_Read_Local_Version_Information_Command,
@@ -179,8 +178,10 @@ RTK_USB_PRODUCTS = {
# -----------------------------------------------------------------------------
# HCI Commands
# -----------------------------------------------------------------------------
HCI_RTK_READ_ROM_VERSION_COMMAND = hci_command_op_code(0x3F, 0x6D)
HCI_COMMAND_NAMES[HCI_RTK_READ_ROM_VERSION_COMMAND] = "HCI_RTK_READ_ROM_VERSION_COMMAND"
HCI_RTK_READ_ROM_VERSION_COMMAND = hci_vendor_command_op_code(0x6D)
HCI_RTK_DOWNLOAD_COMMAND = hci_vendor_command_op_code(0x20)
HCI_RTK_DROP_FIRMWARE_COMMAND = hci_vendor_command_op_code(0x66)
HCI_Command.register_commands(globals())
@HCI_Command.command(return_parameters_fields=[("status", STATUS_SPEC), ("version", 1)])
@@ -188,10 +189,6 @@ class HCI_RTK_Read_ROM_Version_Command(HCI_Command):
pass
HCI_RTK_DOWNLOAD_COMMAND = hci_command_op_code(0x3F, 0x20)
HCI_COMMAND_NAMES[HCI_RTK_DOWNLOAD_COMMAND] = "HCI_RTK_DOWNLOAD_COMMAND"
@HCI_Command.command(
fields=[("index", 1), ("payload", RTK_FRAGMENT_LENGTH)],
return_parameters_fields=[("status", STATUS_SPEC), ("index", 1)],
@@ -200,10 +197,6 @@ class HCI_RTK_Download_Command(HCI_Command):
pass
HCI_RTK_DROP_FIRMWARE_COMMAND = hci_command_op_code(0x3F, 0x66)
HCI_COMMAND_NAMES[HCI_RTK_DROP_FIRMWARE_COMMAND] = "HCI_RTK_DROP_FIRMWARE_COMMAND"
@HCI_Command.command()
class HCI_RTK_Drop_Firmware_Command(HCI_Command):
pass

View File

@@ -16,11 +16,11 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import struct
import collections
import logging
import functools
from typing import Dict, Type, Union, Callable, Any, Optional
import logging
import struct
from typing import Any, Dict, Callable, Optional, Type, Union
from .colors import color
from .core import (
@@ -47,6 +47,10 @@ def hci_command_op_code(ogf, ocf):
return ogf << 10 | ocf
def hci_vendor_command_op_code(ocf):
return hci_command_op_code(HCI_VENDOR_OGF, ocf)
def key_with_value(dictionary, target_value):
for key, value in dictionary.items():
if value == target_value:
@@ -101,6 +105,8 @@ def phy_list_to_bits(phys):
# fmt: off
# pylint: disable=line-too-long
HCI_VENDOR_OGF = 0x3F
# HCI Version
HCI_VERSION_BLUETOOTH_CORE_1_0B = 0
HCI_VERSION_BLUETOOTH_CORE_1_1 = 1
@@ -206,10 +212,8 @@ HCI_INQUIRY_RESPONSE_NOTIFICATION_EVENT = 0X56
HCI_AUTHENTICATED_PAYLOAD_TIMEOUT_EXPIRED_EVENT = 0X57
HCI_SAM_STATUS_CHANGE_EVENT = 0X58
HCI_EVENT_NAMES = {
event_code: event_name for (event_name, event_code) in globals().items()
if event_name.startswith('HCI_') and event_name.endswith('_EVENT')
}
HCI_VENDOR_EVENT = 0xFF
# HCI Subevent Codes
HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01
@@ -248,10 +252,6 @@ HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21
HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22
HCI_LE_SUBRATE_CHANGE_EVENT = 0X23
HCI_SUBEVENT_NAMES = {
event_code: event_name for (event_name, event_code) in globals().items()
if event_name.startswith('HCI_LE_') and event_name.endswith('_EVENT') and event_code != HCI_LE_META_EVENT
}
# HCI Command
HCI_INQUIRY_COMMAND = hci_command_op_code(0x01, 0x0001)
@@ -557,10 +557,6 @@ HCI_LE_SET_DATA_RELATED_ADDRESS_CHANGES_COMMAND = hci_c
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND = hci_command_op_code(0x08, 0x007D)
HCI_LE_SUBRATE_REQUEST_COMMAND = hci_command_op_code(0x08, 0x007E)
HCI_COMMAND_NAMES = {
command_code: command_name for (command_name, command_code) in globals().items()
if command_name.startswith('HCI_') and command_name.endswith('_COMMAND')
}
# HCI Error Codes
# See Bluetooth spec Vol 2, Part D - 1.3 LIST OF ERROR CODES
@@ -1960,6 +1956,7 @@ class HCI_Command(HCI_Packet):
'''
hci_packet_type = HCI_COMMAND_PACKET
command_names: Dict[int, str] = {}
command_classes: Dict[int, Type[HCI_Command]] = {}
@staticmethod
@@ -1970,9 +1967,9 @@ class HCI_Command(HCI_Packet):
def inner(cls):
cls.name = cls.__name__.upper()
cls.op_code = key_with_value(HCI_COMMAND_NAMES, cls.name)
cls.op_code = key_with_value(cls.command_names, cls.name)
if cls.op_code is None:
raise KeyError(f'command {cls.name} not found in HCI_COMMAND_NAMES')
raise KeyError(f'command {cls.name} not found in command_names')
cls.fields = fields
cls.return_parameters_fields = return_parameters_fields
@@ -1991,6 +1988,18 @@ class HCI_Command(HCI_Packet):
return inner
@staticmethod
def command_map(symbols: Dict[str, Any]) -> Dict[int, str]:
return {
command_code: command_name
for (command_name, command_code) in symbols.items()
if command_name.startswith('HCI_') and command_name.endswith('_COMMAND')
}
@classmethod
def register_commands(cls, symbols: Dict[str, Any]) -> None:
cls.command_names.update(cls.command_map(symbols))
@staticmethod
def from_bytes(packet: bytes) -> HCI_Command:
op_code, length = struct.unpack_from('<HB', packet, 1)
@@ -2015,7 +2024,7 @@ class HCI_Command(HCI_Packet):
@staticmethod
def command_name(op_code):
name = HCI_COMMAND_NAMES.get(op_code)
name = HCI_Command.command_names.get(op_code)
if name is not None:
return name
return f'[OGF=0x{op_code >> 10:02x}, OCF=0x{op_code & 0x3FF:04x}]'
@@ -2024,6 +2033,16 @@ class HCI_Command(HCI_Packet):
def create_return_parameters(cls, **kwargs):
return HCI_Object(cls.return_parameters_fields, **kwargs)
@classmethod
def parse_return_parameters(cls, parameters):
if not cls.return_parameters_fields:
return None
return_parameters = HCI_Object.from_bytes(
parameters, 0, cls.return_parameters_fields
)
return_parameters.fields = cls.return_parameters_fields
return return_parameters
def __init__(self, op_code, parameters=None, **kwargs):
super().__init__(HCI_Command.command_name(op_code))
if (fields := getattr(self, 'fields', None)) and kwargs:
@@ -2053,6 +2072,9 @@ class HCI_Command(HCI_Packet):
return result
HCI_Command.register_commands(globals())
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
@@ -4308,8 +4330,8 @@ class HCI_Event(HCI_Packet):
'''
hci_packet_type = HCI_EVENT_PACKET
event_names: Dict[int, str] = {}
event_classes: Dict[int, Type[HCI_Event]] = {}
meta_event_classes: Dict[int, Type[HCI_LE_Meta_Event]] = {}
@staticmethod
def event(fields=()):
@@ -4319,9 +4341,9 @@ class HCI_Event(HCI_Packet):
def inner(cls):
cls.name = cls.__name__.upper()
cls.event_code = key_with_value(HCI_EVENT_NAMES, cls.name)
cls.event_code = key_with_value(cls.event_names, cls.name)
if cls.event_code is None:
raise KeyError('event not found in HCI_EVENT_NAMES')
raise KeyError(f'event {cls.name} not found in event_names')
cls.fields = fields
# Patch the __init__ method to fix the event_code
@@ -4337,12 +4359,30 @@ class HCI_Event(HCI_Packet):
return inner
@staticmethod
def event_map(symbols: Dict[str, Any]) -> Dict[int, str]:
return {
event_code: event_name
for (event_name, event_code) in symbols.items()
if event_name.startswith('HCI_')
and not event_name.startswith('HCI_LE_')
and event_name.endswith('_EVENT')
}
@staticmethod
def event_name(event_code):
return name_or_number(HCI_Event.event_names, event_code)
@staticmethod
def register_events(symbols: Dict[str, Any]) -> None:
HCI_Event.event_names.update(HCI_Event.event_map(symbols))
@staticmethod
def registered(event_class):
event_class.name = event_class.__name__.upper()
event_class.event_code = key_with_value(HCI_EVENT_NAMES, event_class.name)
event_class.event_code = key_with_value(HCI_Event.event_names, event_class.name)
if event_class.event_code is None:
raise KeyError('event not found in HCI_EVENT_NAMES')
raise KeyError(f'event {event_class.name} not found in event_names')
# Register a factory for this class
HCI_Event.event_classes[event_class.event_code] = event_class
@@ -4362,11 +4402,16 @@ class HCI_Event(HCI_Packet):
# We do this dispatch here and not in the subclass in order to avoid call
# loops
subevent_code = parameters[0]
cls = HCI_Event.meta_event_classes.get(subevent_code)
cls = HCI_LE_Meta_Event.subevent_classes.get(subevent_code)
if cls is None:
# No class registered, just use a generic class instance
return HCI_LE_Meta_Event(subevent_code, parameters)
elif event_code == HCI_VENDOR_EVENT:
subevent_code = parameters[0]
cls = HCI_Vendor_Event.subevent_classes.get(subevent_code)
if cls is None:
# No class registered, just use a generic class instance
return HCI_Vendor_Event(subevent_code, parameters)
else:
cls = HCI_Event.event_classes.get(event_code)
if cls is None:
@@ -4384,10 +4429,6 @@ class HCI_Event(HCI_Packet):
HCI_Object.init_from_bytes(self, parameters, 0, fields)
return self
@staticmethod
def event_name(event_code):
return name_or_number(HCI_EVENT_NAMES, event_code)
def __init__(self, event_code, parameters=None, **kwargs):
super().__init__(HCI_Event.event_name(event_code))
if (fields := getattr(self, 'fields', None)) and kwargs:
@@ -4414,71 +4455,111 @@ class HCI_Event(HCI_Packet):
return result
HCI_Event.register_events(globals())
# -----------------------------------------------------------------------------
class HCI_LE_Meta_Event(HCI_Event):
class HCI_Extended_Event(HCI_Event):
'''
See Bluetooth spec @ 7.7.65 LE Meta Event
HCI_Event subclass for events that has a subevent code.
'''
@staticmethod
def event(fields=()):
subevent_names: Dict[int, str] = {}
subevent_classes: Dict[int, Type[HCI_Extended_Event]]
@classmethod
def event(cls, fields=()):
'''
Decorator used to declare and register subclasses
'''
def inner(cls):
cls.name = cls.__name__.upper()
cls.subevent_code = key_with_value(HCI_SUBEVENT_NAMES, cls.name)
cls.subevent_code = key_with_value(cls.subevent_names, cls.name)
if cls.subevent_code is None:
raise KeyError('subevent not found in HCI_SUBEVENT_NAMES')
raise KeyError(f'subevent {cls.name} not found in subevent_names')
cls.fields = fields
# Patch the __init__ method to fix the subevent_code
original_init = cls.__init__
def init(self, parameters=None, **kwargs):
return HCI_LE_Meta_Event.__init__(
self, cls.subevent_code, parameters, **kwargs
)
return original_init(self, cls.subevent_code, parameters, **kwargs)
cls.__init__ = init
# Register a factory for this class
HCI_Event.meta_event_classes[cls.subevent_code] = cls
cls.subevent_classes[cls.subevent_code] = cls
return cls
return inner
@classmethod
def subevent_name(cls, subevent_code):
subevent_name = cls.subevent_names.get(subevent_code)
if subevent_name is not None:
return subevent_name
return f'{cls.__name__.upper()}[0x{subevent_code:02X}]'
@staticmethod
def subevent_map(symbols: Dict[str, Any]) -> Dict[int, str]:
return {
subevent_code: subevent_name
for (subevent_name, subevent_code) in symbols.items()
if subevent_name.startswith('HCI_') and subevent_name.endswith('_EVENT')
}
@classmethod
def register_subevents(cls, symbols: Dict[str, Any]) -> None:
cls.subevent_names.update(cls.subevent_map(symbols))
@classmethod
def from_parameters(cls, parameters):
self = cls.__new__(cls)
HCI_LE_Meta_Event.__init__(self, self.subevent_code, parameters)
HCI_Extended_Event.__init__(self, self.subevent_code, parameters)
if fields := getattr(self, 'fields', None):
HCI_Object.init_from_bytes(self, parameters, 1, fields)
return self
@staticmethod
def subevent_name(subevent_code):
return name_or_number(HCI_SUBEVENT_NAMES, subevent_code)
def __init__(self, subevent_code, parameters, **kwargs):
self.subevent_code = subevent_code
if parameters is None and (fields := getattr(self, 'fields', None)) and kwargs:
parameters = bytes([subevent_code]) + HCI_Object.dict_to_bytes(
kwargs, fields
)
super().__init__(HCI_LE_META_EVENT, parameters, **kwargs)
super().__init__(self.event_code, parameters, **kwargs)
# Override the name in order to adopt the subevent name instead
self.name = self.subevent_name(subevent_code)
def __str__(self):
result = color(self.subevent_name(self.subevent_code), 'magenta')
if fields := getattr(self, 'fields', None):
result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ')
else:
if self.parameters:
result += f': {self.parameters.hex()}'
return result
# -----------------------------------------------------------------------------
class HCI_LE_Meta_Event(HCI_Extended_Event):
'''
See Bluetooth spec @ 7.7.65 LE Meta Event
'''
event_code: int = HCI_LE_META_EVENT
subevent_classes = {}
@staticmethod
def subevent_map(symbols: Dict[str, Any]) -> Dict[int, str]:
return {
subevent_code: subevent_name
for (subevent_name, subevent_code) in symbols.items()
if subevent_name.startswith('HCI_LE_') and subevent_name.endswith('_EVENT')
}
HCI_LE_Meta_Event.register_subevents(globals())
# -----------------------------------------------------------------------------
class HCI_Vendor_Event(HCI_Extended_Event):
event_code: int = HCI_VENDOR_EVENT
subevent_classes = {}
# -----------------------------------------------------------------------------
@@ -4592,7 +4673,7 @@ class HCI_LE_Advertising_Report_Event(HCI_LE_Meta_Event):
return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}'
HCI_Event.meta_event_classes[
HCI_LE_Meta_Event.subevent_classes[
HCI_LE_ADVERTISING_REPORT_EVENT
] = HCI_LE_Advertising_Report_Event
@@ -4846,7 +4927,7 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event):
return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}'
HCI_Event.meta_event_classes[
HCI_LE_Meta_Event.subevent_classes[
HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT
] = HCI_LE_Extended_Advertising_Report_Event
@@ -5120,11 +5201,11 @@ class HCI_Command_Complete_Event(HCI_Event):
self.return_parameters = self.return_parameters[0]
else:
cls = HCI_Command.command_classes.get(self.command_opcode)
if cls and cls.return_parameters_fields:
self.return_parameters = HCI_Object.from_bytes(
self.return_parameters, 0, cls.return_parameters_fields
)
self.return_parameters.fields = cls.return_parameters_fields
if cls:
# Try to parse the return parameters bytes into an object.
return_parameters = cls.parse_return_parameters(self.return_parameters)
if return_parameters is not None:
self.return_parameters = return_parameters
return self

0
bumble/vendor/__init__.py vendored Normal file
View File

0
bumble/vendor/android/__init__.py vendored Normal file
View File

318
bumble/vendor/android/hci.py vendored Normal file
View File

@@ -0,0 +1,318 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct
from bumble.hci import (
name_or_number,
hci_vendor_command_op_code,
Address,
HCI_Constant,
HCI_Object,
HCI_Command,
HCI_Vendor_Event,
STATUS_SPEC,
)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
# Android Vendor Specific Commands and Events.
# Only a subset of the commands are implemented here currently.
#
# pylint: disable-next=line-too-long
# See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#chip-capabilities-and-configuration
HCI_LE_GET_VENDOR_CAPABILITIES_COMMAND = hci_vendor_command_op_code(0x153)
HCI_LE_APCF_COMMAND = hci_vendor_command_op_code(0x157)
HCI_GET_CONTROLLER_ACTIVITY_ENERGY_INFO_COMMAND = hci_vendor_command_op_code(0x159)
HCI_A2DP_HARDWARE_OFFLOAD_COMMAND = hci_vendor_command_op_code(0x15D)
HCI_BLUETOOTH_QUALITY_REPORT_COMMAND = hci_vendor_command_op_code(0x15E)
HCI_DYNAMIC_AUDIO_BUFFER_COMMAND = hci_vendor_command_op_code(0x15F)
HCI_BLUETOOTH_QUALITY_REPORT_EVENT = 0x58
HCI_Command.register_commands(globals())
HCI_Vendor_Event.register_subevents(globals())
# -----------------------------------------------------------------------------
@HCI_Command.command(
return_parameters_fields=[
('status', STATUS_SPEC),
('max_advt_instances', 1),
('offloaded_resolution_of_private_address', 1),
('total_scan_results_storage', 2),
('max_irk_list_sz', 1),
('filtering_support', 1),
('max_filter', 1),
('activity_energy_info_support', 1),
('version_supported', 2),
('total_num_of_advt_tracked', 2),
('extended_scan_support', 1),
('debug_logging_supported', 1),
('le_address_generation_offloading_support', 1),
('a2dp_source_offload_capability_mask', 4),
('bluetooth_quality_report_support', 1),
('dynamic_audio_buffer_support', 4),
]
)
class HCI_LE_Get_Vendor_Capabilities_Command(HCI_Command):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#vendor-specific-capabilities
'''
@classmethod
def parse_return_parameters(cls, parameters):
# There are many versions of this data structure, so we need to parse until
# there are no more bytes to parse, and leave un-signal parameters set to
# None (older versions)
nones = {field: None for field, _ in cls.return_parameters_fields}
return_parameters = HCI_Object(cls.return_parameters_fields, **nones)
try:
offset = 0
for field in cls.return_parameters_fields:
field_name, field_type = field
field_value, field_size = HCI_Object.parse_field(
parameters, offset, field_type
)
setattr(return_parameters, field_name, field_value)
offset += field_size
except struct.error:
pass
return return_parameters
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_LE_APCF_Command.opcode_name(x),
},
),
('payload', '*'),
],
return_parameters_fields=[
('status', STATUS_SPEC),
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_LE_APCF_Command.opcode_name(x),
},
),
('payload', '*'),
],
)
class HCI_LE_APCF_Command(HCI_Command):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#le_apcf_command
NOTE: the subcommand-specific payloads are left as opaque byte arrays in this
implementation. A future enhancement may define subcommand-specific data structures.
'''
# APCF Subcommands
# TODO: use the OpenIntEnum class (when upcoming PR is merged)
APCF_ENABLE = 0x00
APCF_SET_FILTERING_PARAMETERS = 0x01
APCF_BROADCASTER_ADDRESS = 0x02
APCF_SERVICE_UUID = 0x03
APCF_SERVICE_SOLICITATION_UUID = 0x04
APCF_LOCAL_NAME = 0x05
APCF_MANUFACTURER_DATA = 0x06
APCF_SERVICE_DATA = 0x07
APCF_TRANSPORT_DISCOVERY_SERVICE = 0x08
APCF_AD_TYPE_FILTER = 0x09
APCF_READ_EXTENDED_FEATURES = 0xFF
OPCODE_NAMES = {
APCF_ENABLE: 'APCF_ENABLE',
APCF_SET_FILTERING_PARAMETERS: 'APCF_SET_FILTERING_PARAMETERS',
APCF_BROADCASTER_ADDRESS: 'APCF_BROADCASTER_ADDRESS',
APCF_SERVICE_UUID: 'APCF_SERVICE_UUID',
APCF_SERVICE_SOLICITATION_UUID: 'APCF_SERVICE_SOLICITATION_UUID',
APCF_LOCAL_NAME: 'APCF_LOCAL_NAME',
APCF_MANUFACTURER_DATA: 'APCF_MANUFACTURER_DATA',
APCF_SERVICE_DATA: 'APCF_SERVICE_DATA',
APCF_TRANSPORT_DISCOVERY_SERVICE: 'APCF_TRANSPORT_DISCOVERY_SERVICE',
APCF_AD_TYPE_FILTER: 'APCF_AD_TYPE_FILTER',
APCF_READ_EXTENDED_FEATURES: 'APCF_READ_EXTENDED_FEATURES',
}
@classmethod
def opcode_name(cls, opcode):
return name_or_number(cls.OPCODE_NAMES, opcode)
# -----------------------------------------------------------------------------
@HCI_Command.command(
return_parameters_fields=[
('status', STATUS_SPEC),
('total_tx_time_ms', 4),
('total_rx_time_ms', 4),
('total_idle_time_ms', 4),
('total_energy_used', 4),
],
)
class HCI_Get_Controller_Activity_Energy_Info_Command(HCI_Command):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#le_get_controller_activity_energy_info
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_A2DP_Hardware_Offload_Command.opcode_name(x),
},
),
('payload', '*'),
],
return_parameters_fields=[
('status', STATUS_SPEC),
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_A2DP_Hardware_Offload_Command.opcode_name(x),
},
),
('payload', '*'),
],
)
class HCI_A2DP_Hardware_Offload_Command(HCI_Command):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#a2dp-hardware-offload-support
NOTE: the subcommand-specific payloads are left as opaque byte arrays in this
implementation. A future enhancement may define subcommand-specific data structures.
'''
# A2DP Hardware Offload Subcommands
# TODO: use the OpenIntEnum class (when upcoming PR is merged)
START_A2DP_OFFLOAD = 0x01
STOP_A2DP_OFFLOAD = 0x02
OPCODE_NAMES = {
START_A2DP_OFFLOAD: 'START_A2DP_OFFLOAD',
STOP_A2DP_OFFLOAD: 'STOP_A2DP_OFFLOAD',
}
@classmethod
def opcode_name(cls, opcode):
return name_or_number(cls.OPCODE_NAMES, opcode)
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_Dynamic_Audio_Buffer_Command.opcode_name(x),
},
),
('payload', '*'),
],
return_parameters_fields=[
('status', STATUS_SPEC),
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_Dynamic_Audio_Buffer_Command.opcode_name(x),
},
),
('payload', '*'),
],
)
class HCI_Dynamic_Audio_Buffer_Command(HCI_Command):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#dynamic-audio-buffer-command
NOTE: the subcommand-specific payloads are left as opaque byte arrays in this
implementation. A future enhancement may define subcommand-specific data structures.
'''
# Dynamic Audio Buffer Subcommands
# TODO: use the OpenIntEnum class (when upcoming PR is merged)
GET_AUDIO_BUFFER_TIME_CAPABILITY = 0x01
OPCODE_NAMES = {
GET_AUDIO_BUFFER_TIME_CAPABILITY: 'GET_AUDIO_BUFFER_TIME_CAPABILITY',
}
@classmethod
def opcode_name(cls, opcode):
return name_or_number(cls.OPCODE_NAMES, opcode)
# -----------------------------------------------------------------------------
@HCI_Vendor_Event.event(
fields=[
('quality_report_id', 1),
('packet_types', 1),
('connection_handle', 2),
('connection_role', {'size': 1, 'mapper': HCI_Constant.role_name}),
('tx_power_level', -1),
('rssi', -1),
('snr', 1),
('unused_afh_channel_count', 1),
('afh_select_unideal_channel_count', 1),
('lsto', 2),
('connection_piconet_clock', 4),
('retransmission_count', 4),
('no_rx_count', 4),
('nak_count', 4),
('last_tx_ack_timestamp', 4),
('flow_off_count', 4),
('last_flow_on_timestamp', 4),
('buffer_overflow_bytes', 4),
('buffer_underflow_bytes', 4),
('bdaddr', Address.parse_address),
('cal_failed_item_count', 1),
('tx_total_packets', 4),
('tx_unacked_packets', 4),
('tx_flushed_packets', 4),
('tx_last_subevent_packets', 4),
('crc_error_packets', 4),
('rx_duplicate_packets', 4),
('vendor_specific_parameters', '*'),
]
)
class HCI_Bluetooth_Quality_Report_Event(HCI_Vendor_Event):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#bluetooth-quality-report-sub-event
'''