LE read remote features

This commit is contained in:
Josh Wu
2024-01-02 23:03:45 +08:00
parent d8e6700611
commit eff05afb7a
6 changed files with 135 additions and 70 deletions

View File

@@ -27,8 +27,8 @@ from bumble.colors import color
from bumble.core import name_or_number from bumble.core import name_or_number
from bumble.hci import ( from bumble.hci import (
map_null_terminated_utf8_string, map_null_terminated_utf8_string,
LeFeatureMask,
HCI_SUCCESS, HCI_SUCCESS,
HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_VERSION_NAMES, HCI_VERSION_NAMES,
LMP_VERSION_NAMES, LMP_VERSION_NAMES,
HCI_Command, HCI_Command,
@@ -140,7 +140,7 @@ async def get_le_info(host: Host) -> None:
print(color('LE Features:', 'yellow')) print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features: for feature in host.supported_le_features:
print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature)) print(LeFeatureMask(feature).name)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -1212,6 +1212,18 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.8.21 LE Read Remote Features Command See Bluetooth spec Vol 4, Part E - 7.8.21 LE Read Remote Features Command
''' '''
handle = command.connection_handle
if not self.find_connection_by_handle(handle):
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
return
# First, say that the command is pending # First, say that the command is pending
self.send_hci_packet( self.send_hci_packet(
HCI_Command_Status_Event( HCI_Command_Status_Event(
@@ -1225,7 +1237,7 @@ class Controller:
self.send_hci_packet( self.send_hci_packet(
HCI_LE_Read_Remote_Features_Complete_Event( HCI_LE_Read_Remote_Features_Complete_Event(
status=HCI_SUCCESS, status=HCI_SUCCESS,
connection_handle=0, connection_handle=handle,
le_features=bytes.fromhex('dd40000000000000'), le_features=bytes.fromhex('dd40000000000000'),
) )
) )

View File

@@ -49,7 +49,6 @@ from .hci import (
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
HCI_CENTRAL_ROLE, HCI_CENTRAL_ROLE,
HCI_COMMAND_STATUS_PENDING, HCI_COMMAND_STATUS_PENDING,
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE,
HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
HCI_DISPLAY_YES_NO_IO_CAPABILITY, HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY, HCI_DISPLAY_ONLY_IO_CAPABILITY,
@@ -60,11 +59,8 @@ from .hci import (
HCI_LE_1M_PHY, HCI_LE_1M_PHY,
HCI_LE_1M_PHY_BIT, HCI_LE_1M_PHY_BIT,
HCI_LE_2M_PHY, HCI_LE_2M_PHY,
HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE,
HCI_LE_CODED_PHY, HCI_LE_CODED_PHY,
HCI_LE_CODED_PHY_BIT, HCI_LE_CODED_PHY_BIT,
HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE,
HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE,
HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND, HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND,
HCI_LE_RAND_COMMAND, HCI_LE_RAND_COMMAND,
HCI_LE_READ_PHY_COMMAND, HCI_LE_READ_PHY_COMMAND,
@@ -106,6 +102,7 @@ from .hci import (
HCI_LE_Extended_Create_Connection_Command, HCI_LE_Extended_Create_Connection_Command,
HCI_LE_Rand_Command, HCI_LE_Rand_Command,
HCI_LE_Read_PHY_Command, HCI_LE_Read_PHY_Command,
HCI_LE_Read_Remote_Features_Command,
HCI_LE_Reject_CIS_Request_Command, HCI_LE_Reject_CIS_Request_Command,
HCI_LE_Remove_Advertising_Set_Command, HCI_LE_Remove_Advertising_Set_Command,
HCI_LE_Set_Address_Resolution_Enable_Command, HCI_LE_Set_Address_Resolution_Enable_Command,
@@ -151,6 +148,7 @@ from .hci import (
HCI_Write_Secure_Connections_Host_Support_Command, HCI_Write_Secure_Connections_Host_Support_Command,
HCI_Write_Simple_Pairing_Mode_Command, HCI_Write_Simple_Pairing_Mode_Command,
OwnAddressType, OwnAddressType,
LeFeatureMask,
phy_list_to_bits, phy_list_to_bits,
) )
from .host import Host from .host import Host
@@ -681,6 +679,7 @@ class Connection(CompositeEventEmitter):
self_address: Address self_address: Address
peer_address: Address peer_address: Address
peer_resolvable_address: Optional[Address] peer_resolvable_address: Optional[Address]
peer_le_features: Optional[LeFeatureMask]
role: int role: int
encryption: int encryption: int
authenticated: bool authenticated: bool
@@ -757,6 +756,7 @@ class Connection(CompositeEventEmitter):
) # By default, use the device's shared server ) # By default, use the device's shared server
self.pairing_peer_io_capability = None self.pairing_peer_io_capability = None
self.pairing_peer_authentication_requirements = None self.pairing_peer_authentication_requirements = None
self.peer_le_features = None
# [Classic only] # [Classic only]
@classmethod @classmethod
@@ -905,6 +905,15 @@ class Connection(CompositeEventEmitter):
async def request_remote_name(self): async def request_remote_name(self):
return await self.device.request_remote_name(self) return await self.device.request_remote_name(self)
async def get_remote_le_features(self) -> LeFeatureMask:
"""[LE Only] Reads remote LE supported features.
Returns:
LE features supported by the remote device.
"""
self.peer_le_features = await self.device.get_remote_le_features(self)
return self.peer_le_features
async def __aenter__(self): async def __aenter__(self):
return self return self
@@ -1537,9 +1546,7 @@ class Device(CompositeEventEmitter):
if self.cis_enabled: if self.cis_enabled:
await self.send_command( await self.send_command(
HCI_LE_Set_Host_Feature_Command( HCI_LE_Set_Host_Feature_Command(
bit_number=( bit_number=LeFeatureMask.CONNECTED_ISOCHRONOUS_STREAM,
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE
),
bit_value=1, bit_value=1,
) )
) )
@@ -1595,21 +1602,21 @@ class Device(CompositeEventEmitter):
) )
) )
def supports_le_feature(self, feature): def supports_le_features(self, feature: LeFeatureMask) -> bool:
return self.host.supports_le_feature(feature) return self.host.supports_le_features(feature)
def supports_le_phy(self, phy): def supports_le_phy(self, phy):
if phy == HCI_LE_1M_PHY: if phy == HCI_LE_1M_PHY:
return True return True
feature_map = { feature_map = {
HCI_LE_2M_PHY: HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE, HCI_LE_2M_PHY: LeFeatureMask.LE_2M_PHY,
HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE, HCI_LE_CODED_PHY: LeFeatureMask.LE_CODED_PHY,
} }
if phy not in feature_map: if phy not in feature_map:
raise ValueError('invalid PHY') raise ValueError('invalid PHY')
return self.host.supports_le_feature(feature_map[phy]) return self.host.supports_le_features(feature_map[phy])
@deprecated("Please use start_legacy_advertising.") @deprecated("Please use start_legacy_advertising.")
async def start_advertising( async def start_advertising(
@@ -1919,8 +1926,8 @@ class Device(CompositeEventEmitter):
self.advertisement_accumulators = {} self.advertisement_accumulators = {}
# Enable scanning # Enable scanning
if not legacy and self.supports_le_feature( if not legacy and self.supports_le_features(
HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE LeFeatureMask.LE_EXTENDED_ADVERTISING
): ):
# Set the scanning parameters # Set the scanning parameters
scan_type = ( scan_type = (
@@ -1938,7 +1945,7 @@ class Device(CompositeEventEmitter):
scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT
scanning_phy_count += 1 scanning_phy_count += 1
if HCI_LE_CODED_PHY in scanning_phys: if HCI_LE_CODED_PHY in scanning_phys:
if self.supports_le_feature(HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE): if self.supports_le_features(LeFeatureMask.LE_CODED_PHY):
scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT
scanning_phy_count += 1 scanning_phy_count += 1
@@ -1999,7 +2006,7 @@ class Device(CompositeEventEmitter):
async def stop_scanning(self) -> None: async def stop_scanning(self) -> None:
# Disable scanning # Disable scanning
if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE): if self.supports_le_features(LeFeatureMask.LE_EXTENDED_ADVERTISING):
await self.send_command( await self.send_command(
HCI_LE_Set_Extended_Scan_Enable_Command( HCI_LE_Set_Extended_Scan_Enable_Command(
enable=0, filter_duplicates=0, duration=0, period=0 enable=0, filter_duplicates=0, duration=0, period=0
@@ -3141,6 +3148,32 @@ class Device(CompositeEventEmitter):
) )
raise HCI_StatusError(result) raise HCI_StatusError(result)
async def get_remote_le_features(self, connection: Connection) -> LeFeatureMask:
"""[LE Only] Reads remote LE supported features.
Args:
handle: connection handle to read LE features.
Returns:
LE features supported by the remote device.
"""
with closing(EventWatcher()) as watcher:
read_feature_future: asyncio.Future[
LeFeatureMask
] = asyncio.get_running_loop().create_future()
def on_le_remote_features(handle: int, features: int):
if handle == connection.handle:
read_feature_future.set_result(LeFeatureMask(features))
watcher.on(self.host, 'le_remote_features', on_le_remote_features)
await self.send_command(
HCI_LE_Read_Remote_Features_Command(
connection_handle=connection.handle
),
)
return await read_feature_future
@host_event_handler @host_event_handler
def on_flush(self): def on_flush(self):
self.emit('flush') self.emit('flush')

View File

@@ -1360,55 +1360,51 @@ HCI_SUPPORTED_COMMANDS_FLAGS = (
# LE Supported Features # LE Supported Features
# See Bluetooth spec @ Vol 6, Part B, 4.6 FEATURE SUPPORT # See Bluetooth spec @ Vol 6, Part B, 4.6 FEATURE SUPPORT
HCI_LE_ENCRYPTION_LE_SUPPORTED_FEATURE = 0 class LeFeatureMask(enum.IntFlag):
HCI_CONNECTION_PARAMETERS_REQUEST_PROCEDURE_LE_SUPPORTED_FEATURE = 1 LE_ENCRYPTION = 1 << 0
HCI_EXTENDED_REJECT_INDICATION_LE_SUPPORTED_FEATURE = 2 CONNECTION_PARAMETERS_REQUEST_PROCEDURE = 1 << 1
HCI_PERIPHERAL_INITIATED_FEATURE_EXCHANGE_LE_SUPPORTED_FEATURE = 3 EXTENDED_REJECT_INDICATION = 1 << 2
HCI_LE_PING_LE_SUPPORTED_FEATURE = 4 PERIPHERAL_INITIATED_FEATURE_EXCHANGE = 1 << 3
HCI_LE_DATA_PACKET_LENGTH_EXTENSION_LE_SUPPORTED_FEATURE = 5 LE_PING = 1 << 4
HCI_LL_PRIVACY_LE_SUPPORTED_FEATURE = 6 LE_DATA_PACKET_LENGTH_EXTENSION = 1 << 5
HCI_EXTENDED_SCANNER_FILTER_POLICIES_LE_SUPPORTED_FEATURE = 7 LL_PRIVACY = 1 << 6
HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE = 8 EXTENDED_SCANNER_FILTER_POLICIES = 1 << 7
HCI_STABLE_MODULATION_INDEX_TRANSMITTER_LE_SUPPORTED_FEATURE = 9 LE_2M_PHY = 1 << 8
HCI_STABLE_MODULATION_INDEX_RECEIVER_LE_SUPPORTED_FEATURE = 10 STABLE_MODULATION_INDEX_TRANSMITTER = 1 << 9
HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE = 11 STABLE_MODULATION_INDEX_RECEIVER = 1 << 10
HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE = 12 LE_CODED_PHY = 1 << 11
HCI_LE_PERIODIC_ADVERTISING_LE_SUPPORTED_FEATURE = 13 LE_EXTENDED_ADVERTISING = 1 << 12
HCI_CHANNEL_SELECTION_ALGORITHM_2_LE_SUPPORTED_FEATURE = 14 LE_PERIODIC_ADVERTISING = 1 << 13
HCI_LE_POWER_CLASS_1_LE_SUPPORTED_FEATURE = 15 CHANNEL_SELECTION_ALGORITHM_2 = 1 << 14
HCI_MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE_LE_SUPPORTED_FEATURE = 16 LE_POWER_CLASS_1 = 1 << 15
HCI_CONNECTION_CTE_REQUEST_LE_SUPPORTED_FEATURE = 17 MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE = 1 << 16
HCI_CONNECTION_CTE_RESPONSE_LE_SUPPORTED_FEATURE = 18 CONNECTION_CTE_REQUEST = 1 << 17
HCI_CONNECTIONLESS_CTE_TRANSMITTER_LE_SUPPORTED_FEATURE = 19 CONNECTION_CTE_RESPONSE = 1 << 18
HCI_CONNECTIONLESS_CTR_RECEIVER_LE_SUPPORTED_FEATURE = 20 CONNECTIONLESS_CTE_TRANSMITTER = 1 << 19
HCI_ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION_LE_SUPPORTED_FEATURE = 21 CONNECTIONLESS_CTR_RECEIVER = 1 << 20
HCI_ANTENNA_SWITCHING_DURING_CTE_RECEPTION_LE_SUPPORTED_FEATURE = 22 ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION = 1 << 21
HCI_RECEIVING_CONSTANT_TONE_EXTENSIONS_LE_SUPPORTED_FEATURE = 23 ANTENNA_SWITCHING_DURING_CTE_RECEPTION = 1 << 22
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER_LE_SUPPORTED_FEATURE = 24 RECEIVING_CONSTANT_TONE_EXTENSIONS = 1 << 23
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT_LE_SUPPORTED_FEATURE = 25 PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER = 1 << 24
HCI_SLEEP_CLOCK_ACCURACY_UPDATES_LE_SUPPORTED_FEATURE = 26 PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT = 1 << 25
HCI_REMOTE_PUBLIC_KEY_VALIDATION_LE_SUPPORTED_FEATURE = 27 SLEEP_CLOCK_ACCURACY_UPDATES = 1 << 26
HCI_CONNECTED_ISOCHRONOUS_STREAM_CENTRAL_LE_SUPPORTED_FEATURE = 28 REMOTE_PUBLIC_KEY_VALIDATION = 1 << 27
HCI_CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL_LE_SUPPORTED_FEATURE = 29 CONNECTED_ISOCHRONOUS_STREAM_CENTRAL = 1 << 28
HCI_ISOCHRONOUS_BROADCASTER_LE_SUPPORTED_FEATURE = 30 CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL = 1 << 29
HCI_SYNCHRONIZED_RECEIVER_LE_SUPPORTED_FEATURE = 31 ISOCHRONOUS_BROADCASTER = 1 << 30
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE = 32 SYNCHRONIZED_RECEIVER = 1 << 31
HCI_LE_POWER_CONTROL_REQUEST_LE_SUPPORTED_FEATURE = 33 CONNECTED_ISOCHRONOUS_STREAM = 1 << 32
HCI_LE_POWER_CONTROL_REQUEST_DUP_LE_SUPPORTED_FEATURE = 34 LE_POWER_CONTROL_REQUEST = 1 << 33
HCI_LE_PATH_LOSS_MONITORING_LE_SUPPORTED_FEATURE = 35 LE_POWER_CONTROL_REQUEST_DUP = 1 << 34
HCI_PERIODIC_ADVERTISING_ADI_SUPPORT_LE_SUPPORTED_FEATURE = 36 LE_PATH_LOSS_MONITORING = 1 << 35
HCI_CONNECTION_SUBRATING_LE_SUPPORTED_FEATURE = 37 PERIODIC_ADVERTISING_ADI_SUPPORT = 1 << 36
HCI_CONNECTION_SUBRATING_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 38 CONNECTION_SUBRATING = 1 << 37
HCI_CHANNEL_CLASSIFICATION_LE_SUPPORTED_FEATURE = 39 CONNECTION_SUBRATING_HOST_SUPPORT = 1 << 38
HCI_ADVERTISING_CODING_SELECTION_LE_SUPPORTED_FEATURE = 40 CHANNEL_CLASSIFICATION = 1 << 39
HCI_ADVERTISING_CODING_SELECTION_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 41 ADVERTISING_CODING_SELECTION = 1 << 40
HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER_LE_SUPPORTED_FEATURE = 43 ADVERTISING_CODING_SELECTION_HOST_SUPPORT = 1 << 41
HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER_LE_SUPPORTED_FEATURE = 44 PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER = 1 << 43
PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER = 1 << 44
HCI_LE_SUPPORTED_FEATURES_NAMES = {
flag: feature_name for (feature_name, flag) in globals().items()
if feature_name.startswith('HCI_') and feature_name.endswith('_LE_SUPPORTED_FEATURE')
}
# fmt: on # fmt: on

View File

@@ -70,6 +70,7 @@ from .hci import (
HCI_Reset_Command, HCI_Reset_Command,
HCI_Set_Event_Mask_Command, HCI_Set_Event_Mask_Command,
HCI_SynchronousDataPacket, HCI_SynchronousDataPacket,
LeFeatureMask,
) )
from .core import ( from .core import (
BT_BR_EDR_TRANSPORT, BT_BR_EDR_TRANSPORT,
@@ -487,8 +488,8 @@ class Host(AbortableEventEmitter):
return commands return commands
def supports_le_feature(self, feature): def supports_le_features(self, feature: LeFeatureMask) -> bool:
return (self.local_le_features & (1 << feature)) != 0 return (self.local_le_features & feature) == feature
@property @property
def supported_le_features(self): def supported_le_features(self):
@@ -1033,3 +1034,15 @@ class Host(AbortableEventEmitter):
event.bd_addr, event.bd_addr,
event.host_supported_features, event.host_supported_features,
) )
def on_hci_le_read_remote_features_complete_event(self, event):
if event.status != HCI_SUCCESS:
self.emit(
'le_remote_features_failure', event.connection_handle, event.status
)
else:
self.emit(
'le_remote_features',
event.connection_handle,
int.from_bytes(event.le_features, 'little'),
)

View File

@@ -50,6 +50,8 @@ from bumble.gatt import (
GATT_APPEARANCE_CHARACTERISTIC, GATT_APPEARANCE_CHARACTERISTIC,
) )
from .test_utils import TwoDevices
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Logging # Logging
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -412,6 +414,15 @@ async def test_extended_advertising_disconnection(auto_restart):
device.start_extended_advertising.assert_not_called() device.start_extended_advertising.assert_not_called()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_remote_le_features():
devices = TwoDevices()
await devices.setup_connection()
assert (await devices.connections[0].get_remote_le_features()) is not None
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_gatt_services_with_gas(): def test_gatt_services_with_gas():
device = Device(host=Host(None, None)) device = Device(host=Host(None, None))