Refactor command supporting list

This commit is contained in:
Josh Wu
2024-02-05 02:21:16 +08:00
parent f4aeaa6eb3
commit 0e6d69cd7b
2 changed files with 336 additions and 557 deletions

View File

@@ -22,7 +22,17 @@ import dataclasses
import logging
import struct
from typing import Any, Awaitable, Callable, Deque, Dict, Optional, cast, TYPE_CHECKING
from typing import (
Any,
Awaitable,
Callable,
Deque,
Dict,
Optional,
Set,
cast,
TYPE_CHECKING,
)
from bumble.colors import color
from bumble.l2cap import L2CAP_PDU
@@ -165,7 +175,7 @@ class Host(AbortableEventEmitter):
self.number_of_supported_advertising_sets = 0
self.maximum_advertising_data_length = 31
self.local_version = None
self.local_supported_commands = bytes(64)
self.local_supported_commands = 0
self.local_le_features = 0
self.local_lmp_features = hci.LmpFeatureMask(0) # Classic LMP features
self.suggested_max_tx_octets = 251 # Max allowed
@@ -232,7 +242,9 @@ class Host(AbortableEventEmitter):
response = await self.send_command(
hci.HCI_Read_Local_Supported_Commands_Command(), check_result=True
)
self.local_supported_commands = response.return_parameters.supported_commands
self.local_supported_commands = int.from_bytes(
response.return_parameters.supported_commands, 'little'
)
if self.supports_command(hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response = await self.send_command(
@@ -583,31 +595,19 @@ class Host(AbortableEventEmitter):
offset += data_total_length
bytes_remaining -= data_total_length
def supports_command(self, command):
# Find the support flag position for this command
for octet, flags in enumerate(hci.HCI_SUPPORTED_COMMANDS_FLAGS):
for flag_position, value in enumerate(flags):
if value == command:
# Check if the flag is set
if octet < len(self.local_supported_commands) and flag_position < 8:
return (
self.local_supported_commands[octet] & (1 << flag_position)
) != 0
return False
def supports_command(self, op_code: int) -> bool:
return (
self.local_supported_commands
& hci.HCI_SUPPORTED_COMMANDS_MASKS.get(op_code, 0)
) != 0
@property
def supported_commands(self):
commands = []
for octet, flags in enumerate(self.local_supported_commands):
if octet < len(hci.HCI_SUPPORTED_COMMANDS_FLAGS):
for flag in range(8):
if flags & (1 << flag) != 0:
command = hci.HCI_SUPPORTED_COMMANDS_FLAGS[octet][flag]
if command is not None:
commands.append(command)
return commands
def supported_commands(self) -> Set[int]:
return set(
op_code
for op_code, mask in hci.HCI_SUPPORTED_COMMANDS_MASKS.items()
if self.local_supported_commands & mask
)
def supports_le_features(self, feature: hci.LeFeatureMask) -> bool:
return (self.local_le_features & feature) == feature