Migrate all HCI_Command to dataclasses

This commit is contained in:
Josh Wu
2025-06-26 00:51:11 +08:00
parent 88777710a4
commit bad037b010
6 changed files with 1271 additions and 1377 deletions

View File

@@ -90,12 +90,10 @@ HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND = hci.hci_vendor_command_op_code(0x000E)
hci.HCI_Command.register_commands(globals())
@hci.HCI_Command.command(
fields=[
("param0", 1),
],
)
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_Intel_Read_Version_Command(hci.HCI_Command):
param0: int = dataclasses.field(metadata=hci.metadata(1))
return_parameters_fields = [
("status", hci.STATUS_SPEC),
@@ -103,35 +101,35 @@ class HCI_Intel_Read_Version_Command(hci.HCI_Command):
]
@hci.HCI_Command.command(
fields=[("data_type", 1), ("data", "*")],
)
@hci.HCI_Command.command
@dataclasses.dataclass
class Hci_Intel_Secure_Send_Command(hci.HCI_Command):
data_type: int = dataclasses.field(metadata=hci.metadata(1))
data: bytes = dataclasses.field(metadata=hci.metadata("*"))
return_parameters_fields = [
("status", 1),
]
@hci.HCI_Command.command(
fields=[
("reset_type", 1),
("patch_enable", 1),
("ddc_reload", 1),
("boot_option", 1),
("boot_address", 4),
],
)
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_Intel_Reset_Command(hci.HCI_Command):
reset_type: int = dataclasses.field(metadata=hci.metadata(1))
patch_enable: int = dataclasses.field(metadata=hci.metadata(1))
ddc_reload: int = dataclasses.field(metadata=hci.metadata(1))
boot_option: int = dataclasses.field(metadata=hci.metadata(1))
boot_address: int = dataclasses.field(metadata=hci.metadata(4))
return_parameters_fields = [
("data", "*"),
]
@hci.HCI_Command.command(
fields=[("data", "*")],
)
@hci.HCI_Command.command
@dataclasses.dataclass
class Hci_Intel_Write_Device_Config_Command(hci.HCI_Command):
data: bytes = dataclasses.field(metadata=hci.metadata("*"))
return_parameters_fields = [
("status", hci.STATUS_SPEC),

View File

@@ -20,7 +20,7 @@ Based on various online bits of information, including the Linux kernel.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from dataclasses import dataclass
from dataclasses import dataclass, field
import asyncio
import enum
import logging
@@ -33,14 +33,7 @@ import weakref
from bumble import core
from bumble.hci import (
hci_vendor_command_op_code,
STATUS_SPEC,
HCI_SUCCESS,
HCI_Command,
HCI_Reset_Command,
HCI_Read_Local_Version_Information_Command,
)
from bumble import hci
from bumble.drivers import common
# -----------------------------------------------------------------------------
@@ -182,26 +175,29 @@ RTK_USB_PRODUCTS = {
# -----------------------------------------------------------------------------
# HCI Commands
# -----------------------------------------------------------------------------
HCI_RTK_READ_ROM_VERSION_COMMAND = hci_vendor_command_op_code(0x6D)
HCI_RTK_DOWNLOAD_COMMAND = hci_vendor_command_op_code(0x20)
HCI_RTK_DROP_FIRMWARE_COMMAND = hci_vendor_command_op_code(0x66)
HCI_Command.register_commands(globals())
HCI_RTK_READ_ROM_VERSION_COMMAND = hci.hci_vendor_command_op_code(0x6D)
HCI_RTK_DOWNLOAD_COMMAND = hci.hci_vendor_command_op_code(0x20)
HCI_RTK_DROP_FIRMWARE_COMMAND = hci.hci_vendor_command_op_code(0x66)
hci.HCI_Command.register_commands(globals())
@HCI_Command.command()
class HCI_RTK_Read_ROM_Version_Command(HCI_Command):
return_parameters_fields = [("status", STATUS_SPEC), ("version", 1)]
@hci.HCI_Command.command
@dataclass
class HCI_RTK_Read_ROM_Version_Command(hci.HCI_Command):
return_parameters_fields = [("status", hci.STATUS_SPEC), ("version", 1)]
@HCI_Command.command(
fields=[("index", 1), ("payload", RTK_FRAGMENT_LENGTH)],
)
class HCI_RTK_Download_Command(HCI_Command):
return_parameters_fields = [("status", STATUS_SPEC), ("index", 1)]
@hci.HCI_Command.command
@dataclass
class HCI_RTK_Download_Command(hci.HCI_Command):
index: int = field(metadata=hci.metadata(1))
payload: bytes = field(metadata=hci.metadata(RTK_FRAGMENT_LENGTH))
return_parameters_fields = [("status", hci.STATUS_SPEC), ("index", 1)]
@HCI_Command.command()
class HCI_RTK_Drop_Firmware_Command(HCI_Command):
@hci.HCI_Command.command
@dataclass
class HCI_RTK_Drop_Firmware_Command(hci.HCI_Command):
pass
@@ -497,17 +493,17 @@ class Driver(common.Driver):
async def driver_info_for_host(cls, host):
try:
await host.send_command(
HCI_Reset_Command(),
hci.HCI_Reset_Command(),
check_result=True,
response_timeout=cls.POST_RESET_DELAY,
)
host.ready = True # Needed to let the host know the controller is ready.
except asyncio.exceptions.TimeoutError:
logger.warning("timeout waiting for hci reset, retrying")
await host.send_command(HCI_Reset_Command(), check_result=True)
await host.send_command(hci.HCI_Reset_Command(), check_result=True)
host.ready = True
command = HCI_Read_Local_Version_Information_Command()
command = hci.HCI_Read_Local_Version_Information_Command()
response = await host.send_command(command, check_result=True)
if response.command_opcode != command.op_code:
logger.error("failed to probe local version information")
@@ -594,7 +590,7 @@ class Driver(common.Driver):
response = await self.host.send_command(
HCI_RTK_Read_ROM_Version_Command(), check_result=True
)
if response.return_parameters.status != HCI_SUCCESS:
if response.return_parameters.status != hci.HCI_SUCCESS:
logger.warning("can't get ROM version")
return
rom_version = response.return_parameters.version
@@ -632,9 +628,8 @@ class Driver(common.Driver):
fragment = payload[fragment_offset : fragment_offset + RTK_FRAGMENT_LENGTH]
logger.debug(f"downloading fragment {fragment_index}")
await self.host.send_command(
HCI_RTK_Download_Command(
index=download_index, payload=fragment, check_result=True
)
HCI_RTK_Download_Command(index=download_index, payload=fragment),
check_result=True,
)
logger.debug("download complete!")
@@ -643,7 +638,7 @@ class Driver(common.Driver):
response = await self.host.send_command(
HCI_RTK_Read_ROM_Version_Command(), check_result=True
)
if response.return_parameters.status != HCI_SUCCESS:
if response.return_parameters.status != hci.HCI_SUCCESS:
logger.warning("can't get ROM version")
else:
rom_version = response.return_parameters.version
@@ -666,7 +661,7 @@ class Driver(common.Driver):
async def init_controller(self):
await self.download_firmware()
await self.host.send_command(HCI_Reset_Command(), check_result=True)
await self.host.send_command(hci.HCI_Reset_Command(), check_result=True)
logger.info(f"loaded FW image {self.driver_info.fw_name}")

File diff suppressed because it is too large Load Diff

View File

@@ -45,7 +45,8 @@ hci.HCI_Command.register_commands(globals())
# -----------------------------------------------------------------------------
@hci.HCI_Command.command()
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_LE_Get_Vendor_Capabilities_Command(hci.HCI_Command):
# pylint: disable=line-too-long
'''
@@ -94,18 +95,8 @@ class HCI_LE_Get_Vendor_Capabilities_Command(hci.HCI_Command):
# -----------------------------------------------------------------------------
@hci.HCI_Command.command(
fields=[
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_LE_APCF_Command.opcode_name(x),
},
),
('payload', '*'),
],
)
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_LE_APCF_Command(hci.HCI_Command):
# pylint: disable=line-too-long
'''
@@ -114,53 +105,34 @@ class HCI_LE_APCF_Command(hci.HCI_Command):
NOTE: the subcommand-specific payloads are left as opaque byte arrays in this
implementation. A future enhancement may define subcommand-specific data structures.
'''
# APCF Subcommands
class Opcode(hci.SpecableEnum):
ENABLE = 0x00
SET_FILTERING_PARAMETERS = 0x01
BROADCASTER_ADDRESS = 0x02
SERVICE_UUID = 0x03
SERVICE_SOLICITATION_UUID = 0x04
LOCAL_NAME = 0x05
MANUFACTURER_DATA = 0x06
SERVICE_DATA = 0x07
TRANSPORT_DISCOVERY_SERVICE = 0x08
AD_TYPE_FILTER = 0x09
READ_EXTENDED_FEATURES = 0xFF
opcode: int = dataclasses.field(metadata=Opcode.type_metadata(1))
payload: bytes = dataclasses.field(metadata=hci.metadata("*"))
return_parameters_fields = [
('status', hci.STATUS_SPEC),
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_LE_APCF_Command.opcode_name(x),
},
),
('opcode', Opcode.type_spec(1)),
('payload', '*'),
]
# APCF Subcommands
# TODO: use the OpenIntEnum class (when upcoming PR is merged)
APCF_ENABLE = 0x00
APCF_SET_FILTERING_PARAMETERS = 0x01
APCF_BROADCASTER_ADDRESS = 0x02
APCF_SERVICE_UUID = 0x03
APCF_SERVICE_SOLICITATION_UUID = 0x04
APCF_LOCAL_NAME = 0x05
APCF_MANUFACTURER_DATA = 0x06
APCF_SERVICE_DATA = 0x07
APCF_TRANSPORT_DISCOVERY_SERVICE = 0x08
APCF_AD_TYPE_FILTER = 0x09
APCF_READ_EXTENDED_FEATURES = 0xFF
OPCODE_NAMES = {
APCF_ENABLE: 'APCF_ENABLE',
APCF_SET_FILTERING_PARAMETERS: 'APCF_SET_FILTERING_PARAMETERS',
APCF_BROADCASTER_ADDRESS: 'APCF_BROADCASTER_ADDRESS',
APCF_SERVICE_UUID: 'APCF_SERVICE_UUID',
APCF_SERVICE_SOLICITATION_UUID: 'APCF_SERVICE_SOLICITATION_UUID',
APCF_LOCAL_NAME: 'APCF_LOCAL_NAME',
APCF_MANUFACTURER_DATA: 'APCF_MANUFACTURER_DATA',
APCF_SERVICE_DATA: 'APCF_SERVICE_DATA',
APCF_TRANSPORT_DISCOVERY_SERVICE: 'APCF_TRANSPORT_DISCOVERY_SERVICE',
APCF_AD_TYPE_FILTER: 'APCF_AD_TYPE_FILTER',
APCF_READ_EXTENDED_FEATURES: 'APCF_READ_EXTENDED_FEATURES',
}
@classmethod
def opcode_name(cls, opcode):
return hci.name_or_number(cls.OPCODE_NAMES, opcode)
# -----------------------------------------------------------------------------
@hci.HCI_Command.command()
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_Get_Controller_Activity_Energy_Info_Command(hci.HCI_Command):
# pylint: disable=line-too-long
'''
@@ -176,18 +148,8 @@ class HCI_Get_Controller_Activity_Energy_Info_Command(hci.HCI_Command):
# -----------------------------------------------------------------------------
@hci.HCI_Command.command(
fields=[
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_A2DP_Hardware_Offload_Command.opcode_name(x),
},
),
('payload', '*'),
],
)
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_A2DP_Hardware_Offload_Command(hci.HCI_Command):
# pylint: disable=line-too-long
'''
@@ -196,46 +158,25 @@ class HCI_A2DP_Hardware_Offload_Command(hci.HCI_Command):
NOTE: the subcommand-specific payloads are left as opaque byte arrays in this
implementation. A future enhancement may define subcommand-specific data structures.
'''
# A2DP Hardware Offload Subcommands
class Opcode(hci.SpecableEnum):
START_A2DP_OFFLOAD = 0x01
STOP_A2DP_OFFLOAD = 0x02
opcode: int = dataclasses.field(metadata=Opcode.type_metadata(1))
payload: bytes = dataclasses.field(metadata=hci.metadata("*"))
return_parameters_fields = [
('status', hci.STATUS_SPEC),
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_A2DP_Hardware_Offload_Command.opcode_name(x),
},
),
('opcode', Opcode.type_spec(1)),
('payload', '*'),
]
# A2DP Hardware Offload Subcommands
# TODO: use the OpenIntEnum class (when upcoming PR is merged)
START_A2DP_OFFLOAD = 0x01
STOP_A2DP_OFFLOAD = 0x02
OPCODE_NAMES = {
START_A2DP_OFFLOAD: 'START_A2DP_OFFLOAD',
STOP_A2DP_OFFLOAD: 'STOP_A2DP_OFFLOAD',
}
@classmethod
def opcode_name(cls, opcode):
return hci.name_or_number(cls.OPCODE_NAMES, opcode)
# -----------------------------------------------------------------------------
@hci.HCI_Command.command(
fields=[
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_Dynamic_Audio_Buffer_Command.opcode_name(x),
},
),
('payload', '*'),
],
)
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_Dynamic_Audio_Buffer_Command(hci.HCI_Command):
# pylint: disable=line-too-long
'''
@@ -244,29 +185,19 @@ class HCI_Dynamic_Audio_Buffer_Command(hci.HCI_Command):
NOTE: the subcommand-specific payloads are left as opaque byte arrays in this
implementation. A future enhancement may define subcommand-specific data structures.
'''
return_parameters_fields = [
('status', hci.STATUS_SPEC),
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_Dynamic_Audio_Buffer_Command.opcode_name(x),
},
),
('payload', '*'),
]
# Dynamic Audio Buffer Subcommands
# TODO: use the OpenIntEnum class (when upcoming PR is merged)
GET_AUDIO_BUFFER_TIME_CAPABILITY = 0x01
class Opcode(hci.SpecableEnum):
GET_AUDIO_BUFFER_TIME_CAPABILITY = 0x01
OPCODE_NAMES = {
GET_AUDIO_BUFFER_TIME_CAPABILITY: 'GET_AUDIO_BUFFER_TIME_CAPABILITY',
}
opcode: int = dataclasses.field(metadata=Opcode.type_metadata(1))
payload: bytes = dataclasses.field(metadata=hci.metadata("*"))
@classmethod
def opcode_name(cls, opcode):
return hci.name_or_number(cls.OPCODE_NAMES, opcode)
return_parameters_fields = [
('status', hci.STATUS_SPEC),
('opcode', Opcode.type_spec(1)),
('payload', '*'),
]
# -----------------------------------------------------------------------------

View File

@@ -15,11 +15,9 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from bumble.hci import (
hci_vendor_command_op_code,
HCI_Command,
STATUS_SPEC,
)
import dataclasses
from bumble import hci
# -----------------------------------------------------------------------------
@@ -31,10 +29,10 @@ from bumble.hci import (
#
# pylint: disable-next=line-too-long
# See https://github.com/zephyrproject-rtos/zephyr/blob/main/include/zephyr/bluetooth/hci_vs.h
HCI_WRITE_TX_POWER_LEVEL_COMMAND = hci_vendor_command_op_code(0x000E)
HCI_READ_TX_POWER_LEVEL_COMMAND = hci_vendor_command_op_code(0x000F)
HCI_WRITE_TX_POWER_LEVEL_COMMAND = hci.hci_vendor_command_op_code(0x000E)
HCI_READ_TX_POWER_LEVEL_COMMAND = hci.hci_vendor_command_op_code(0x000F)
HCI_Command.register_commands(globals())
hci.HCI_Command.register_commands(globals())
# -----------------------------------------------------------------------------
@@ -49,10 +47,9 @@ class TX_Power_Level_Command:
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[('handle_type', 1), ('connection_handle', 2), ('tx_power_level', -1)],
)
class HCI_Write_Tx_Power_Level_Command(HCI_Command, TX_Power_Level_Command):
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_Write_Tx_Power_Level_Command(hci.HCI_Command, TX_Power_Level_Command):
'''
Write TX power level. See BT_HCI_OP_VS_WRITE_TX_POWER_LEVEL in
https://github.com/zephyrproject-rtos/zephyr/blob/main/include/zephyr/bluetooth/hci_vs.h
@@ -61,8 +58,12 @@ class HCI_Write_Tx_Power_Level_Command(HCI_Command, TX_Power_Level_Command):
TX_POWER_HANDLE_TYPE_SCAN should be zero.
'''
handle_type: int = dataclasses.field(metadata=hci.metadata(1))
connection_handle: int = dataclasses.field(metadata=hci.metadata(2))
tx_power_level: int = dataclasses.field(metadata=hci.metadata(-1))
return_parameters_fields = [
('status', STATUS_SPEC),
('status', hci.STATUS_SPEC),
('handle_type', 1),
('connection_handle', 2),
('selected_tx_power_level', -1),
@@ -70,10 +71,9 @@ class HCI_Write_Tx_Power_Level_Command(HCI_Command, TX_Power_Level_Command):
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[('handle_type', 1), ('connection_handle', 2)],
)
class HCI_Read_Tx_Power_Level_Command(HCI_Command, TX_Power_Level_Command):
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_Read_Tx_Power_Level_Command(hci.HCI_Command, TX_Power_Level_Command):
'''
Read TX power level. See BT_HCI_OP_VS_READ_TX_POWER_LEVEL in
https://github.com/zephyrproject-rtos/zephyr/blob/main/include/zephyr/bluetooth/hci_vs.h
@@ -82,8 +82,11 @@ class HCI_Read_Tx_Power_Level_Command(HCI_Command, TX_Power_Level_Command):
TX_POWER_HANDLE_TYPE_SCAN should be zero.
'''
handle_type: int = dataclasses.field(metadata=hci.metadata(1))
connection_handle: int = dataclasses.field(metadata=hci.metadata(2))
return_parameters_fields = [
('status', STATUS_SPEC),
('status', hci.STATUS_SPEC),
('handle_type', 1),
('connection_handle', 2),
('tx_power_level', -1),

View File

@@ -371,7 +371,7 @@ def test_HCI_LE_Set_Advertising_Parameters_Command():
command = hci.HCI_LE_Set_Advertising_Parameters_Command(
advertising_interval_min=20,
advertising_interval_max=30,
advertising_type=hci.HCI_LE_Set_Advertising_Parameters_Command.ADV_NONCONN_IND,
advertising_type=hci.HCI_LE_Set_Advertising_Parameters_Command.AdvertisingType.ADV_NONCONN_IND,
own_address_type=hci.Address.PUBLIC_DEVICE_ADDRESS,
peer_address_type=hci.Address.RANDOM_DEVICE_ADDRESS,
peer_address=hci.Address('00:11:22:33:44:55'),