mirror of
https://github.com/google/bumble.git
synced 2026-05-06 03:38:01 +00:00
Compare commits
34 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
27d02ef18d | ||
|
|
c0725e2a4a | ||
|
|
bf0784dde4 | ||
|
|
444f43f6a3 | ||
|
|
2420c47cf1 | ||
|
|
0a78e7506b | ||
|
|
f7cc6f6657 | ||
|
|
f2824ee6b8 | ||
|
|
7188ef08de | ||
|
|
3ded9014d3 | ||
|
|
b6125bdfb1 | ||
|
|
dc17f4f1ca | ||
|
|
3f65380c20 | ||
|
|
25a0056ecc | ||
|
|
85f6b10983 | ||
|
|
e85f041e9d | ||
|
|
ee09e6f10d | ||
|
|
c3daf4a7e1 | ||
|
|
3af623be7e | ||
|
|
4e76d3057b | ||
|
|
eda7360222 | ||
|
|
a4c15c00de | ||
|
|
cba4df4aef | ||
|
|
ceb8b448e9 | ||
|
|
311b716d5c | ||
|
|
0ba9e5c317 | ||
|
|
3517225b62 | ||
|
|
ad4bb1578b | ||
|
|
4af65b381b | ||
|
|
a5cd3365ae | ||
|
|
2915cb8bb6 | ||
|
|
28e485b7b3 | ||
|
|
1198f2c3f5 | ||
|
|
80aaf6a2b9 |
2
.github/workflows/python-build-test.yml
vendored
2
.github/workflows/python-build-test.yml
vendored
@@ -69,7 +69,7 @@ jobs:
|
||||
components: clippy,rustfmt
|
||||
toolchain: ${{ matrix.rust-version }}
|
||||
- name: Install Rust dependencies
|
||||
run: cargo install cargo-all-features --version 1.11.0 # allows building/testing combinations of features
|
||||
run: cargo install cargo-all-features --version 1.11.0 --locked # allows building/testing combinations of features
|
||||
- name: Check License Headers
|
||||
run: cd rust && cargo run --features dev-tools --bin file-header check-all
|
||||
- name: Rust Build
|
||||
|
||||
@@ -42,7 +42,7 @@ from typing_extensions import TypeIs
|
||||
|
||||
from bumble import hci, l2cap, utils
|
||||
from bumble.colors import color
|
||||
from bumble.core import UUID, InvalidOperationError, ProtocolError
|
||||
from bumble.core import UUID, InvalidOperationError, InvalidPacketError, ProtocolError
|
||||
from bumble.hci import HCI_Object
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -249,6 +249,8 @@ class ATT_PDU:
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, pdu: bytes) -> ATT_PDU:
|
||||
if not pdu:
|
||||
raise InvalidPacketError("Empty ATT PDU")
|
||||
op_code = pdu[0]
|
||||
|
||||
subclass = ATT_PDU.pdu_classes.get(op_code)
|
||||
|
||||
@@ -238,9 +238,12 @@ class Controller:
|
||||
hci_revision: int = 0
|
||||
lmp_version: int = hci.HCI_VERSION_BLUETOOTH_CORE_5_0
|
||||
lmp_subversion: int = 0
|
||||
lmp_features: bytes = bytes.fromhex(
|
||||
'0000000060000000'
|
||||
) # BR/EDR Not Supported, LE Supported (Controller)
|
||||
lmp_features: hci.LmpFeatureMask = (
|
||||
hci.LmpFeatureMask.LE_SUPPORTED_CONTROLLER
|
||||
| hci.LmpFeatureMask.BR_EDR_NOT_SUPPORTED
|
||||
| hci.LmpFeatureMask.EXTENDED_FEATURES
|
||||
)
|
||||
lmp_features_max_page_number: int = 3
|
||||
manufacturer_company_identifier: int = 0xFFFF
|
||||
acl_data_packet_length: int = 27
|
||||
total_num_acl_data_packets: int = 64
|
||||
@@ -250,10 +253,78 @@ class Controller:
|
||||
total_num_iso_data_packets: int = 64
|
||||
event_mask: int = 0
|
||||
event_mask_page_2: int = 0
|
||||
supported_commands: bytes = bytes.fromhex(
|
||||
'2000800000c000000000e4000000a822000000000000040000f7ffff7f000000'
|
||||
'30f0f9ff01008004002000000000000000000000000000000000000000000000'
|
||||
)
|
||||
supported_commands: set[int] = {
|
||||
hci.HCI_DISCONNECT_COMMAND,
|
||||
hci.HCI_READ_REMOTE_VERSION_INFORMATION_COMMAND,
|
||||
hci.HCI_READ_CLOCK_OFFSET_COMMAND,
|
||||
hci.HCI_READ_LMP_HANDLE_COMMAND,
|
||||
hci.HCI_SET_EVENT_MASK_COMMAND,
|
||||
hci.HCI_RESET_COMMAND,
|
||||
hci.HCI_READ_TRANSMIT_POWER_LEVEL_COMMAND,
|
||||
hci.HCI_SET_CONTROLLER_TO_HOST_FLOW_CONTROL_COMMAND,
|
||||
hci.HCI_HOST_BUFFER_SIZE_COMMAND,
|
||||
hci.HCI_HOST_NUMBER_OF_COMPLETED_PACKETS_COMMAND,
|
||||
hci.HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND,
|
||||
hci.HCI_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
|
||||
hci.HCI_READ_BUFFER_SIZE_COMMAND,
|
||||
hci.HCI_READ_BD_ADDR_COMMAND,
|
||||
hci.HCI_READ_RSSI_COMMAND,
|
||||
hci.HCI_SET_EVENT_MASK_PAGE_2_COMMAND,
|
||||
hci.HCI_LE_SET_EVENT_MASK_COMMAND,
|
||||
hci.HCI_LE_READ_BUFFER_SIZE_COMMAND,
|
||||
hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
|
||||
hci.HCI_LE_SET_RANDOM_ADDRESS_COMMAND,
|
||||
hci.HCI_LE_SET_ADVERTISING_PARAMETERS_COMMAND,
|
||||
hci.HCI_LE_READ_ADVERTISING_PHYSICAL_CHANNEL_TX_POWER_COMMAND,
|
||||
hci.HCI_LE_SET_ADVERTISING_DATA_COMMAND,
|
||||
hci.HCI_LE_SET_SCAN_RESPONSE_DATA_COMMAND,
|
||||
hci.HCI_LE_SET_ADVERTISING_ENABLE_COMMAND,
|
||||
hci.HCI_LE_SET_SCAN_PARAMETERS_COMMAND,
|
||||
hci.HCI_LE_SET_SCAN_ENABLE_COMMAND,
|
||||
hci.HCI_LE_CREATE_CONNECTION_COMMAND,
|
||||
hci.HCI_LE_CREATE_CONNECTION_CANCEL_COMMAND,
|
||||
hci.HCI_LE_READ_FILTER_ACCEPT_LIST_SIZE_COMMAND,
|
||||
hci.HCI_LE_CLEAR_FILTER_ACCEPT_LIST_COMMAND,
|
||||
hci.HCI_LE_ADD_DEVICE_TO_FILTER_ACCEPT_LIST_COMMAND,
|
||||
hci.HCI_LE_REMOVE_DEVICE_FROM_FILTER_ACCEPT_LIST_COMMAND,
|
||||
hci.HCI_LE_CONNECTION_UPDATE_COMMAND,
|
||||
hci.HCI_LE_SET_HOST_CHANNEL_CLASSIFICATION_COMMAND,
|
||||
hci.HCI_LE_READ_CHANNEL_MAP_COMMAND,
|
||||
hci.HCI_LE_READ_REMOTE_FEATURES_COMMAND,
|
||||
hci.HCI_LE_ENCRYPT_COMMAND,
|
||||
hci.HCI_LE_RAND_COMMAND,
|
||||
hci.HCI_LE_ENABLE_ENCRYPTION_COMMAND,
|
||||
hci.HCI_LE_LONG_TERM_KEY_REQUEST_REPLY_COMMAND,
|
||||
hci.HCI_LE_LONG_TERM_KEY_REQUEST_NEGATIVE_REPLY_COMMAND,
|
||||
hci.HCI_LE_READ_SUPPORTED_STATES_COMMAND,
|
||||
hci.HCI_LE_RECEIVER_TEST_COMMAND,
|
||||
hci.HCI_LE_TRANSMITTER_TEST_COMMAND,
|
||||
hci.HCI_LE_TEST_END_COMMAND,
|
||||
hci.HCI_READ_AUTHENTICATED_PAYLOAD_TIMEOUT_COMMAND,
|
||||
hci.HCI_WRITE_AUTHENTICATED_PAYLOAD_TIMEOUT_COMMAND,
|
||||
hci.HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_REPLY_COMMAND,
|
||||
hci.HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_NEGATIVE_REPLY_COMMAND,
|
||||
hci.HCI_LE_SET_DATA_LENGTH_COMMAND,
|
||||
hci.HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
|
||||
hci.HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
|
||||
hci.HCI_LE_ADD_DEVICE_TO_RESOLVING_LIST_COMMAND,
|
||||
hci.HCI_LE_REMOVE_DEVICE_FROM_RESOLVING_LIST_COMMAND,
|
||||
hci.HCI_LE_CLEAR_RESOLVING_LIST_COMMAND,
|
||||
hci.HCI_LE_READ_RESOLVING_LIST_SIZE_COMMAND,
|
||||
hci.HCI_LE_READ_PEER_RESOLVABLE_ADDRESS_COMMAND,
|
||||
hci.HCI_LE_READ_LOCAL_RESOLVABLE_ADDRESS_COMMAND,
|
||||
hci.HCI_LE_SET_ADDRESS_RESOLUTION_ENABLE_COMMAND,
|
||||
hci.HCI_LE_SET_RESOLVABLE_PRIVATE_ADDRESS_TIMEOUT_COMMAND,
|
||||
hci.HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
|
||||
hci.HCI_LE_READ_PHY_COMMAND,
|
||||
hci.HCI_LE_SET_DEFAULT_PHY_COMMAND,
|
||||
hci.HCI_LE_SET_PHY_COMMAND,
|
||||
hci.HCI_LE_RECEIVER_TEST_V2_COMMAND,
|
||||
hci.HCI_LE_TRANSMITTER_TEST_V2_COMMAND,
|
||||
hci.HCI_LE_READ_TRANSMIT_POWER_COMMAND,
|
||||
hci.HCI_LE_SET_PRIVACY_MODE_COMMAND,
|
||||
hci.HCI_LE_READ_BUFFER_SIZE_V2_COMMAND,
|
||||
}
|
||||
le_event_mask: int = 0
|
||||
le_features: hci.LeFeatureMask = (
|
||||
hci.LeFeatureMask.LE_ENCRYPTION
|
||||
@@ -392,6 +463,12 @@ class Controller:
|
||||
if self.link:
|
||||
self.link.on_address_changed(self)
|
||||
|
||||
@property
|
||||
def lmp_features_bytes(self) -> bytes:
|
||||
return self.lmp_features.to_bytes(
|
||||
(self.lmp_features_max_page_number + 1) * 8, 'little'
|
||||
)
|
||||
|
||||
# Packet Sink protocol (packets coming from the host via HCI)
|
||||
def on_packet(self, packet: bytes) -> None:
|
||||
self.on_hci_packet(hci.HCI_Packet.from_bytes(packet))
|
||||
@@ -968,6 +1045,51 @@ class Controller:
|
||||
packet.name_length,
|
||||
packet.name_fregment,
|
||||
)
|
||||
case lmp.LmpFeaturesReq(features):
|
||||
self.send_lmp_packet(
|
||||
sender_address,
|
||||
lmp.LmpFeaturesRes(features=self.lmp_features_bytes[:8]),
|
||||
)
|
||||
case lmp.LmpFeaturesRes(features):
|
||||
if connection := self.classic_connections.get(sender_address):
|
||||
self.send_hci_packet(
|
||||
hci.HCI_Read_Remote_Supported_Features_Complete_Event(
|
||||
status=hci.HCI_ErrorCode.SUCCESS,
|
||||
connection_handle=connection.handle,
|
||||
lmp_features=features,
|
||||
)
|
||||
)
|
||||
case lmp.LmpFeaturesReqExt(features_page, features):
|
||||
# Calculate start/end of page
|
||||
page_start = features_page * 8
|
||||
page_end = page_start + 8
|
||||
features_bytes = self.lmp_features_bytes
|
||||
if page_start < len(features_bytes):
|
||||
page_features = features_bytes[page_start:page_end].ljust(
|
||||
8, b'\x00'
|
||||
)
|
||||
else:
|
||||
page_features = b'\x00' * 8
|
||||
|
||||
self.send_lmp_packet(
|
||||
sender_address,
|
||||
lmp.LmpFeaturesResExt(
|
||||
features_page=features_page,
|
||||
max_features_page=len(features_bytes) // 8 - 1,
|
||||
features=page_features,
|
||||
),
|
||||
)
|
||||
case lmp.LmpFeaturesResExt(features_page, max_features_page, features):
|
||||
if connection := self.classic_connections.get(sender_address):
|
||||
self.send_hci_packet(
|
||||
hci.HCI_Read_Remote_Extended_Features_Complete_Event(
|
||||
status=hci.HCI_ErrorCode.SUCCESS,
|
||||
connection_handle=connection.handle,
|
||||
page_number=features_page,
|
||||
maximum_page_number=max_features_page,
|
||||
extended_lmp_features=features,
|
||||
)
|
||||
)
|
||||
case _:
|
||||
logger.error("!!! Unhandled packet: %s", packet)
|
||||
|
||||
@@ -1349,6 +1471,53 @@ class Controller:
|
||||
|
||||
return None
|
||||
|
||||
def on_hci_read_remote_supported_features_command(
|
||||
self, command: hci.HCI_Read_Remote_Supported_Features_Command
|
||||
) -> None:
|
||||
'''
|
||||
See Bluetooth spec Vol 4, Part E - 7.1.20 Read Remote Supported Features command
|
||||
'''
|
||||
handle = command.connection_handle
|
||||
if not (connection := self.find_classic_connection_by_handle(handle)):
|
||||
self._send_hci_command_status(
|
||||
hci.HCI_ErrorCode.UNKNOWN_CONNECTION_IDENTIFIER_ERROR, command.op_code
|
||||
)
|
||||
return None
|
||||
|
||||
self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code)
|
||||
self.send_lmp_packet(
|
||||
connection.peer_address,
|
||||
lmp.LmpFeaturesReq(self.lmp_features_bytes[:8]),
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
def on_hci_read_remote_extended_features_command(
|
||||
self, command: hci.HCI_Read_Remote_Extended_Features_Command
|
||||
) -> None:
|
||||
'''
|
||||
See Bluetooth spec Vol 4, Part E - 7.1.21 Read Remote Extended Features command
|
||||
'''
|
||||
handle = command.connection_handle
|
||||
if not (connection := self.find_classic_connection_by_handle(handle)):
|
||||
self._send_hci_command_status(
|
||||
hci.HCI_ErrorCode.UNKNOWN_CONNECTION_IDENTIFIER_ERROR, command.op_code
|
||||
)
|
||||
return None
|
||||
|
||||
self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code)
|
||||
self.send_lmp_packet(
|
||||
connection.peer_address,
|
||||
lmp.LmpFeaturesReqExt(
|
||||
features_page=command.page_number,
|
||||
features=self.lmp_features_bytes[
|
||||
command.page_number * 8 : (command.page_number + 1) * 8
|
||||
],
|
||||
),
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
def on_hci_enhanced_setup_synchronous_connection_command(
|
||||
self, command: hci.HCI_Enhanced_Setup_Synchronous_Connection_Command
|
||||
) -> None:
|
||||
@@ -1645,11 +1814,15 @@ class Controller:
|
||||
return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS)
|
||||
|
||||
def on_hci_write_simple_pairing_mode_command(
|
||||
self, _command: hci.HCI_Write_Simple_Pairing_Mode_Command
|
||||
self, command: hci.HCI_Write_Simple_Pairing_Mode_Command
|
||||
) -> hci.HCI_StatusReturnParameters:
|
||||
'''
|
||||
See Bluetooth spec Vol 4, Part E - 7.3.59 Write Simple Pairing Mode Command
|
||||
'''
|
||||
if command.simple_pairing_mode:
|
||||
self.lmp_features |= hci.LmpFeatureMask.SECURE_SIMPLE_PAIRING_HOST_SUPPORT
|
||||
else:
|
||||
self.lmp_features &= ~hci.LmpFeatureMask.SECURE_SIMPLE_PAIRING_HOST_SUPPORT
|
||||
return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS)
|
||||
|
||||
def on_hci_set_event_mask_page_2_command(
|
||||
@@ -1670,16 +1843,23 @@ class Controller:
|
||||
See Bluetooth spec Vol 4, Part E - 7.3.78 Write LE Host Support Command
|
||||
'''
|
||||
return hci.HCI_Read_LE_Host_Support_ReturnParameters(
|
||||
status=hci.HCI_ErrorCode.SUCCESS, le_supported_host=1, unused=0
|
||||
status=hci.HCI_ErrorCode.SUCCESS,
|
||||
le_supported_host=(
|
||||
1 if self.lmp_features & hci.LmpFeatureMask.LE_SUPPORTED_HOST else 0
|
||||
),
|
||||
unused=0,
|
||||
)
|
||||
|
||||
def on_hci_write_le_host_support_command(
|
||||
self, _command: hci.HCI_Write_LE_Host_Support_Command
|
||||
self, command: hci.HCI_Write_LE_Host_Support_Command
|
||||
) -> hci.HCI_StatusReturnParameters:
|
||||
'''
|
||||
See Bluetooth spec Vol 4, Part E - 7.3.79 Write LE Host Support Command
|
||||
'''
|
||||
# TODO / Just ignore for now
|
||||
if command.le_supported_host:
|
||||
self.lmp_features |= hci.LmpFeatureMask.LE_SUPPORTED_HOST
|
||||
else:
|
||||
self.lmp_features &= ~hci.LmpFeatureMask.LE_SUPPORTED_HOST
|
||||
return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS)
|
||||
|
||||
def on_hci_write_authenticated_payload_timeout_command(
|
||||
@@ -1716,7 +1896,11 @@ class Controller:
|
||||
See Bluetooth spec Vol 4, Part E - 7.4.2 Read Local Supported Commands Command
|
||||
'''
|
||||
return hci.HCI_Read_Local_Supported_Commands_ReturnParameters(
|
||||
hci.HCI_ErrorCode.SUCCESS, supported_commands=self.supported_commands
|
||||
hci.HCI_ErrorCode.SUCCESS,
|
||||
supported_commands=sum(
|
||||
hci.HCI_SUPPORTED_COMMANDS_MASKS.get(opcode, 0)
|
||||
for opcode in self.supported_commands
|
||||
).to_bytes(64, 'little'),
|
||||
)
|
||||
|
||||
def on_hci_read_local_supported_features_command(
|
||||
@@ -1726,7 +1910,8 @@ class Controller:
|
||||
See Bluetooth spec Vol 4, Part E - 7.4.3 Read Local Supported Features Command
|
||||
'''
|
||||
return hci.HCI_Read_Local_Supported_Features_ReturnParameters(
|
||||
hci.HCI_ErrorCode.SUCCESS, lmp_features=self.lmp_features[:8]
|
||||
hci.HCI_ErrorCode.SUCCESS,
|
||||
lmp_features=self.lmp_features_bytes[:8],
|
||||
)
|
||||
|
||||
def on_hci_read_local_extended_features_command(
|
||||
@@ -1735,18 +1920,19 @@ class Controller:
|
||||
'''
|
||||
See Bluetooth spec Vol 4, Part E - 7.4.4 Read Local Extended Features Command
|
||||
'''
|
||||
if command.page_number * 8 > len(self.lmp_features):
|
||||
feature_bytes = self.lmp_features_bytes
|
||||
if command.page_number * 8 > len(feature_bytes):
|
||||
return hci.HCI_Read_Local_Extended_Features_ReturnParameters(
|
||||
status=hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR,
|
||||
page_number=command.page_number,
|
||||
maximum_page_number=len(self.lmp_features) // 8 - 1,
|
||||
maximum_page_number=len(feature_bytes) // 8 - 1,
|
||||
extended_lmp_features=bytes(8),
|
||||
)
|
||||
return hci.HCI_Read_Local_Extended_Features_ReturnParameters(
|
||||
status=hci.HCI_ErrorCode.SUCCESS,
|
||||
page_number=command.page_number,
|
||||
maximum_page_number=len(self.lmp_features) // 8 - 1,
|
||||
extended_lmp_features=self.lmp_features[
|
||||
maximum_page_number=len(feature_bytes) // 8 - 1,
|
||||
extended_lmp_features=feature_bytes[
|
||||
command.page_number * 8 : (command.page_number + 1) * 8
|
||||
],
|
||||
)
|
||||
|
||||
@@ -19,6 +19,7 @@ from __future__ import annotations
|
||||
|
||||
import dataclasses
|
||||
import enum
|
||||
import functools
|
||||
import struct
|
||||
from collections.abc import Iterable
|
||||
from typing import (
|
||||
@@ -273,13 +274,8 @@ class UUID:
|
||||
def parse_uuid_2(cls, uuid_as_bytes: bytes, offset: int) -> tuple[int, UUID]:
|
||||
return offset + 2, cls.from_bytes(uuid_as_bytes[offset : offset + 2])
|
||||
|
||||
def to_bytes(self, force_128: bool = False) -> bytes:
|
||||
'''
|
||||
Serialize UUID in little-endian byte-order
|
||||
'''
|
||||
if not force_128:
|
||||
return self.uuid_bytes
|
||||
|
||||
@functools.cached_property
|
||||
def uuid_128_bytes(self) -> bytes:
|
||||
match len(self.uuid_bytes):
|
||||
case 2:
|
||||
return self.BASE_UUID + self.uuid_bytes + bytes([0, 0])
|
||||
@@ -290,6 +286,15 @@ class UUID:
|
||||
case _:
|
||||
assert False, "unreachable"
|
||||
|
||||
def to_bytes(self, force_128: bool = False) -> bytes:
|
||||
'''
|
||||
Serialize UUID in little-endian byte-order
|
||||
'''
|
||||
if not force_128:
|
||||
return self.uuid_bytes
|
||||
|
||||
return self.uuid_128_bytes
|
||||
|
||||
def to_pdu_bytes(self) -> bytes:
|
||||
'''
|
||||
Convert to bytes for use in an ATT PDU.
|
||||
@@ -318,7 +323,7 @@ class UUID:
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if isinstance(other, UUID):
|
||||
return self.to_bytes(force_128=True) == other.to_bytes(force_128=True)
|
||||
return self.uuid_128_bytes == other.uuid_128_bytes
|
||||
|
||||
if isinstance(other, str):
|
||||
return UUID(other) == self
|
||||
@@ -326,7 +331,7 @@ class UUID:
|
||||
return False
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return hash(self.uuid_bytes)
|
||||
return hash(self.uuid_128_bytes)
|
||||
|
||||
def __str__(self) -> str:
|
||||
result = self.to_hex_str(separator='-')
|
||||
@@ -2111,13 +2116,10 @@ class AdvertisingData:
|
||||
# -----------------------------------------------------------------------------
|
||||
# Connection PHY
|
||||
# -----------------------------------------------------------------------------
|
||||
@dataclasses.dataclass
|
||||
class ConnectionPHY:
|
||||
def __init__(self, tx_phy, rx_phy):
|
||||
self.tx_phy = tx_phy
|
||||
self.rx_phy = rx_phy
|
||||
|
||||
def __str__(self):
|
||||
return f'ConnectionPHY(tx_phy={self.tx_phy}, rx_phy={self.rx_phy})'
|
||||
tx_phy: int
|
||||
rx_phy: int
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -1837,6 +1837,7 @@ class Connection(utils.CompositeEventEmitter):
|
||||
self.pairing_peer_io_capability = None
|
||||
self.pairing_peer_authentication_requirements = None
|
||||
self.peer_le_features = hci.LeFeatureMask(0)
|
||||
self.peer_classic_features = hci.LmpFeatureMask(0)
|
||||
self.cs_configs = {}
|
||||
self.cs_procedures = {}
|
||||
|
||||
@@ -2054,6 +2055,15 @@ class Connection(utils.CompositeEventEmitter):
|
||||
self.peer_le_features = await self.device.get_remote_le_features(self)
|
||||
return self.peer_le_features
|
||||
|
||||
async def get_remote_classic_features(self) -> hci.LmpFeatureMask:
|
||||
"""[Classic Only] Reads remote LMP supported features.
|
||||
|
||||
Returns:
|
||||
LMP features supported by the remote device.
|
||||
"""
|
||||
self.peer_classic_features = await self.device.get_remote_classic_features(self)
|
||||
return self.peer_classic_features
|
||||
|
||||
def on_att_mtu_update(self, mtu: int):
|
||||
logger.debug(
|
||||
f'*** Connection ATT MTU Update: [0x{self.handle:04X}] '
|
||||
@@ -2149,6 +2159,7 @@ class DeviceConfiguration:
|
||||
)
|
||||
eatt_enabled: bool = False
|
||||
gatt_services: list[dict[str, Any]] = field(init=False)
|
||||
smp_debug_mode: bool = False
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
self.gatt_services = []
|
||||
@@ -2561,6 +2572,7 @@ class Device(utils.CompositeEventEmitter):
|
||||
),
|
||||
),
|
||||
)
|
||||
self.smp_manager.debug_mode = self.config.smp_debug_mode
|
||||
|
||||
self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)
|
||||
|
||||
@@ -5281,6 +5293,77 @@ class Device(utils.CompositeEventEmitter):
|
||||
)
|
||||
return await read_feature_future
|
||||
|
||||
async def get_remote_classic_features(
|
||||
self, connection: Connection
|
||||
) -> hci.LmpFeatureMask:
|
||||
"""[Classic Only] Reads remote LE supported features.
|
||||
|
||||
Args:
|
||||
handle: connection handle to read LMP features.
|
||||
|
||||
Returns:
|
||||
LMP features supported by the remote device.
|
||||
"""
|
||||
with closing(utils.EventWatcher()) as watcher:
|
||||
read_feature_future: asyncio.Future[tuple[int, int]] = (
|
||||
asyncio.get_running_loop().create_future()
|
||||
)
|
||||
read_features = hci.LmpFeatureMask(0)
|
||||
current_page_number = 0
|
||||
|
||||
@watcher.on(self.host, 'classic_remote_features')
|
||||
def on_classic_remote_features(
|
||||
handle: int,
|
||||
status: int,
|
||||
features: int,
|
||||
page_number: int,
|
||||
max_page_number: int,
|
||||
) -> None:
|
||||
if handle != connection.handle:
|
||||
logger.warning(
|
||||
"Received classic_remote_features for wrong handle, expected=0x%04X, got=0x%04X",
|
||||
connection.handle,
|
||||
handle,
|
||||
)
|
||||
return
|
||||
if page_number != current_page_number:
|
||||
logger.warning(
|
||||
"Received classic_remote_features for wrong page, expected=%d, got=%d",
|
||||
current_page_number,
|
||||
page_number,
|
||||
)
|
||||
return
|
||||
|
||||
if status == hci.HCI_ErrorCode.SUCCESS:
|
||||
read_feature_future.set_result((features, max_page_number))
|
||||
else:
|
||||
read_feature_future.set_exception(hci.HCI_Error(status))
|
||||
|
||||
await self.send_async_command(
|
||||
hci.HCI_Read_Remote_Supported_Features_Command(
|
||||
connection_handle=connection.handle
|
||||
)
|
||||
)
|
||||
|
||||
new_features, max_page_number = await read_feature_future
|
||||
read_features |= new_features
|
||||
if not (read_features & hci.LmpFeatureMask.EXTENDED_FEATURES):
|
||||
return read_features
|
||||
|
||||
while current_page_number <= max_page_number:
|
||||
read_feature_future = asyncio.get_running_loop().create_future()
|
||||
await self.send_async_command(
|
||||
hci.HCI_Read_Remote_Extended_Features_Command(
|
||||
connection_handle=connection.handle,
|
||||
page_number=current_page_number,
|
||||
)
|
||||
)
|
||||
new_features, max_page_number = await read_feature_future
|
||||
read_features |= new_features << (current_page_number * 64)
|
||||
current_page_number += 1
|
||||
|
||||
return read_features
|
||||
|
||||
@utils.experimental('Only for testing.')
|
||||
async def get_remote_cs_capabilities(
|
||||
self, connection: Connection
|
||||
|
||||
@@ -68,6 +68,8 @@ class HfpProtocolError(ProtocolError):
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class HfpProtocol:
|
||||
MAX_BUFFER_SIZE: ClassVar[int] = 65536
|
||||
|
||||
dlc: rfcomm.DLC
|
||||
buffer: str
|
||||
lines: collections.deque
|
||||
@@ -84,10 +86,19 @@ class HfpProtocol:
|
||||
def feed(self, data: bytes | str) -> None:
|
||||
# Convert the data to a string if needed
|
||||
if isinstance(data, bytes):
|
||||
data = data.decode('utf-8')
|
||||
data = data.decode('utf-8', errors='replace')
|
||||
|
||||
logger.debug(f'<<< Data received: {data}')
|
||||
|
||||
# Drop incoming data if it would overflow the buffer; keep existing
|
||||
# partial packet state intact so a future clean packet can still parse.
|
||||
if len(self.buffer) + len(data) > self.MAX_BUFFER_SIZE:
|
||||
logger.warning(
|
||||
'HFP buffer overflow (>%d bytes), dropping incoming data',
|
||||
self.MAX_BUFFER_SIZE,
|
||||
)
|
||||
return
|
||||
|
||||
# Add to the buffer and look for lines
|
||||
self.buffer += data
|
||||
while (separator := self.buffer.find('\r')) >= 0:
|
||||
|
||||
@@ -692,10 +692,8 @@ class Host(utils.EventEmitter):
|
||||
finally:
|
||||
self.pending_command = None
|
||||
self.pending_response = None
|
||||
if (
|
||||
response is not None
|
||||
and response.num_hci_command_packets
|
||||
and self.command_semaphore.locked()
|
||||
if response is None or (
|
||||
response.num_hci_command_packets and self.command_semaphore.locked()
|
||||
):
|
||||
self.command_semaphore.release()
|
||||
|
||||
@@ -1660,6 +1658,19 @@ class Host(utils.EventEmitter):
|
||||
'connection_encryption_failure', event.connection_handle, event.status
|
||||
)
|
||||
|
||||
def on_hci_read_remote_supported_features_complete_event(
|
||||
self, event: hci.HCI_Read_Remote_Supported_Features_Complete_Event
|
||||
) -> None:
|
||||
# Notify the client
|
||||
self.emit(
|
||||
'classic_remote_features',
|
||||
event.connection_handle,
|
||||
event.status,
|
||||
int.from_bytes(event.lmp_features, 'little'),
|
||||
0, # page number
|
||||
0, # max page number
|
||||
)
|
||||
|
||||
def on_hci_encryption_change_v2_event(
|
||||
self, event: hci.HCI_Encryption_Change_V2_Event
|
||||
):
|
||||
@@ -1816,6 +1827,18 @@ class Host(utils.EventEmitter):
|
||||
rssi,
|
||||
)
|
||||
|
||||
def on_hci_read_remote_extended_features_complete_event(
|
||||
self, event: hci.HCI_Read_Remote_Extended_Features_Complete_Event
|
||||
):
|
||||
self.emit(
|
||||
'classic_remote_features',
|
||||
event.connection_handle,
|
||||
event.status,
|
||||
int.from_bytes(event.extended_lmp_features, 'little'),
|
||||
event.page_number,
|
||||
event.maximum_page_number,
|
||||
)
|
||||
|
||||
def on_hci_extended_inquiry_result_event(
|
||||
self, event: hci.HCI_Extended_Inquiry_Result_Event
|
||||
):
|
||||
|
||||
@@ -27,6 +27,7 @@ import dataclasses
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from typing_extensions import Self
|
||||
@@ -248,29 +249,26 @@ class JsonKeyStore(KeyStore):
|
||||
DEFAULT_NAMESPACE = '__DEFAULT__'
|
||||
DEFAULT_BASE_NAME = "keys"
|
||||
|
||||
def __init__(self, namespace, filename=None):
|
||||
self.namespace = namespace if namespace is not None else self.DEFAULT_NAMESPACE
|
||||
def __init__(
|
||||
self, namespace: str | None = None, filename: str | None = None
|
||||
) -> None:
|
||||
self.namespace = namespace or self.DEFAULT_NAMESPACE
|
||||
|
||||
if filename is None:
|
||||
# Use a default for the current user
|
||||
|
||||
# Import here because this may not exist on all platforms
|
||||
# pylint: disable=import-outside-toplevel
|
||||
import appdirs
|
||||
|
||||
self.directory_name = os.path.join(
|
||||
appdirs.user_data_dir(self.APP_NAME, self.APP_AUTHOR), self.KEYS_DIR
|
||||
)
|
||||
base_name = self.DEFAULT_BASE_NAME if namespace is None else self.namespace
|
||||
json_filename = (
|
||||
f'{base_name}.json'.lower().replace(':', '-').replace('/p', '-p')
|
||||
)
|
||||
self.filename = os.path.join(self.directory_name, json_filename)
|
||||
if filename:
|
||||
self.filename = pathlib.Path(filename).resolve()
|
||||
self.directory_name = self.filename.parent
|
||||
else:
|
||||
self.filename = filename
|
||||
self.directory_name = os.path.dirname(os.path.abspath(self.filename))
|
||||
import platformdirs # Deferred import
|
||||
|
||||
logger.debug(f'JSON keystore: {self.filename}')
|
||||
base_dir = platformdirs.user_data_path(self.APP_NAME, self.APP_AUTHOR)
|
||||
self.directory_name = base_dir / self.KEYS_DIR
|
||||
|
||||
base_name = self.namespace if namespace else self.DEFAULT_BASE_NAME
|
||||
safe_name = base_name.lower().replace(':', '-').replace('/', '-')
|
||||
|
||||
self.filename = self.directory_name / f"{safe_name}.json"
|
||||
|
||||
logger.debug('JSON keystore: %s', self.filename)
|
||||
|
||||
@classmethod
|
||||
def from_device(
|
||||
@@ -293,7 +291,9 @@ class JsonKeyStore(KeyStore):
|
||||
|
||||
return cls(namespace, filename)
|
||||
|
||||
async def load(self):
|
||||
async def load(
|
||||
self,
|
||||
) -> tuple[dict[str, dict[str, dict[str, Any]]], dict[str, dict[str, Any]]]:
|
||||
# Try to open the file, without failing. If the file does not exist, it
|
||||
# will be created upon saving.
|
||||
try:
|
||||
@@ -312,17 +312,17 @@ class JsonKeyStore(KeyStore):
|
||||
return next(iter(db.items()))
|
||||
|
||||
# Finally, just create an empty key map for the namespace
|
||||
key_map = {}
|
||||
key_map: dict[str, dict[str, Any]] = {}
|
||||
db[self.namespace] = key_map
|
||||
return (db, key_map)
|
||||
|
||||
async def save(self, db):
|
||||
async def save(self, db: dict[str, dict[str, dict[str, Any]]]) -> None:
|
||||
# Create the directory if it doesn't exist
|
||||
if not os.path.exists(self.directory_name):
|
||||
os.makedirs(self.directory_name, exist_ok=True)
|
||||
if not self.directory_name.exists():
|
||||
self.directory_name.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Save to a temporary file
|
||||
temp_filename = self.filename + '.tmp'
|
||||
temp_filename = self.filename.with_name(self.filename.name + ".tmp")
|
||||
with open(temp_filename, 'w', encoding='utf-8') as output:
|
||||
json.dump(db, output, sort_keys=True, indent=4)
|
||||
|
||||
@@ -334,16 +334,16 @@ class JsonKeyStore(KeyStore):
|
||||
del key_map[name]
|
||||
await self.save(db)
|
||||
|
||||
async def update(self, name, keys):
|
||||
async def update(self, name: str, keys: PairingKeys) -> None:
|
||||
db, key_map = await self.load()
|
||||
key_map.setdefault(name, {}).update(keys.to_dict())
|
||||
await self.save(db)
|
||||
|
||||
async def get_all(self):
|
||||
async def get_all(self) -> list[tuple[str, PairingKeys]]:
|
||||
_, key_map = await self.load()
|
||||
return [(name, PairingKeys.from_dict(keys)) for (name, keys) in key_map.items()]
|
||||
|
||||
async def delete_all(self):
|
||||
async def delete_all(self) -> None:
|
||||
db, key_map = await self.load()
|
||||
key_map.clear()
|
||||
await self.save(db)
|
||||
|
||||
@@ -322,3 +322,38 @@ class LmpNameRes(Packet):
|
||||
name_offset: int = field(metadata=hci.metadata(2))
|
||||
name_length: int = field(metadata=hci.metadata(3))
|
||||
name_fregment: bytes = field(metadata=hci.metadata('*'))
|
||||
|
||||
|
||||
@Packet.subclass
|
||||
@dataclass
|
||||
class LmpFeaturesReq(Packet):
|
||||
opcode = Opcode.LMP_FEATURES_REQ
|
||||
|
||||
features: bytes = field(metadata=hci.metadata(8))
|
||||
|
||||
|
||||
@Packet.subclass
|
||||
@dataclass
|
||||
class LmpFeaturesRes(Packet):
|
||||
opcode = Opcode.LMP_FEATURES_RES
|
||||
|
||||
features: bytes = field(metadata=hci.metadata(8))
|
||||
|
||||
|
||||
@Packet.subclass
|
||||
@dataclass
|
||||
class LmpFeaturesReqExt(Packet):
|
||||
opcode = Opcode.LMP_FEATURES_REQ_EXT
|
||||
|
||||
features_page: int = field(metadata=hci.metadata(1))
|
||||
features: bytes = field(metadata=hci.metadata(8))
|
||||
|
||||
|
||||
@Packet.subclass
|
||||
@dataclass
|
||||
class LmpFeaturesResExt(Packet):
|
||||
opcode = Opcode.LMP_FEATURES_RES_EXT
|
||||
|
||||
features_page: int = field(metadata=hci.metadata(1))
|
||||
max_features_page: int = field(metadata=hci.metadata(1))
|
||||
features: bytes = field(metadata=hci.metadata(8))
|
||||
|
||||
455
bumble/sdp.py
455
bumble/sdp.py
@@ -21,11 +21,12 @@ import asyncio
|
||||
import logging
|
||||
import struct
|
||||
from collections.abc import Iterable, Sequence
|
||||
from typing import TYPE_CHECKING, NewType
|
||||
from dataclasses import dataclass, field
|
||||
from typing import TYPE_CHECKING, Any, ClassVar, NewType, TypeVar
|
||||
|
||||
from typing_extensions import Self
|
||||
|
||||
from bumble import core, l2cap
|
||||
from bumble import core, hci, l2cap, utils
|
||||
from bumble.colors import color
|
||||
from bumble.core import (
|
||||
InvalidArgumentError,
|
||||
@@ -33,7 +34,6 @@ from bumble.core import (
|
||||
InvalidStateError,
|
||||
ProtocolError,
|
||||
)
|
||||
from bumble.hci import HCI_Object, key_with_value, name_or_number
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bumble.device import Connection, Device
|
||||
@@ -54,39 +54,22 @@ SDP_CONTINUATION_WATCHDOG = 64 # Maximum number of continuations we're willing
|
||||
|
||||
SDP_PSM = 0x0001
|
||||
|
||||
SDP_ERROR_RESPONSE = 0x01
|
||||
SDP_SERVICE_SEARCH_REQUEST = 0x02
|
||||
SDP_SERVICE_SEARCH_RESPONSE = 0x03
|
||||
SDP_SERVICE_ATTRIBUTE_REQUEST = 0x04
|
||||
SDP_SERVICE_ATTRIBUTE_RESPONSE = 0x05
|
||||
SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST = 0x06
|
||||
SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE = 0x07
|
||||
class PduId(hci.SpecableEnum):
|
||||
SDP_ERROR_RESPONSE = 0x01
|
||||
SDP_SERVICE_SEARCH_REQUEST = 0x02
|
||||
SDP_SERVICE_SEARCH_RESPONSE = 0x03
|
||||
SDP_SERVICE_ATTRIBUTE_REQUEST = 0x04
|
||||
SDP_SERVICE_ATTRIBUTE_RESPONSE = 0x05
|
||||
SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST = 0x06
|
||||
SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE = 0x07
|
||||
|
||||
SDP_PDU_NAMES = {
|
||||
SDP_ERROR_RESPONSE: 'SDP_ERROR_RESPONSE',
|
||||
SDP_SERVICE_SEARCH_REQUEST: 'SDP_SERVICE_SEARCH_REQUEST',
|
||||
SDP_SERVICE_SEARCH_RESPONSE: 'SDP_SERVICE_SEARCH_RESPONSE',
|
||||
SDP_SERVICE_ATTRIBUTE_REQUEST: 'SDP_SERVICE_ATTRIBUTE_REQUEST',
|
||||
SDP_SERVICE_ATTRIBUTE_RESPONSE: 'SDP_SERVICE_ATTRIBUTE_RESPONSE',
|
||||
SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST: 'SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST',
|
||||
SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE: 'SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE'
|
||||
}
|
||||
|
||||
SDP_INVALID_SDP_VERSION_ERROR = 0x0001
|
||||
SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR = 0x0002
|
||||
SDP_INVALID_REQUEST_SYNTAX_ERROR = 0x0003
|
||||
SDP_INVALID_PDU_SIZE_ERROR = 0x0004
|
||||
SDP_INVALID_CONTINUATION_STATE_ERROR = 0x0005
|
||||
SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR = 0x0006
|
||||
|
||||
SDP_ERROR_NAMES = {
|
||||
SDP_INVALID_SDP_VERSION_ERROR: 'SDP_INVALID_SDP_VERSION_ERROR',
|
||||
SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR: 'SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR',
|
||||
SDP_INVALID_REQUEST_SYNTAX_ERROR: 'SDP_INVALID_REQUEST_SYNTAX_ERROR',
|
||||
SDP_INVALID_PDU_SIZE_ERROR: 'SDP_INVALID_PDU_SIZE_ERROR',
|
||||
SDP_INVALID_CONTINUATION_STATE_ERROR: 'SDP_INVALID_CONTINUATION_STATE_ERROR',
|
||||
SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR: 'SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR'
|
||||
}
|
||||
class ErrorCode(hci.SpecableEnum):
|
||||
INVALID_SDP_VERSION = 0x0001
|
||||
INVALID_SERVICE_RECORD_HANDLE = 0x0002
|
||||
INVALID_REQUEST_SYNTAX = 0x0003
|
||||
INVALID_PDU_SIZE = 0x0004
|
||||
INVALID_CONTINUATION_STATE = 0x0005
|
||||
INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST = 0x0006
|
||||
|
||||
SDP_SERVICE_NAME_ATTRIBUTE_ID_OFFSET = 0x0000
|
||||
SDP_SERVICE_DESCRIPTION_ATTRIBUTE_ID_OFFSET = 0x0001
|
||||
@@ -141,30 +124,31 @@ SDP_ALL_ATTRIBUTES_RANGE = (0x0000, 0xFFFF)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@dataclass
|
||||
class DataElement:
|
||||
NIL = 0
|
||||
UNSIGNED_INTEGER = 1
|
||||
SIGNED_INTEGER = 2
|
||||
UUID = 3
|
||||
TEXT_STRING = 4
|
||||
BOOLEAN = 5
|
||||
SEQUENCE = 6
|
||||
ALTERNATIVE = 7
|
||||
URL = 8
|
||||
|
||||
TYPE_NAMES = {
|
||||
NIL: 'NIL',
|
||||
UNSIGNED_INTEGER: 'UNSIGNED_INTEGER',
|
||||
SIGNED_INTEGER: 'SIGNED_INTEGER',
|
||||
UUID: 'UUID',
|
||||
TEXT_STRING: 'TEXT_STRING',
|
||||
BOOLEAN: 'BOOLEAN',
|
||||
SEQUENCE: 'SEQUENCE',
|
||||
ALTERNATIVE: 'ALTERNATIVE',
|
||||
URL: 'URL',
|
||||
}
|
||||
class Type(utils.OpenIntEnum):
|
||||
NIL = 0
|
||||
UNSIGNED_INTEGER = 1
|
||||
SIGNED_INTEGER = 2
|
||||
UUID = 3
|
||||
TEXT_STRING = 4
|
||||
BOOLEAN = 5
|
||||
SEQUENCE = 6
|
||||
ALTERNATIVE = 7
|
||||
URL = 8
|
||||
|
||||
type_constructors = {
|
||||
NIL = Type.NIL
|
||||
UNSIGNED_INTEGER = Type.UNSIGNED_INTEGER
|
||||
SIGNED_INTEGER = Type.SIGNED_INTEGER
|
||||
UUID = Type.UUID
|
||||
TEXT_STRING = Type.TEXT_STRING
|
||||
BOOLEAN = Type.BOOLEAN
|
||||
SEQUENCE = Type.SEQUENCE
|
||||
ALTERNATIVE = Type.ALTERNATIVE
|
||||
URL = Type.URL
|
||||
|
||||
TYPE_CONSTRUCTORS = {
|
||||
NIL: lambda x: DataElement(DataElement.NIL, None),
|
||||
UNSIGNED_INTEGER: lambda x, y: DataElement(
|
||||
DataElement.UNSIGNED_INTEGER,
|
||||
@@ -190,14 +174,18 @@ class DataElement:
|
||||
URL: lambda x: DataElement(DataElement.URL, x.decode('utf8')),
|
||||
}
|
||||
|
||||
def __init__(self, element_type, value, value_size=None):
|
||||
self.type = element_type
|
||||
self.value = value
|
||||
self.value_size = value_size
|
||||
type: Type
|
||||
value: Any
|
||||
value_size: int | None = None
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
# Used as a cache when parsing from bytes so we can emit a byte-for-byte replica
|
||||
self.bytes = None
|
||||
if element_type in (DataElement.UNSIGNED_INTEGER, DataElement.SIGNED_INTEGER):
|
||||
if value_size is None:
|
||||
self._bytes: bytes | None = None
|
||||
if self.type in (
|
||||
DataElement.UNSIGNED_INTEGER,
|
||||
DataElement.SIGNED_INTEGER,
|
||||
):
|
||||
if self.value_size is None:
|
||||
raise InvalidArgumentError(
|
||||
'integer types must have a value size specified'
|
||||
)
|
||||
@@ -337,7 +325,7 @@ class DataElement:
|
||||
value_offset = 4
|
||||
|
||||
value_data = data[1 + value_offset : 1 + value_offset + value_size]
|
||||
constructor = DataElement.type_constructors.get(element_type)
|
||||
constructor = DataElement.TYPE_CONSTRUCTORS.get(element_type)
|
||||
if constructor:
|
||||
if element_type in (
|
||||
DataElement.UNSIGNED_INTEGER,
|
||||
@@ -348,15 +336,15 @@ class DataElement:
|
||||
result = constructor(value_data)
|
||||
else:
|
||||
result = DataElement(element_type, value_data)
|
||||
result.bytes = data[
|
||||
result._bytes = data[
|
||||
: 1 + value_offset + value_size
|
||||
] # Keep a copy so we can re-serialize to an exact replica
|
||||
return result
|
||||
|
||||
def __bytes__(self):
|
||||
# Return early if we have a cache
|
||||
if self.bytes:
|
||||
return self.bytes
|
||||
if self._bytes:
|
||||
return self._bytes
|
||||
|
||||
if self.type == DataElement.NIL:
|
||||
data = b''
|
||||
@@ -443,12 +431,12 @@ class DataElement:
|
||||
else:
|
||||
raise RuntimeError("internal error - self.type not supported")
|
||||
|
||||
self.bytes = bytes([self.type << 3 | size_index]) + size_bytes + data
|
||||
return self.bytes
|
||||
self._bytes = bytes([self.type << 3 | size_index]) + size_bytes + data
|
||||
return self._bytes
|
||||
|
||||
def to_string(self, pretty=False, indentation=0):
|
||||
prefix = ' ' * indentation
|
||||
type_name = name_or_number(self.TYPE_NAMES, self.type)
|
||||
type_name = self.type.name
|
||||
if self.type == DataElement.NIL:
|
||||
value_string = ''
|
||||
elif self.type in (DataElement.SEQUENCE, DataElement.ALTERNATIVE):
|
||||
@@ -476,10 +464,10 @@ class DataElement:
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@dataclass
|
||||
class ServiceAttribute:
|
||||
def __init__(self, attribute_id: int, value: DataElement) -> None:
|
||||
self.id = attribute_id
|
||||
self.value = value
|
||||
id: int
|
||||
value: DataElement
|
||||
|
||||
@staticmethod
|
||||
def list_from_data_elements(
|
||||
@@ -510,7 +498,7 @@ class ServiceAttribute:
|
||||
|
||||
@staticmethod
|
||||
def id_name(id_code):
|
||||
return name_or_number(SDP_ATTRIBUTE_ID_NAMES, id_code)
|
||||
return hci.name_or_number(SDP_ATTRIBUTE_ID_NAMES, id_code)
|
||||
|
||||
@staticmethod
|
||||
def is_uuid_in_value(uuid: core.UUID, value: DataElement) -> bool:
|
||||
@@ -540,239 +528,228 @@ class ServiceAttribute:
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def _parse_service_record_handle_list(
|
||||
data: bytes, offset: int
|
||||
) -> tuple[int, list[int]]:
|
||||
count = struct.unpack_from('>H', data, offset)[0]
|
||||
offset += 2
|
||||
handle_list = [
|
||||
struct.unpack_from('>I', data, offset + x * 4)[0] for x in range(count)
|
||||
]
|
||||
return offset + count * 4, handle_list
|
||||
|
||||
|
||||
def _serialize_service_record_handle_list(
|
||||
handles: list[int],
|
||||
) -> bytes:
|
||||
return struct.pack('>H', len(handles)) + b''.join(
|
||||
struct.pack('>I', handle) for handle in handles
|
||||
)
|
||||
|
||||
|
||||
def _parse_bytes_preceded_by_length(data: bytes, offset: int) -> tuple[int, bytes]:
|
||||
length = struct.unpack_from('>H', data, offset)[0]
|
||||
offset += 2
|
||||
return offset + length, data[offset : offset + length]
|
||||
|
||||
|
||||
def _serialize_bytes_preceded_by_length(data: bytes) -> bytes:
|
||||
return struct.pack('>H', len(data)) + data
|
||||
|
||||
|
||||
_SERVICE_RECORD_HANDLE_LIST_METADATA = hci.metadata(
|
||||
{
|
||||
'parser': _parse_service_record_handle_list,
|
||||
'serializer': _serialize_service_record_handle_list,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
_BYTES_PRECEDED_BY_LENGTH_METADATA = hci.metadata(
|
||||
{
|
||||
'parser': _parse_bytes_preceded_by_length,
|
||||
'serializer': _serialize_bytes_preceded_by_length,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@dataclass
|
||||
class SDP_PDU:
|
||||
'''
|
||||
See Bluetooth spec @ Vol 3, Part B - 4.2 PROTOCOL DATA UNIT FORMAT
|
||||
'''
|
||||
|
||||
RESPONSE_PDU_IDS = {
|
||||
SDP_SERVICE_SEARCH_REQUEST: SDP_SERVICE_SEARCH_RESPONSE,
|
||||
SDP_SERVICE_ATTRIBUTE_REQUEST: SDP_SERVICE_ATTRIBUTE_RESPONSE,
|
||||
SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST: SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE,
|
||||
PduId.SDP_SERVICE_SEARCH_REQUEST: PduId.SDP_SERVICE_SEARCH_RESPONSE,
|
||||
PduId.SDP_SERVICE_ATTRIBUTE_REQUEST: PduId.SDP_SERVICE_ATTRIBUTE_RESPONSE,
|
||||
PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST: PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE,
|
||||
}
|
||||
sdp_pdu_classes: dict[int, type[SDP_PDU]] = {}
|
||||
name = None
|
||||
pdu_id = 0
|
||||
subclasses: ClassVar[dict[int, type[SDP_PDU]]] = {}
|
||||
pdu_id: ClassVar[PduId]
|
||||
fields: ClassVar[hci.Fields]
|
||||
|
||||
@staticmethod
|
||||
def from_bytes(pdu):
|
||||
pdu_id, transaction_id, _parameters_length = struct.unpack_from('>BHH', pdu, 0)
|
||||
transaction_id: int
|
||||
_payload: bytes | None = field(init=False, repr=False, default=None)
|
||||
|
||||
cls = SDP_PDU.sdp_pdu_classes.get(pdu_id)
|
||||
if cls is None:
|
||||
instance = SDP_PDU(pdu)
|
||||
instance.name = SDP_PDU.pdu_name(pdu_id)
|
||||
instance.pdu_id = pdu_id
|
||||
instance.transaction_id = transaction_id
|
||||
return instance
|
||||
self = cls.__new__(cls)
|
||||
SDP_PDU.__init__(self, pdu, transaction_id)
|
||||
if hasattr(self, 'fields'):
|
||||
self.init_from_bytes(pdu, 5)
|
||||
return self
|
||||
@classmethod
|
||||
def from_bytes(cls, pdu: bytes) -> SDP_PDU:
|
||||
pdu_id, transaction_id, parameters_length = struct.unpack_from('>BHH', pdu, 0)
|
||||
|
||||
@staticmethod
|
||||
def parse_service_record_handle_list_preceded_by_count(
|
||||
data: bytes, offset: int
|
||||
) -> tuple[int, list[int]]:
|
||||
count = struct.unpack_from('>H', data, offset - 2)[0]
|
||||
handle_list = [
|
||||
struct.unpack_from('>I', data, offset + x * 4)[0] for x in range(count)
|
||||
]
|
||||
return offset + count * 4, handle_list
|
||||
if len(pdu) != 5 + parameters_length:
|
||||
logger.warning("Expect %d bytes, got %d", 5 + parameters_length, len(pdu))
|
||||
|
||||
@staticmethod
|
||||
def parse_bytes_preceded_by_length(data, offset):
|
||||
length = struct.unpack_from('>H', data, offset - 2)[0]
|
||||
return offset + length, data[offset : offset + length]
|
||||
subclass = cls.subclasses.get(pdu_id)
|
||||
if not (subclass := cls.subclasses.get(pdu_id)):
|
||||
raise InvalidPacketError(f"Unknown PDU type {pdu_id}")
|
||||
instance = subclass(
|
||||
transaction_id=transaction_id,
|
||||
**hci.HCI_Object.dict_from_bytes(pdu, 5, subclass.fields),
|
||||
)
|
||||
instance._payload = pdu
|
||||
return instance
|
||||
|
||||
@staticmethod
|
||||
def error_name(error_code):
|
||||
return name_or_number(SDP_ERROR_NAMES, error_code)
|
||||
_PDU = TypeVar('_PDU', bound='SDP_PDU')
|
||||
|
||||
@staticmethod
|
||||
def pdu_name(code):
|
||||
return name_or_number(SDP_PDU_NAMES, code)
|
||||
|
||||
@staticmethod
|
||||
def subclass(fields):
|
||||
def inner(cls):
|
||||
name = cls.__name__
|
||||
|
||||
# add a _ character before every uppercase letter, except the SDP_ prefix
|
||||
location = len(name) - 1
|
||||
while location > 4:
|
||||
if not name[location].isupper():
|
||||
location -= 1
|
||||
continue
|
||||
name = name[:location] + '_' + name[location:]
|
||||
location -= 1
|
||||
|
||||
cls.name = name.upper()
|
||||
cls.pdu_id = key_with_value(SDP_PDU_NAMES, cls.name)
|
||||
if cls.pdu_id is None:
|
||||
raise KeyError(f'PDU name {cls.name} not found in SDP_PDU_NAMES')
|
||||
cls.fields = fields
|
||||
|
||||
# Register a factory for this class
|
||||
SDP_PDU.sdp_pdu_classes[cls.pdu_id] = cls
|
||||
|
||||
return cls
|
||||
|
||||
return inner
|
||||
|
||||
def __init__(self, pdu=None, transaction_id=0, **kwargs):
|
||||
if hasattr(self, 'fields') and kwargs:
|
||||
HCI_Object.init_from_fields(self, self.fields, kwargs)
|
||||
if pdu is None:
|
||||
parameters = HCI_Object.dict_to_bytes(kwargs, self.fields)
|
||||
pdu = (
|
||||
struct.pack('>BHH', self.pdu_id, transaction_id, len(parameters))
|
||||
+ parameters
|
||||
)
|
||||
self.pdu = pdu
|
||||
self.transaction_id = transaction_id
|
||||
|
||||
def init_from_bytes(self, pdu, offset):
|
||||
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
|
||||
@classmethod
|
||||
def subclass(cls, subclass: type[_PDU]) -> type[_PDU]:
|
||||
subclass.fields = hci.HCI_Object.fields_from_dataclass(subclass)
|
||||
cls.subclasses[subclass.pdu_id] = subclass
|
||||
return subclass
|
||||
|
||||
def __bytes__(self):
|
||||
return self.pdu
|
||||
if self._payload is None:
|
||||
parameters = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields)
|
||||
self._payload = (
|
||||
struct.pack('>BHH', self.pdu_id, self.transaction_id, len(parameters))
|
||||
+ parameters
|
||||
)
|
||||
return self._payload
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self.pdu_id.name
|
||||
|
||||
def __str__(self):
|
||||
result = f'{color(self.name, "blue")} [TID={self.transaction_id}]'
|
||||
if fields := getattr(self, 'fields', None):
|
||||
result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ')
|
||||
result += ':\n' + hci.HCI_Object.format_fields(self.__dict__, fields, ' ')
|
||||
elif len(self.pdu) > 1:
|
||||
result += f': {self.pdu.hex()}'
|
||||
return result
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@SDP_PDU.subclass([('error_code', {'size': 2, 'mapper': SDP_PDU.error_name})])
|
||||
@SDP_PDU.subclass
|
||||
@dataclass
|
||||
class SDP_ErrorResponse(SDP_PDU):
|
||||
'''
|
||||
See Bluetooth spec @ Vol 3, Part B - 4.4.1 SDP_ErrorResponse PDU
|
||||
'''
|
||||
|
||||
error_code: int
|
||||
pdu_id = PduId.SDP_ERROR_RESPONSE
|
||||
|
||||
error_code: ErrorCode = field(metadata=ErrorCode.type_metadata(2))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@SDP_PDU.subclass(
|
||||
[
|
||||
('service_search_pattern', DataElement.parse_from_bytes),
|
||||
('maximum_service_record_count', '>2'),
|
||||
('continuation_state', '*'),
|
||||
]
|
||||
)
|
||||
@SDP_PDU.subclass
|
||||
@dataclass
|
||||
class SDP_ServiceSearchRequest(SDP_PDU):
|
||||
'''
|
||||
See Bluetooth spec @ Vol 3, Part B - 4.5.1 SDP_ServiceSearchRequest PDU
|
||||
'''
|
||||
|
||||
service_search_pattern: DataElement
|
||||
maximum_service_record_count: int
|
||||
continuation_state: bytes
|
||||
pdu_id = PduId.SDP_SERVICE_SEARCH_REQUEST
|
||||
|
||||
service_search_pattern: DataElement = field(
|
||||
metadata=hci.metadata(DataElement.parse_from_bytes)
|
||||
)
|
||||
maximum_service_record_count: int = field(metadata=hci.metadata('>2'))
|
||||
continuation_state: bytes = field(metadata=hci.metadata('*'))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@SDP_PDU.subclass(
|
||||
[
|
||||
('total_service_record_count', '>2'),
|
||||
('current_service_record_count', '>2'),
|
||||
(
|
||||
'service_record_handle_list',
|
||||
SDP_PDU.parse_service_record_handle_list_preceded_by_count,
|
||||
),
|
||||
('continuation_state', '*'),
|
||||
]
|
||||
)
|
||||
@SDP_PDU.subclass
|
||||
@dataclass
|
||||
class SDP_ServiceSearchResponse(SDP_PDU):
|
||||
'''
|
||||
See Bluetooth spec @ Vol 3, Part B - 4.5.2 SDP_ServiceSearchResponse PDU
|
||||
'''
|
||||
|
||||
service_record_handle_list: list[int]
|
||||
total_service_record_count: int
|
||||
current_service_record_count: int
|
||||
continuation_state: bytes
|
||||
pdu_id = PduId.SDP_SERVICE_SEARCH_RESPONSE
|
||||
|
||||
total_service_record_count: int = field(metadata=hci.metadata('>2'))
|
||||
service_record_handle_list: Sequence[int] = field(
|
||||
metadata=_SERVICE_RECORD_HANDLE_LIST_METADATA
|
||||
)
|
||||
continuation_state: bytes = field(metadata=hci.metadata('*'))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@SDP_PDU.subclass(
|
||||
[
|
||||
('service_record_handle', '>4'),
|
||||
('maximum_attribute_byte_count', '>2'),
|
||||
('attribute_id_list', DataElement.parse_from_bytes),
|
||||
('continuation_state', '*'),
|
||||
]
|
||||
)
|
||||
@SDP_PDU.subclass
|
||||
@dataclass
|
||||
class SDP_ServiceAttributeRequest(SDP_PDU):
|
||||
'''
|
||||
See Bluetooth spec @ Vol 3, Part B - 4.6.1 SDP_ServiceAttributeRequest PDU
|
||||
'''
|
||||
|
||||
service_record_handle: int
|
||||
maximum_attribute_byte_count: int
|
||||
attribute_id_list: DataElement
|
||||
continuation_state: bytes
|
||||
pdu_id = PduId.SDP_SERVICE_ATTRIBUTE_REQUEST
|
||||
|
||||
service_record_handle: int = field(metadata=hci.metadata('>4'))
|
||||
maximum_attribute_byte_count: int = field(metadata=hci.metadata('>2'))
|
||||
attribute_id_list: DataElement = field(
|
||||
metadata=hci.metadata(DataElement.parse_from_bytes)
|
||||
)
|
||||
continuation_state: bytes = field(metadata=hci.metadata('*'))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@SDP_PDU.subclass(
|
||||
[
|
||||
('attribute_list_byte_count', '>2'),
|
||||
('attribute_list', SDP_PDU.parse_bytes_preceded_by_length),
|
||||
('continuation_state', '*'),
|
||||
]
|
||||
)
|
||||
@SDP_PDU.subclass
|
||||
@dataclass
|
||||
class SDP_ServiceAttributeResponse(SDP_PDU):
|
||||
'''
|
||||
See Bluetooth spec @ Vol 3, Part B - 4.6.2 SDP_ServiceAttributeResponse PDU
|
||||
'''
|
||||
|
||||
attribute_list_byte_count: int
|
||||
attribute_list: bytes
|
||||
continuation_state: bytes
|
||||
pdu_id = PduId.SDP_SERVICE_ATTRIBUTE_RESPONSE
|
||||
|
||||
attribute_list: bytes = field(metadata=_BYTES_PRECEDED_BY_LENGTH_METADATA)
|
||||
continuation_state: bytes = field(metadata=hci.metadata('*'))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@SDP_PDU.subclass(
|
||||
[
|
||||
('service_search_pattern', DataElement.parse_from_bytes),
|
||||
('maximum_attribute_byte_count', '>2'),
|
||||
('attribute_id_list', DataElement.parse_from_bytes),
|
||||
('continuation_state', '*'),
|
||||
]
|
||||
)
|
||||
@SDP_PDU.subclass
|
||||
@dataclass
|
||||
class SDP_ServiceSearchAttributeRequest(SDP_PDU):
|
||||
'''
|
||||
See Bluetooth spec @ Vol 3, Part B - 4.7.1 SDP_ServiceSearchAttributeRequest PDU
|
||||
'''
|
||||
|
||||
service_search_pattern: DataElement
|
||||
maximum_attribute_byte_count: int
|
||||
attribute_id_list: DataElement
|
||||
continuation_state: bytes
|
||||
pdu_id = PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST
|
||||
|
||||
service_search_pattern: DataElement = field(
|
||||
metadata=hci.metadata(DataElement.parse_from_bytes)
|
||||
)
|
||||
maximum_attribute_byte_count: int = field(metadata=hci.metadata('>2'))
|
||||
attribute_id_list: DataElement = field(
|
||||
metadata=hci.metadata(DataElement.parse_from_bytes)
|
||||
)
|
||||
continuation_state: bytes = field(metadata=hci.metadata('*'))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@SDP_PDU.subclass(
|
||||
[
|
||||
('attribute_lists_byte_count', '>2'),
|
||||
('attribute_lists', SDP_PDU.parse_bytes_preceded_by_length),
|
||||
('continuation_state', '*'),
|
||||
]
|
||||
)
|
||||
@SDP_PDU.subclass
|
||||
@dataclass
|
||||
class SDP_ServiceSearchAttributeResponse(SDP_PDU):
|
||||
'''
|
||||
See Bluetooth spec @ Vol 3, Part B - 4.7.2 SDP_ServiceSearchAttributeResponse PDU
|
||||
'''
|
||||
|
||||
attribute_lists_byte_count: int
|
||||
attribute_lists: bytes
|
||||
continuation_state: bytes
|
||||
pdu_id = PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE
|
||||
|
||||
attribute_lists: bytes = field(metadata=_BYTES_PRECEDED_BY_LENGTH_METADATA)
|
||||
continuation_state: bytes = field(metadata=hci.metadata('*'))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -873,7 +850,7 @@ class Client:
|
||||
)
|
||||
|
||||
# Request and accumulate until there's no more continuation
|
||||
service_record_handle_list = []
|
||||
service_record_handle_list: list[int] = []
|
||||
continuation_state = bytes([0])
|
||||
watchdog = SDP_CONTINUATION_WATCHDOG
|
||||
while watchdog > 0:
|
||||
@@ -1091,7 +1068,7 @@ class Server:
|
||||
logger.exception(color('failed to parse SDP Request PDU', 'red'))
|
||||
self.send_response(
|
||||
SDP_ErrorResponse(
|
||||
transaction_id=0, error_code=SDP_INVALID_REQUEST_SYNTAX_ERROR
|
||||
transaction_id=0, error_code=ErrorCode.INVALID_REQUEST_SYNTAX
|
||||
)
|
||||
)
|
||||
|
||||
@@ -1108,7 +1085,7 @@ class Server:
|
||||
self.send_response(
|
||||
SDP_ErrorResponse(
|
||||
transaction_id=sdp_pdu.transaction_id,
|
||||
error_code=SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR,
|
||||
error_code=ErrorCode.INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST,
|
||||
)
|
||||
)
|
||||
else:
|
||||
@@ -1116,7 +1093,7 @@ class Server:
|
||||
self.send_response(
|
||||
SDP_ErrorResponse(
|
||||
transaction_id=sdp_pdu.transaction_id,
|
||||
error_code=SDP_INVALID_REQUEST_SYNTAX_ERROR,
|
||||
error_code=ErrorCode.INVALID_REQUEST_SYNTAX,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -1134,7 +1111,7 @@ class Server:
|
||||
self.send_response(
|
||||
SDP_ErrorResponse(
|
||||
transaction_id=transaction_id,
|
||||
error_code=SDP_INVALID_CONTINUATION_STATE_ERROR,
|
||||
error_code=ErrorCode.INVALID_CONTINUATION_STATE,
|
||||
)
|
||||
)
|
||||
return None
|
||||
@@ -1228,15 +1205,11 @@ class Server:
|
||||
if service_record_handles_remaining
|
||||
else bytes([0])
|
||||
)
|
||||
service_record_handle_list = b''.join(
|
||||
[struct.pack('>I', handle) for handle in service_record_handles]
|
||||
)
|
||||
self.send_response(
|
||||
SDP_ServiceSearchResponse(
|
||||
transaction_id=request.transaction_id,
|
||||
total_service_record_count=total_service_record_count,
|
||||
current_service_record_count=len(service_record_handles),
|
||||
service_record_handle_list=service_record_handle_list,
|
||||
service_record_handle_list=service_record_handles,
|
||||
continuation_state=continuation_state,
|
||||
)
|
||||
)
|
||||
@@ -1259,7 +1232,7 @@ class Server:
|
||||
self.send_response(
|
||||
SDP_ErrorResponse(
|
||||
transaction_id=request.transaction_id,
|
||||
error_code=SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR,
|
||||
error_code=ErrorCode.INVALID_SERVICE_RECORD_HANDLE,
|
||||
)
|
||||
)
|
||||
return
|
||||
@@ -1284,7 +1257,6 @@ class Server:
|
||||
self.send_response(
|
||||
SDP_ServiceAttributeResponse(
|
||||
transaction_id=request.transaction_id,
|
||||
attribute_list_byte_count=len(attribute_list_response),
|
||||
attribute_list=attribute_list_response,
|
||||
continuation_state=continuation_state,
|
||||
)
|
||||
@@ -1331,7 +1303,6 @@ class Server:
|
||||
self.send_response(
|
||||
SDP_ServiceSearchAttributeResponse(
|
||||
transaction_id=request.transaction_id,
|
||||
attribute_lists_byte_count=len(attribute_lists_response),
|
||||
attribute_lists=attribute_lists_response,
|
||||
continuation_state=continuation_state,
|
||||
)
|
||||
|
||||
@@ -36,6 +36,7 @@ from bumble.colors import color
|
||||
from bumble.core import (
|
||||
AdvertisingData,
|
||||
InvalidArgumentError,
|
||||
InvalidPacketError,
|
||||
PhysicalTransport,
|
||||
ProtocolError,
|
||||
)
|
||||
@@ -178,6 +179,16 @@ class AuthReq(hci.SpecableFlag):
|
||||
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031')
|
||||
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032')
|
||||
|
||||
# Diffie-Hellman private / public key pair in Debug Mode (Core - Vol. 3, Part H)
|
||||
SMP_DEBUG_KEY_PRIVATE = bytes.fromhex(
|
||||
'3f49f6d4 a3c55f38 74c9b3e3 d2103f50 4aff607b eb40b799 5899b8a6 cd3c1abd'
|
||||
)
|
||||
SMP_DEBUG_KEY_PUBLIC_X = bytes.fromhex(
|
||||
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6'
|
||||
)
|
||||
SMP_DEBUG_KEY_PUBLIC_Y= bytes.fromhex(
|
||||
'dc809c49 652aeb6d 63329abf 5a52155c 766345c2 8fed3024 741c8ed0 1589d28b'
|
||||
)
|
||||
# fmt: on
|
||||
# pylint: enable=line-too-long
|
||||
# pylint: disable=invalid-name
|
||||
@@ -205,6 +216,8 @@ class SMP_Command:
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, pdu: bytes) -> SMP_Command:
|
||||
if not pdu:
|
||||
raise InvalidPacketError("Empty SMP PDU")
|
||||
code = CommandCode(pdu[0])
|
||||
|
||||
subclass = SMP_Command.smp_classes.get(code)
|
||||
@@ -1919,6 +1932,7 @@ class Manager(utils.EventEmitter):
|
||||
self._ecc_key = None
|
||||
self.pairing_config_factory = pairing_config_factory
|
||||
self.session_proxy = Session
|
||||
self.debug_mode = False
|
||||
|
||||
def send_command(self, connection: Connection, command: SMP_Command) -> None:
|
||||
logger.debug(
|
||||
@@ -1965,6 +1979,13 @@ class Manager(utils.EventEmitter):
|
||||
|
||||
@property
|
||||
def ecc_key(self) -> crypto.EccKey:
|
||||
if self.debug_mode:
|
||||
# Core - Vol 3, Part H:
|
||||
# When the Security Manager is placed in a Debug mode it shall use the
|
||||
# following Diffie-Hellman private / public key pair:
|
||||
debug_key = crypto.EccKey.from_private_key_bytes(SMP_DEBUG_KEY_PRIVATE)
|
||||
return debug_key
|
||||
|
||||
if self._ecc_key is None:
|
||||
self._ecc_key = crypto.EccKey.generate()
|
||||
assert self._ecc_key
|
||||
|
||||
@@ -83,6 +83,7 @@ async def main() -> None:
|
||||
GATT_DEVICE_INFORMATION_SERVICE, [manufacturer_name_characteristic]
|
||||
)
|
||||
server_device.add_service(device_info_service)
|
||||
await server_device.start_advertising()
|
||||
|
||||
# Connect the client to the server
|
||||
connection = await client_device.connect(server_device.random_address)
|
||||
|
||||
@@ -13,13 +13,12 @@ authors = [{ name = "Google", email = "bumble-dev@google.com" }]
|
||||
requires-python = ">=3.10"
|
||||
dependencies = [
|
||||
"aiohttp ~= 3.8; platform_system!='Emscripten'",
|
||||
"appdirs >= 1.4; platform_system!='Emscripten'",
|
||||
"click >= 8.1.3; platform_system!='Emscripten'",
|
||||
"cryptography >= 44.0.3; platform_system!='Emscripten' and platform_system!='Android'",
|
||||
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the
|
||||
# versions available on PyPI. Relax the version requirement since it's better than being
|
||||
# completely unable to import the package in case of version mismatch.
|
||||
"cryptography >= 44.0.3; platform_system=='Emscripten'",
|
||||
"cryptography >= 39.0.0; platform_system=='Emscripten'",
|
||||
# Android wheels for cryptography are not yet available on PyPI, so chaquopy uses
|
||||
# the builds from https://chaquo.com/pypi-13.1/cryptography/. But these are not regually
|
||||
# updated. Relax the version requirement since it's better than being completely unable
|
||||
|
||||
@@ -73,6 +73,14 @@ def test_uuid_to_hex_str() -> None:
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_uuid_hash() -> None:
|
||||
uuid = UUID("1234")
|
||||
uuid_128_bytes = UUID.from_bytes(uuid.to_bytes(force_128=True))
|
||||
assert uuid in {uuid_128_bytes}
|
||||
assert uuid_128_bytes in {uuid}
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_appearance() -> None:
|
||||
a = Appearance(Appearance.Category.COMPUTER, Appearance.ComputerSubcategory.LAPTOP)
|
||||
|
||||
@@ -826,6 +826,22 @@ async def test_remote_name_request():
|
||||
assert actual_name == expected_name
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_remote_classic_features():
|
||||
devices = TwoDevices()
|
||||
devices[0].classic_enabled = True
|
||||
devices[1].classic_enabled = True
|
||||
await devices[0].power_on()
|
||||
await devices[1].power_on()
|
||||
connection = await devices[0].connect_classic(devices[1].public_address)
|
||||
|
||||
assert (
|
||||
await asyncio.wait_for(connection.get_remote_classic_features(), _TIMEOUT)
|
||||
== devices.controllers[1].lmp_features
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def run_test_device():
|
||||
await test_device_connect_parallel()
|
||||
|
||||
@@ -22,6 +22,7 @@ import unittest.mock
|
||||
|
||||
import pytest
|
||||
|
||||
from bumble import controller, hci
|
||||
from bumble.controller import Controller
|
||||
from bumble.hci import (
|
||||
HCI_AclDataPacket,
|
||||
@@ -49,34 +50,27 @@ logger = logging.getLogger(__name__)
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
'supported_commands, lmp_features',
|
||||
'supported_commands, max_lmp_features_page_number',
|
||||
[
|
||||
(
|
||||
# Default commands
|
||||
'2000800000c000000000e4000000a822000000000000040000f7ffff7f000000'
|
||||
'30f0f9ff01008004000000000000000000000000000000000000000000000000',
|
||||
# Only LE LMP feature
|
||||
'0000000060000000',
|
||||
),
|
||||
(controller.Controller.supported_commands, 0),
|
||||
(
|
||||
# All commands
|
||||
'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff'
|
||||
'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff',
|
||||
set(hci.HCI_Command.command_names.keys()),
|
||||
# 3 pages of LMP features
|
||||
'000102030405060708090A0B0C0D0E0F011112131415161718191A1B1C1D1E1F',
|
||||
2,
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_reset(supported_commands: str, lmp_features: str):
|
||||
async def test_reset(supported_commands: set[int], max_lmp_features_page_number: int):
|
||||
controller = Controller('C')
|
||||
controller.supported_commands = bytes.fromhex(supported_commands)
|
||||
controller.lmp_features = bytes.fromhex(lmp_features)
|
||||
controller.supported_commands = supported_commands
|
||||
controller.lmp_features_max_page_number = max_lmp_features_page_number
|
||||
host = Host(controller, AsyncPipeSink(controller))
|
||||
|
||||
await host.reset()
|
||||
|
||||
assert host.local_lmp_features == int.from_bytes(
|
||||
bytes.fromhex(lmp_features), 'little'
|
||||
assert host.local_lmp_features == (
|
||||
controller.lmp_features & ~(1 << (64 * max_lmp_features_page_number + 1))
|
||||
)
|
||||
|
||||
|
||||
@@ -177,14 +171,15 @@ class Source:
|
||||
|
||||
|
||||
class Sink:
|
||||
response: HCI_Event
|
||||
response: HCI_Event | None
|
||||
|
||||
def __init__(self, source: Source, response: HCI_Event) -> None:
|
||||
def __init__(self, source: Source, response: HCI_Event | None) -> None:
|
||||
self.source = source
|
||||
self.response = response
|
||||
|
||||
def on_packet(self, packet: bytes) -> None:
|
||||
self.source.sink.on_packet(bytes(self.response))
|
||||
if self.response is not None:
|
||||
self.source.sink.on_packet(bytes(self.response))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -234,6 +229,23 @@ async def test_send_sync_command() -> None:
|
||||
assert isinstance(response3.return_parameters, HCI_GenericReturnParameters)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_sync_command_timeout() -> None:
|
||||
source = Source()
|
||||
sink = Sink(source, None)
|
||||
|
||||
host = Host(source, sink)
|
||||
host.ready = True
|
||||
|
||||
with pytest.raises(asyncio.TimeoutError):
|
||||
await host.send_sync_command(HCI_Reset_Command(), response_timeout=0.01)
|
||||
|
||||
# The sending semaphore should have been released, so this should not block
|
||||
# indefinitely
|
||||
with pytest.raises(asyncio.TimeoutError):
|
||||
await host.send_sync_command(hci.HCI_Reset_Command(), response_timeout=0.01)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_async_command() -> None:
|
||||
source = Source()
|
||||
|
||||
@@ -21,6 +21,7 @@ import logging
|
||||
import os
|
||||
import pathlib
|
||||
import tempfile
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -179,11 +180,55 @@ async def test_default_namespace(temporary_file):
|
||||
assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1')
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_filename(tmp_path):
|
||||
import platformdirs
|
||||
|
||||
with mock.patch.object(platformdirs, 'user_data_path', return_value=tmp_path):
|
||||
# Case 1: no namespace, no filename
|
||||
keystore = JsonKeyStore(None, None)
|
||||
expected_directory = tmp_path / 'Pairing'
|
||||
expected_filename = expected_directory / 'keys.json'
|
||||
assert keystore.directory_name == expected_directory
|
||||
assert keystore.filename == expected_filename
|
||||
|
||||
# Save some data
|
||||
keys = PairingKeys()
|
||||
ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
|
||||
keys.ltk = PairingKeys.Key(ltk)
|
||||
await keystore.update('foo', keys)
|
||||
assert expected_filename.exists()
|
||||
|
||||
# Load back
|
||||
keystore2 = JsonKeyStore(None, None)
|
||||
foo = await keystore2.get('foo')
|
||||
assert foo is not None
|
||||
assert foo.ltk.value == ltk
|
||||
|
||||
# Case 2: namespace, no filename
|
||||
keystore3 = JsonKeyStore('my:namespace', None)
|
||||
# safe_name = 'my-namespace' (lower is already 'my:namespace', then replace ':' with '-')
|
||||
expected_filename3 = expected_directory / 'my-namespace.json'
|
||||
assert keystore3.filename == expected_filename3
|
||||
|
||||
# Save some data
|
||||
await keystore3.update('bar', keys)
|
||||
assert expected_filename3.exists()
|
||||
|
||||
# Load back
|
||||
keystore4 = JsonKeyStore('my:namespace', None)
|
||||
bar = await keystore4.get('bar')
|
||||
assert bar is not None
|
||||
assert bar.ltk.value == ltk
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def run_tests():
|
||||
await test_basic()
|
||||
await test_parsing()
|
||||
await test_default_namespace()
|
||||
await test_no_filename()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -18,9 +18,11 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
|
||||
import pytest
|
||||
|
||||
from bumble import sdp
|
||||
from bumble.core import BT_L2CAP_PROTOCOL_ID, UUID
|
||||
from bumble.sdp import (
|
||||
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
|
||||
@@ -206,6 +208,16 @@ def sdp_records(record_count=1):
|
||||
}
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_pdu_parameter_length(caplog) -> None:
|
||||
caplog.set_level(logging.WARNING)
|
||||
pdu = sdp.SDP_ErrorResponse(
|
||||
transaction_id=0, error_code=sdp.ErrorCode.INVALID_SDP_VERSION
|
||||
)
|
||||
assert sdp.SDP_PDU.from_bytes(bytes(pdu)) == pdu
|
||||
assert not re.search("Expect \d+ bytes, got \d+", caplog.text)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_service_search():
|
||||
|
||||
@@ -24,7 +24,7 @@ import pytest
|
||||
from bumble import crypto, pairing, smp
|
||||
from bumble.core import AdvertisingData
|
||||
from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1
|
||||
from bumble.device import Device
|
||||
from bumble.device import Device, DeviceConfiguration
|
||||
from bumble.hci import Address
|
||||
from bumble.pairing import LeRole, OobData, OobSharedData
|
||||
|
||||
@@ -312,3 +312,17 @@ async def test_send_identity_address_command(
|
||||
actual_command = mock_method.call_args.args[0]
|
||||
assert actual_command.addr_type == expected_identity_address.address_type
|
||||
assert actual_command.bd_addr == expected_identity_address
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_smp_debug_mode():
|
||||
config = DeviceConfiguration(smp_debug_mode=True)
|
||||
device = Device(config=config)
|
||||
|
||||
assert device.smp_manager.ecc_key.x == smp.SMP_DEBUG_KEY_PUBLIC_X
|
||||
assert device.smp_manager.ecc_key.y == smp.SMP_DEBUG_KEY_PUBLIC_Y
|
||||
|
||||
device.smp_manager.debug_mode = False
|
||||
|
||||
assert not device.smp_manager.ecc_key.x == smp.SMP_DEBUG_KEY_PUBLIC_X
|
||||
assert not device.smp_manager.ecc_key.y == smp.SMP_DEBUG_KEY_PUBLIC_Y
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
<head>
|
||||
<link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons">
|
||||
<link rel="stylesheet" href="https://fonts.googleapis.com/css2?family=Material+Symbols+Outlined:opsz,wght,FILL,GRAD@24,400,0,0" />
|
||||
<script src="https://cdn.jsdelivr.net/pyodide/v0.24.1/full/pyodide.js"></script>
|
||||
<script src="https://cdn.jsdelivr.net/pyodide/v0.26.2/full/pyodide.js"></script>
|
||||
<script type="module" src="../ui.js"></script>
|
||||
<script type="module" src="heart_rate_monitor.js"></script>
|
||||
<style>
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
<head>
|
||||
<link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons">
|
||||
<link rel="stylesheet" href="scanner.css">
|
||||
<script src="https://cdn.jsdelivr.net/pyodide/v0.24.1/full/pyodide.js"></script>
|
||||
<script src="https://cdn.jsdelivr.net/pyodide/v0.26.2/full/pyodide.js"></script>
|
||||
<script type="module" src="../ui.js"></script>
|
||||
<script type="module" src="scanner.js"></script>
|
||||
</style>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<title>Bumble Speaker</title>
|
||||
<link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons">
|
||||
<link rel="stylesheet" href="speaker.css">
|
||||
<script src="https://cdn.jsdelivr.net/pyodide/v0.24.1/full/pyodide.js"></script>
|
||||
<script src="https://cdn.jsdelivr.net/pyodide/v0.26.2/full/pyodide.js"></script>
|
||||
<script type="module" src="speaker.js"></script>
|
||||
<script type="module" src="../ui.js"></script>
|
||||
</head>
|
||||
|
||||
Reference in New Issue
Block a user