Compare commits

...

34 Commits

Author SHA1 Message Date
Josh Wu
27d02ef18d Merge pull request #913 from zxzxwu/sdp
SDP: Fix wrong parameter size
2026-04-20 16:32:37 +08:00
Josh Wu
c0725e2a4a SDP: Fix wrong parameter size 2026-04-20 16:23:19 +08:00
Josh Wu
bf0784dde4 Merge pull request #912 from ibondarenko1/fix/empty-pdu-crash
fix: add input validation to prevent remote crash from empty/malforme…
2026-04-20 14:36:48 +08:00
Ievgen Bondarenko
444f43f6a3 fix: address review feedback - use InvalidPacketError and abort on buffer overflow
- att.py: raise core.InvalidPacketError instead of generic ValueError
- smp.py: raise core.InvalidPacketError instead of generic ValueError
- hfp.py: add MAX_BUFFER_SIZE class constant (64KB)
- hfp.py: drop incoming data when it would overflow buffer instead of
  truncating, preserving existing partial-packet state

Per review comments on PR #912 by @zxzxwu.
2026-04-16 11:24:09 -07:00
Gilles Boccon-Gibod
2420c47cf1 Merge pull request #911 from google/gbg/issue-910
release command semaphore after timeout
2026-04-16 18:11:57 +02:00
Ievgen Bondarenko
0a78e7506b fix: add input validation to prevent remote crash from empty/malformed PDUs
Add length checks in from_bytes() for ATT and SMP protocol parsers
to prevent IndexError crashes from empty PDUs sent by remote Bluetooth
devices. Also add buffer size limit and UTF-8 error handling in HFP
protocol to prevent memory exhaustion and decode crashes.

- bumble/att.py: validate PDU is non-empty before accessing pdu[0]
- bumble/smp.py: validate PDU is non-empty before accessing pdu[0]
- bumble/hfp.py: limit buffer to 64KB, handle invalid UTF-8 gracefully

These issues can be triggered by a remote Bluetooth device sending
malformed packets, causing denial of service on the host.
2026-04-16 01:43:41 -07:00
Gilles Boccon-Gibod
f7cc6f6657 release command semaphore after timeout 2026-04-15 16:54:54 +02:00
Josh Wu
f2824ee6b8 Merge pull request #907 from zxzxwu/example-gatt-client-and-server
Advertise in run_gatt_client_and_server
2026-04-13 16:31:19 +08:00
Josh Wu
7188ef08de Advertise in run_gatt_client_and_server 2026-04-13 15:31:32 +08:00
Josh Wu
3ded9014d3 Merge pull request #905 from markusjellitsch/feature/debug-keys
Feature  - Add SMP Debug Mode  (Core Vol.3, Part H)
2026-04-09 15:36:42 +08:00
Josh Wu
b6125bdfb1 Merge pull request #904 from zxzxwu/keys
Keys: Remove appdirs and improve typing
2026-04-09 15:30:39 +08:00
Markus Jellitsch
dc17f4f1ca remove asserts 2026-04-08 20:58:47 +02:00
Markus Jellitsch
3f65380c20 remove comment 2026-04-03 23:19:43 +02:00
Markus Jellitsch
25a0056ecc remove uncommented line 2026-04-03 23:08:16 +02:00
Markus Jellitsch
85f6b10983 run formatter 2026-04-03 23:06:24 +02:00
Markus Jellitsch
e85f041e9d add test for smp debug mode 2026-04-03 23:04:48 +02:00
Markus Jellitsch
ee09e6f10d add smp_debug_mode config flag to enable debug keys during device init 2026-04-03 23:03:51 +02:00
Markus Jellitsch
c3daf4a7e1 implement debug mode for smp manager using defined private / public key pair 2026-04-03 23:02:15 +02:00
Josh Wu
3af623be7e Keys: Remove appdirs and improve typing 2026-03-31 16:25:15 +08:00
Gilles Boccon-Gibod
4e76d3057b Merge pull request #903 from sameer/micropip-install-crypto-issue
Fix Hive demo install failure
2026-03-28 15:35:32 -04:00
Sameer Puri
eda7360222 Upgrade pyodide in web fixes import error
Prior to this, these web pages fail to load with
`ImportError: cannot import name 'TypeIs' from 'typing_extensions'
(/lib/python3.11/site-packages/typing_extensions.py)`
2026-03-26 18:39:07 +00:00
Sameer Puri
a4c15c00de Downgrade cryptography, fixes micropip failure
Prior to this, these web pages fail to load with

`ValueError: Can't find a pure Python 3 wheel for 'cryptography>=44.0.3;
platform_system == "Emscripten"'.`
2026-03-26 18:38:12 +00:00
Josh Wu
cba4df4aef Merge pull request #900 from zxzxwu/lmp-feat
Add read classic remote features support
2026-03-24 14:03:29 +08:00
Josh Wu
ceb8b448e9 Merge pull request #901 from zxzxwu/rust
Add --locked to allow installing cargo-all-features
2026-03-21 03:45:47 +08:00
Josh Wu
311b716d5c Add --locked to allow installing cargo-all-features 2026-03-20 18:44:49 +08:00
Josh Wu
0ba9e5c317 Add read classic remote features support 2026-03-20 18:32:52 +08:00
Josh Wu
3517225b62 Merge pull request #898 from zxzxwu/phy
Make ConnectionPHY dataclass
2026-03-13 12:04:45 +08:00
Josh Wu
ad4bb1578b Make ConnectionPHY dataclass 2026-03-11 21:41:48 +08:00
Josh Wu
4af65b381b Merge pull request #820 from zxzxwu/sdp
SDP: Migrate to dataclasses
2026-03-04 13:45:39 +08:00
Josh Wu
a5cd3365ae Merge pull request #895 from zxzxwu/uuid
Hash and cache 128 bytes of UUID
2026-03-04 00:29:43 +08:00
Josh Wu
2915cb8bb6 Add test for UUID hash 2026-03-04 00:22:50 +08:00
Josh Wu
28e485b7b3 Hash and cache 128 bytes of UUID 2026-03-03 17:54:27 +08:00
Josh Wu
1198f2c3f5 SDP: Make PDU dataclasses 2026-03-03 02:07:08 +08:00
Josh Wu
80aaf6a2b9 SDP: Make DataElement and ServiceAttribute dataclasses 2026-03-03 01:28:40 +08:00
22 changed files with 776 additions and 335 deletions

View File

@@ -69,7 +69,7 @@ jobs:
components: clippy,rustfmt components: clippy,rustfmt
toolchain: ${{ matrix.rust-version }} toolchain: ${{ matrix.rust-version }}
- name: Install Rust dependencies - name: Install Rust dependencies
run: cargo install cargo-all-features --version 1.11.0 # allows building/testing combinations of features run: cargo install cargo-all-features --version 1.11.0 --locked # allows building/testing combinations of features
- name: Check License Headers - name: Check License Headers
run: cd rust && cargo run --features dev-tools --bin file-header check-all run: cd rust && cargo run --features dev-tools --bin file-header check-all
- name: Rust Build - name: Rust Build

View File

@@ -42,7 +42,7 @@ from typing_extensions import TypeIs
from bumble import hci, l2cap, utils from bumble import hci, l2cap, utils
from bumble.colors import color from bumble.colors import color
from bumble.core import UUID, InvalidOperationError, ProtocolError from bumble.core import UUID, InvalidOperationError, InvalidPacketError, ProtocolError
from bumble.hci import HCI_Object from bumble.hci import HCI_Object
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -249,6 +249,8 @@ class ATT_PDU:
@classmethod @classmethod
def from_bytes(cls, pdu: bytes) -> ATT_PDU: def from_bytes(cls, pdu: bytes) -> ATT_PDU:
if not pdu:
raise InvalidPacketError("Empty ATT PDU")
op_code = pdu[0] op_code = pdu[0]
subclass = ATT_PDU.pdu_classes.get(op_code) subclass = ATT_PDU.pdu_classes.get(op_code)

View File

@@ -238,9 +238,12 @@ class Controller:
hci_revision: int = 0 hci_revision: int = 0
lmp_version: int = hci.HCI_VERSION_BLUETOOTH_CORE_5_0 lmp_version: int = hci.HCI_VERSION_BLUETOOTH_CORE_5_0
lmp_subversion: int = 0 lmp_subversion: int = 0
lmp_features: bytes = bytes.fromhex( lmp_features: hci.LmpFeatureMask = (
'0000000060000000' hci.LmpFeatureMask.LE_SUPPORTED_CONTROLLER
) # BR/EDR Not Supported, LE Supported (Controller) | hci.LmpFeatureMask.BR_EDR_NOT_SUPPORTED
| hci.LmpFeatureMask.EXTENDED_FEATURES
)
lmp_features_max_page_number: int = 3
manufacturer_company_identifier: int = 0xFFFF manufacturer_company_identifier: int = 0xFFFF
acl_data_packet_length: int = 27 acl_data_packet_length: int = 27
total_num_acl_data_packets: int = 64 total_num_acl_data_packets: int = 64
@@ -250,10 +253,78 @@ class Controller:
total_num_iso_data_packets: int = 64 total_num_iso_data_packets: int = 64
event_mask: int = 0 event_mask: int = 0
event_mask_page_2: int = 0 event_mask_page_2: int = 0
supported_commands: bytes = bytes.fromhex( supported_commands: set[int] = {
'2000800000c000000000e4000000a822000000000000040000f7ffff7f000000' hci.HCI_DISCONNECT_COMMAND,
'30f0f9ff01008004002000000000000000000000000000000000000000000000' hci.HCI_READ_REMOTE_VERSION_INFORMATION_COMMAND,
) hci.HCI_READ_CLOCK_OFFSET_COMMAND,
hci.HCI_READ_LMP_HANDLE_COMMAND,
hci.HCI_SET_EVENT_MASK_COMMAND,
hci.HCI_RESET_COMMAND,
hci.HCI_READ_TRANSMIT_POWER_LEVEL_COMMAND,
hci.HCI_SET_CONTROLLER_TO_HOST_FLOW_CONTROL_COMMAND,
hci.HCI_HOST_BUFFER_SIZE_COMMAND,
hci.HCI_HOST_NUMBER_OF_COMPLETED_PACKETS_COMMAND,
hci.HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND,
hci.HCI_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
hci.HCI_READ_BUFFER_SIZE_COMMAND,
hci.HCI_READ_BD_ADDR_COMMAND,
hci.HCI_READ_RSSI_COMMAND,
hci.HCI_SET_EVENT_MASK_PAGE_2_COMMAND,
hci.HCI_LE_SET_EVENT_MASK_COMMAND,
hci.HCI_LE_READ_BUFFER_SIZE_COMMAND,
hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
hci.HCI_LE_SET_RANDOM_ADDRESS_COMMAND,
hci.HCI_LE_SET_ADVERTISING_PARAMETERS_COMMAND,
hci.HCI_LE_READ_ADVERTISING_PHYSICAL_CHANNEL_TX_POWER_COMMAND,
hci.HCI_LE_SET_ADVERTISING_DATA_COMMAND,
hci.HCI_LE_SET_SCAN_RESPONSE_DATA_COMMAND,
hci.HCI_LE_SET_ADVERTISING_ENABLE_COMMAND,
hci.HCI_LE_SET_SCAN_PARAMETERS_COMMAND,
hci.HCI_LE_SET_SCAN_ENABLE_COMMAND,
hci.HCI_LE_CREATE_CONNECTION_COMMAND,
hci.HCI_LE_CREATE_CONNECTION_CANCEL_COMMAND,
hci.HCI_LE_READ_FILTER_ACCEPT_LIST_SIZE_COMMAND,
hci.HCI_LE_CLEAR_FILTER_ACCEPT_LIST_COMMAND,
hci.HCI_LE_ADD_DEVICE_TO_FILTER_ACCEPT_LIST_COMMAND,
hci.HCI_LE_REMOVE_DEVICE_FROM_FILTER_ACCEPT_LIST_COMMAND,
hci.HCI_LE_CONNECTION_UPDATE_COMMAND,
hci.HCI_LE_SET_HOST_CHANNEL_CLASSIFICATION_COMMAND,
hci.HCI_LE_READ_CHANNEL_MAP_COMMAND,
hci.HCI_LE_READ_REMOTE_FEATURES_COMMAND,
hci.HCI_LE_ENCRYPT_COMMAND,
hci.HCI_LE_RAND_COMMAND,
hci.HCI_LE_ENABLE_ENCRYPTION_COMMAND,
hci.HCI_LE_LONG_TERM_KEY_REQUEST_REPLY_COMMAND,
hci.HCI_LE_LONG_TERM_KEY_REQUEST_NEGATIVE_REPLY_COMMAND,
hci.HCI_LE_READ_SUPPORTED_STATES_COMMAND,
hci.HCI_LE_RECEIVER_TEST_COMMAND,
hci.HCI_LE_TRANSMITTER_TEST_COMMAND,
hci.HCI_LE_TEST_END_COMMAND,
hci.HCI_READ_AUTHENTICATED_PAYLOAD_TIMEOUT_COMMAND,
hci.HCI_WRITE_AUTHENTICATED_PAYLOAD_TIMEOUT_COMMAND,
hci.HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_REPLY_COMMAND,
hci.HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_NEGATIVE_REPLY_COMMAND,
hci.HCI_LE_SET_DATA_LENGTH_COMMAND,
hci.HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
hci.HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
hci.HCI_LE_ADD_DEVICE_TO_RESOLVING_LIST_COMMAND,
hci.HCI_LE_REMOVE_DEVICE_FROM_RESOLVING_LIST_COMMAND,
hci.HCI_LE_CLEAR_RESOLVING_LIST_COMMAND,
hci.HCI_LE_READ_RESOLVING_LIST_SIZE_COMMAND,
hci.HCI_LE_READ_PEER_RESOLVABLE_ADDRESS_COMMAND,
hci.HCI_LE_READ_LOCAL_RESOLVABLE_ADDRESS_COMMAND,
hci.HCI_LE_SET_ADDRESS_RESOLUTION_ENABLE_COMMAND,
hci.HCI_LE_SET_RESOLVABLE_PRIVATE_ADDRESS_TIMEOUT_COMMAND,
hci.HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
hci.HCI_LE_READ_PHY_COMMAND,
hci.HCI_LE_SET_DEFAULT_PHY_COMMAND,
hci.HCI_LE_SET_PHY_COMMAND,
hci.HCI_LE_RECEIVER_TEST_V2_COMMAND,
hci.HCI_LE_TRANSMITTER_TEST_V2_COMMAND,
hci.HCI_LE_READ_TRANSMIT_POWER_COMMAND,
hci.HCI_LE_SET_PRIVACY_MODE_COMMAND,
hci.HCI_LE_READ_BUFFER_SIZE_V2_COMMAND,
}
le_event_mask: int = 0 le_event_mask: int = 0
le_features: hci.LeFeatureMask = ( le_features: hci.LeFeatureMask = (
hci.LeFeatureMask.LE_ENCRYPTION hci.LeFeatureMask.LE_ENCRYPTION
@@ -392,6 +463,12 @@ class Controller:
if self.link: if self.link:
self.link.on_address_changed(self) self.link.on_address_changed(self)
@property
def lmp_features_bytes(self) -> bytes:
return self.lmp_features.to_bytes(
(self.lmp_features_max_page_number + 1) * 8, 'little'
)
# Packet Sink protocol (packets coming from the host via HCI) # Packet Sink protocol (packets coming from the host via HCI)
def on_packet(self, packet: bytes) -> None: def on_packet(self, packet: bytes) -> None:
self.on_hci_packet(hci.HCI_Packet.from_bytes(packet)) self.on_hci_packet(hci.HCI_Packet.from_bytes(packet))
@@ -968,6 +1045,51 @@ class Controller:
packet.name_length, packet.name_length,
packet.name_fregment, packet.name_fregment,
) )
case lmp.LmpFeaturesReq(features):
self.send_lmp_packet(
sender_address,
lmp.LmpFeaturesRes(features=self.lmp_features_bytes[:8]),
)
case lmp.LmpFeaturesRes(features):
if connection := self.classic_connections.get(sender_address):
self.send_hci_packet(
hci.HCI_Read_Remote_Supported_Features_Complete_Event(
status=hci.HCI_ErrorCode.SUCCESS,
connection_handle=connection.handle,
lmp_features=features,
)
)
case lmp.LmpFeaturesReqExt(features_page, features):
# Calculate start/end of page
page_start = features_page * 8
page_end = page_start + 8
features_bytes = self.lmp_features_bytes
if page_start < len(features_bytes):
page_features = features_bytes[page_start:page_end].ljust(
8, b'\x00'
)
else:
page_features = b'\x00' * 8
self.send_lmp_packet(
sender_address,
lmp.LmpFeaturesResExt(
features_page=features_page,
max_features_page=len(features_bytes) // 8 - 1,
features=page_features,
),
)
case lmp.LmpFeaturesResExt(features_page, max_features_page, features):
if connection := self.classic_connections.get(sender_address):
self.send_hci_packet(
hci.HCI_Read_Remote_Extended_Features_Complete_Event(
status=hci.HCI_ErrorCode.SUCCESS,
connection_handle=connection.handle,
page_number=features_page,
maximum_page_number=max_features_page,
extended_lmp_features=features,
)
)
case _: case _:
logger.error("!!! Unhandled packet: %s", packet) logger.error("!!! Unhandled packet: %s", packet)
@@ -1349,6 +1471,53 @@ class Controller:
return None return None
def on_hci_read_remote_supported_features_command(
self, command: hci.HCI_Read_Remote_Supported_Features_Command
) -> None:
'''
See Bluetooth spec Vol 4, Part E - 7.1.20 Read Remote Supported Features command
'''
handle = command.connection_handle
if not (connection := self.find_classic_connection_by_handle(handle)):
self._send_hci_command_status(
hci.HCI_ErrorCode.UNKNOWN_CONNECTION_IDENTIFIER_ERROR, command.op_code
)
return None
self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code)
self.send_lmp_packet(
connection.peer_address,
lmp.LmpFeaturesReq(self.lmp_features_bytes[:8]),
)
return None
def on_hci_read_remote_extended_features_command(
self, command: hci.HCI_Read_Remote_Extended_Features_Command
) -> None:
'''
See Bluetooth spec Vol 4, Part E - 7.1.21 Read Remote Extended Features command
'''
handle = command.connection_handle
if not (connection := self.find_classic_connection_by_handle(handle)):
self._send_hci_command_status(
hci.HCI_ErrorCode.UNKNOWN_CONNECTION_IDENTIFIER_ERROR, command.op_code
)
return None
self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code)
self.send_lmp_packet(
connection.peer_address,
lmp.LmpFeaturesReqExt(
features_page=command.page_number,
features=self.lmp_features_bytes[
command.page_number * 8 : (command.page_number + 1) * 8
],
),
)
return None
def on_hci_enhanced_setup_synchronous_connection_command( def on_hci_enhanced_setup_synchronous_connection_command(
self, command: hci.HCI_Enhanced_Setup_Synchronous_Connection_Command self, command: hci.HCI_Enhanced_Setup_Synchronous_Connection_Command
) -> None: ) -> None:
@@ -1645,11 +1814,15 @@ class Controller:
return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS)
def on_hci_write_simple_pairing_mode_command( def on_hci_write_simple_pairing_mode_command(
self, _command: hci.HCI_Write_Simple_Pairing_Mode_Command self, command: hci.HCI_Write_Simple_Pairing_Mode_Command
) -> hci.HCI_StatusReturnParameters: ) -> hci.HCI_StatusReturnParameters:
''' '''
See Bluetooth spec Vol 4, Part E - 7.3.59 Write Simple Pairing Mode Command See Bluetooth spec Vol 4, Part E - 7.3.59 Write Simple Pairing Mode Command
''' '''
if command.simple_pairing_mode:
self.lmp_features |= hci.LmpFeatureMask.SECURE_SIMPLE_PAIRING_HOST_SUPPORT
else:
self.lmp_features &= ~hci.LmpFeatureMask.SECURE_SIMPLE_PAIRING_HOST_SUPPORT
return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS)
def on_hci_set_event_mask_page_2_command( def on_hci_set_event_mask_page_2_command(
@@ -1670,16 +1843,23 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.3.78 Write LE Host Support Command See Bluetooth spec Vol 4, Part E - 7.3.78 Write LE Host Support Command
''' '''
return hci.HCI_Read_LE_Host_Support_ReturnParameters( return hci.HCI_Read_LE_Host_Support_ReturnParameters(
status=hci.HCI_ErrorCode.SUCCESS, le_supported_host=1, unused=0 status=hci.HCI_ErrorCode.SUCCESS,
le_supported_host=(
1 if self.lmp_features & hci.LmpFeatureMask.LE_SUPPORTED_HOST else 0
),
unused=0,
) )
def on_hci_write_le_host_support_command( def on_hci_write_le_host_support_command(
self, _command: hci.HCI_Write_LE_Host_Support_Command self, command: hci.HCI_Write_LE_Host_Support_Command
) -> hci.HCI_StatusReturnParameters: ) -> hci.HCI_StatusReturnParameters:
''' '''
See Bluetooth spec Vol 4, Part E - 7.3.79 Write LE Host Support Command See Bluetooth spec Vol 4, Part E - 7.3.79 Write LE Host Support Command
''' '''
# TODO / Just ignore for now if command.le_supported_host:
self.lmp_features |= hci.LmpFeatureMask.LE_SUPPORTED_HOST
else:
self.lmp_features &= ~hci.LmpFeatureMask.LE_SUPPORTED_HOST
return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS) return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS)
def on_hci_write_authenticated_payload_timeout_command( def on_hci_write_authenticated_payload_timeout_command(
@@ -1716,7 +1896,11 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.4.2 Read Local Supported Commands Command See Bluetooth spec Vol 4, Part E - 7.4.2 Read Local Supported Commands Command
''' '''
return hci.HCI_Read_Local_Supported_Commands_ReturnParameters( return hci.HCI_Read_Local_Supported_Commands_ReturnParameters(
hci.HCI_ErrorCode.SUCCESS, supported_commands=self.supported_commands hci.HCI_ErrorCode.SUCCESS,
supported_commands=sum(
hci.HCI_SUPPORTED_COMMANDS_MASKS.get(opcode, 0)
for opcode in self.supported_commands
).to_bytes(64, 'little'),
) )
def on_hci_read_local_supported_features_command( def on_hci_read_local_supported_features_command(
@@ -1726,7 +1910,8 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.4.3 Read Local Supported Features Command See Bluetooth spec Vol 4, Part E - 7.4.3 Read Local Supported Features Command
''' '''
return hci.HCI_Read_Local_Supported_Features_ReturnParameters( return hci.HCI_Read_Local_Supported_Features_ReturnParameters(
hci.HCI_ErrorCode.SUCCESS, lmp_features=self.lmp_features[:8] hci.HCI_ErrorCode.SUCCESS,
lmp_features=self.lmp_features_bytes[:8],
) )
def on_hci_read_local_extended_features_command( def on_hci_read_local_extended_features_command(
@@ -1735,18 +1920,19 @@ class Controller:
''' '''
See Bluetooth spec Vol 4, Part E - 7.4.4 Read Local Extended Features Command See Bluetooth spec Vol 4, Part E - 7.4.4 Read Local Extended Features Command
''' '''
if command.page_number * 8 > len(self.lmp_features): feature_bytes = self.lmp_features_bytes
if command.page_number * 8 > len(feature_bytes):
return hci.HCI_Read_Local_Extended_Features_ReturnParameters( return hci.HCI_Read_Local_Extended_Features_ReturnParameters(
status=hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR, status=hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR,
page_number=command.page_number, page_number=command.page_number,
maximum_page_number=len(self.lmp_features) // 8 - 1, maximum_page_number=len(feature_bytes) // 8 - 1,
extended_lmp_features=bytes(8), extended_lmp_features=bytes(8),
) )
return hci.HCI_Read_Local_Extended_Features_ReturnParameters( return hci.HCI_Read_Local_Extended_Features_ReturnParameters(
status=hci.HCI_ErrorCode.SUCCESS, status=hci.HCI_ErrorCode.SUCCESS,
page_number=command.page_number, page_number=command.page_number,
maximum_page_number=len(self.lmp_features) // 8 - 1, maximum_page_number=len(feature_bytes) // 8 - 1,
extended_lmp_features=self.lmp_features[ extended_lmp_features=feature_bytes[
command.page_number * 8 : (command.page_number + 1) * 8 command.page_number * 8 : (command.page_number + 1) * 8
], ],
) )

View File

@@ -19,6 +19,7 @@ from __future__ import annotations
import dataclasses import dataclasses
import enum import enum
import functools
import struct import struct
from collections.abc import Iterable from collections.abc import Iterable
from typing import ( from typing import (
@@ -273,13 +274,8 @@ class UUID:
def parse_uuid_2(cls, uuid_as_bytes: bytes, offset: int) -> tuple[int, UUID]: def parse_uuid_2(cls, uuid_as_bytes: bytes, offset: int) -> tuple[int, UUID]:
return offset + 2, cls.from_bytes(uuid_as_bytes[offset : offset + 2]) return offset + 2, cls.from_bytes(uuid_as_bytes[offset : offset + 2])
def to_bytes(self, force_128: bool = False) -> bytes: @functools.cached_property
''' def uuid_128_bytes(self) -> bytes:
Serialize UUID in little-endian byte-order
'''
if not force_128:
return self.uuid_bytes
match len(self.uuid_bytes): match len(self.uuid_bytes):
case 2: case 2:
return self.BASE_UUID + self.uuid_bytes + bytes([0, 0]) return self.BASE_UUID + self.uuid_bytes + bytes([0, 0])
@@ -290,6 +286,15 @@ class UUID:
case _: case _:
assert False, "unreachable" assert False, "unreachable"
def to_bytes(self, force_128: bool = False) -> bytes:
'''
Serialize UUID in little-endian byte-order
'''
if not force_128:
return self.uuid_bytes
return self.uuid_128_bytes
def to_pdu_bytes(self) -> bytes: def to_pdu_bytes(self) -> bytes:
''' '''
Convert to bytes for use in an ATT PDU. Convert to bytes for use in an ATT PDU.
@@ -318,7 +323,7 @@ class UUID:
def __eq__(self, other: object) -> bool: def __eq__(self, other: object) -> bool:
if isinstance(other, UUID): if isinstance(other, UUID):
return self.to_bytes(force_128=True) == other.to_bytes(force_128=True) return self.uuid_128_bytes == other.uuid_128_bytes
if isinstance(other, str): if isinstance(other, str):
return UUID(other) == self return UUID(other) == self
@@ -326,7 +331,7 @@ class UUID:
return False return False
def __hash__(self) -> int: def __hash__(self) -> int:
return hash(self.uuid_bytes) return hash(self.uuid_128_bytes)
def __str__(self) -> str: def __str__(self) -> str:
result = self.to_hex_str(separator='-') result = self.to_hex_str(separator='-')
@@ -2111,13 +2116,10 @@ class AdvertisingData:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Connection PHY # Connection PHY
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@dataclasses.dataclass
class ConnectionPHY: class ConnectionPHY:
def __init__(self, tx_phy, rx_phy): tx_phy: int
self.tx_phy = tx_phy rx_phy: int
self.rx_phy = rx_phy
def __str__(self):
return f'ConnectionPHY(tx_phy={self.tx_phy}, rx_phy={self.rx_phy})'
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -1837,6 +1837,7 @@ class Connection(utils.CompositeEventEmitter):
self.pairing_peer_io_capability = None self.pairing_peer_io_capability = None
self.pairing_peer_authentication_requirements = None self.pairing_peer_authentication_requirements = None
self.peer_le_features = hci.LeFeatureMask(0) self.peer_le_features = hci.LeFeatureMask(0)
self.peer_classic_features = hci.LmpFeatureMask(0)
self.cs_configs = {} self.cs_configs = {}
self.cs_procedures = {} self.cs_procedures = {}
@@ -2054,6 +2055,15 @@ class Connection(utils.CompositeEventEmitter):
self.peer_le_features = await self.device.get_remote_le_features(self) self.peer_le_features = await self.device.get_remote_le_features(self)
return self.peer_le_features return self.peer_le_features
async def get_remote_classic_features(self) -> hci.LmpFeatureMask:
"""[Classic Only] Reads remote LMP supported features.
Returns:
LMP features supported by the remote device.
"""
self.peer_classic_features = await self.device.get_remote_classic_features(self)
return self.peer_classic_features
def on_att_mtu_update(self, mtu: int): def on_att_mtu_update(self, mtu: int):
logger.debug( logger.debug(
f'*** Connection ATT MTU Update: [0x{self.handle:04X}] ' f'*** Connection ATT MTU Update: [0x{self.handle:04X}] '
@@ -2149,6 +2159,7 @@ class DeviceConfiguration:
) )
eatt_enabled: bool = False eatt_enabled: bool = False
gatt_services: list[dict[str, Any]] = field(init=False) gatt_services: list[dict[str, Any]] = field(init=False)
smp_debug_mode: bool = False
def __post_init__(self) -> None: def __post_init__(self) -> None:
self.gatt_services = [] self.gatt_services = []
@@ -2561,6 +2572,7 @@ class Device(utils.CompositeEventEmitter):
), ),
), ),
) )
self.smp_manager.debug_mode = self.config.smp_debug_mode
self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu) self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)
@@ -5281,6 +5293,77 @@ class Device(utils.CompositeEventEmitter):
) )
return await read_feature_future return await read_feature_future
async def get_remote_classic_features(
self, connection: Connection
) -> hci.LmpFeatureMask:
"""[Classic Only] Reads remote LE supported features.
Args:
handle: connection handle to read LMP features.
Returns:
LMP features supported by the remote device.
"""
with closing(utils.EventWatcher()) as watcher:
read_feature_future: asyncio.Future[tuple[int, int]] = (
asyncio.get_running_loop().create_future()
)
read_features = hci.LmpFeatureMask(0)
current_page_number = 0
@watcher.on(self.host, 'classic_remote_features')
def on_classic_remote_features(
handle: int,
status: int,
features: int,
page_number: int,
max_page_number: int,
) -> None:
if handle != connection.handle:
logger.warning(
"Received classic_remote_features for wrong handle, expected=0x%04X, got=0x%04X",
connection.handle,
handle,
)
return
if page_number != current_page_number:
logger.warning(
"Received classic_remote_features for wrong page, expected=%d, got=%d",
current_page_number,
page_number,
)
return
if status == hci.HCI_ErrorCode.SUCCESS:
read_feature_future.set_result((features, max_page_number))
else:
read_feature_future.set_exception(hci.HCI_Error(status))
await self.send_async_command(
hci.HCI_Read_Remote_Supported_Features_Command(
connection_handle=connection.handle
)
)
new_features, max_page_number = await read_feature_future
read_features |= new_features
if not (read_features & hci.LmpFeatureMask.EXTENDED_FEATURES):
return read_features
while current_page_number <= max_page_number:
read_feature_future = asyncio.get_running_loop().create_future()
await self.send_async_command(
hci.HCI_Read_Remote_Extended_Features_Command(
connection_handle=connection.handle,
page_number=current_page_number,
)
)
new_features, max_page_number = await read_feature_future
read_features |= new_features << (current_page_number * 64)
current_page_number += 1
return read_features
@utils.experimental('Only for testing.') @utils.experimental('Only for testing.')
async def get_remote_cs_capabilities( async def get_remote_cs_capabilities(
self, connection: Connection self, connection: Connection

View File

@@ -68,6 +68,8 @@ class HfpProtocolError(ProtocolError):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class HfpProtocol: class HfpProtocol:
MAX_BUFFER_SIZE: ClassVar[int] = 65536
dlc: rfcomm.DLC dlc: rfcomm.DLC
buffer: str buffer: str
lines: collections.deque lines: collections.deque
@@ -84,10 +86,19 @@ class HfpProtocol:
def feed(self, data: bytes | str) -> None: def feed(self, data: bytes | str) -> None:
# Convert the data to a string if needed # Convert the data to a string if needed
if isinstance(data, bytes): if isinstance(data, bytes):
data = data.decode('utf-8') data = data.decode('utf-8', errors='replace')
logger.debug(f'<<< Data received: {data}') logger.debug(f'<<< Data received: {data}')
# Drop incoming data if it would overflow the buffer; keep existing
# partial packet state intact so a future clean packet can still parse.
if len(self.buffer) + len(data) > self.MAX_BUFFER_SIZE:
logger.warning(
'HFP buffer overflow (>%d bytes), dropping incoming data',
self.MAX_BUFFER_SIZE,
)
return
# Add to the buffer and look for lines # Add to the buffer and look for lines
self.buffer += data self.buffer += data
while (separator := self.buffer.find('\r')) >= 0: while (separator := self.buffer.find('\r')) >= 0:

View File

@@ -692,10 +692,8 @@ class Host(utils.EventEmitter):
finally: finally:
self.pending_command = None self.pending_command = None
self.pending_response = None self.pending_response = None
if ( if response is None or (
response is not None response.num_hci_command_packets and self.command_semaphore.locked()
and response.num_hci_command_packets
and self.command_semaphore.locked()
): ):
self.command_semaphore.release() self.command_semaphore.release()
@@ -1660,6 +1658,19 @@ class Host(utils.EventEmitter):
'connection_encryption_failure', event.connection_handle, event.status 'connection_encryption_failure', event.connection_handle, event.status
) )
def on_hci_read_remote_supported_features_complete_event(
self, event: hci.HCI_Read_Remote_Supported_Features_Complete_Event
) -> None:
# Notify the client
self.emit(
'classic_remote_features',
event.connection_handle,
event.status,
int.from_bytes(event.lmp_features, 'little'),
0, # page number
0, # max page number
)
def on_hci_encryption_change_v2_event( def on_hci_encryption_change_v2_event(
self, event: hci.HCI_Encryption_Change_V2_Event self, event: hci.HCI_Encryption_Change_V2_Event
): ):
@@ -1816,6 +1827,18 @@ class Host(utils.EventEmitter):
rssi, rssi,
) )
def on_hci_read_remote_extended_features_complete_event(
self, event: hci.HCI_Read_Remote_Extended_Features_Complete_Event
):
self.emit(
'classic_remote_features',
event.connection_handle,
event.status,
int.from_bytes(event.extended_lmp_features, 'little'),
event.page_number,
event.maximum_page_number,
)
def on_hci_extended_inquiry_result_event( def on_hci_extended_inquiry_result_event(
self, event: hci.HCI_Extended_Inquiry_Result_Event self, event: hci.HCI_Extended_Inquiry_Result_Event
): ):

View File

@@ -27,6 +27,7 @@ import dataclasses
import json import json
import logging import logging
import os import os
import pathlib
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
from typing_extensions import Self from typing_extensions import Self
@@ -248,29 +249,26 @@ class JsonKeyStore(KeyStore):
DEFAULT_NAMESPACE = '__DEFAULT__' DEFAULT_NAMESPACE = '__DEFAULT__'
DEFAULT_BASE_NAME = "keys" DEFAULT_BASE_NAME = "keys"
def __init__(self, namespace, filename=None): def __init__(
self.namespace = namespace if namespace is not None else self.DEFAULT_NAMESPACE self, namespace: str | None = None, filename: str | None = None
) -> None:
self.namespace = namespace or self.DEFAULT_NAMESPACE
if filename is None: if filename:
# Use a default for the current user self.filename = pathlib.Path(filename).resolve()
self.directory_name = self.filename.parent
# Import here because this may not exist on all platforms
# pylint: disable=import-outside-toplevel
import appdirs
self.directory_name = os.path.join(
appdirs.user_data_dir(self.APP_NAME, self.APP_AUTHOR), self.KEYS_DIR
)
base_name = self.DEFAULT_BASE_NAME if namespace is None else self.namespace
json_filename = (
f'{base_name}.json'.lower().replace(':', '-').replace('/p', '-p')
)
self.filename = os.path.join(self.directory_name, json_filename)
else: else:
self.filename = filename import platformdirs # Deferred import
self.directory_name = os.path.dirname(os.path.abspath(self.filename))
logger.debug(f'JSON keystore: {self.filename}') base_dir = platformdirs.user_data_path(self.APP_NAME, self.APP_AUTHOR)
self.directory_name = base_dir / self.KEYS_DIR
base_name = self.namespace if namespace else self.DEFAULT_BASE_NAME
safe_name = base_name.lower().replace(':', '-').replace('/', '-')
self.filename = self.directory_name / f"{safe_name}.json"
logger.debug('JSON keystore: %s', self.filename)
@classmethod @classmethod
def from_device( def from_device(
@@ -293,7 +291,9 @@ class JsonKeyStore(KeyStore):
return cls(namespace, filename) return cls(namespace, filename)
async def load(self): async def load(
self,
) -> tuple[dict[str, dict[str, dict[str, Any]]], dict[str, dict[str, Any]]]:
# Try to open the file, without failing. If the file does not exist, it # Try to open the file, without failing. If the file does not exist, it
# will be created upon saving. # will be created upon saving.
try: try:
@@ -312,17 +312,17 @@ class JsonKeyStore(KeyStore):
return next(iter(db.items())) return next(iter(db.items()))
# Finally, just create an empty key map for the namespace # Finally, just create an empty key map for the namespace
key_map = {} key_map: dict[str, dict[str, Any]] = {}
db[self.namespace] = key_map db[self.namespace] = key_map
return (db, key_map) return (db, key_map)
async def save(self, db): async def save(self, db: dict[str, dict[str, dict[str, Any]]]) -> None:
# Create the directory if it doesn't exist # Create the directory if it doesn't exist
if not os.path.exists(self.directory_name): if not self.directory_name.exists():
os.makedirs(self.directory_name, exist_ok=True) self.directory_name.mkdir(parents=True, exist_ok=True)
# Save to a temporary file # Save to a temporary file
temp_filename = self.filename + '.tmp' temp_filename = self.filename.with_name(self.filename.name + ".tmp")
with open(temp_filename, 'w', encoding='utf-8') as output: with open(temp_filename, 'w', encoding='utf-8') as output:
json.dump(db, output, sort_keys=True, indent=4) json.dump(db, output, sort_keys=True, indent=4)
@@ -334,16 +334,16 @@ class JsonKeyStore(KeyStore):
del key_map[name] del key_map[name]
await self.save(db) await self.save(db)
async def update(self, name, keys): async def update(self, name: str, keys: PairingKeys) -> None:
db, key_map = await self.load() db, key_map = await self.load()
key_map.setdefault(name, {}).update(keys.to_dict()) key_map.setdefault(name, {}).update(keys.to_dict())
await self.save(db) await self.save(db)
async def get_all(self): async def get_all(self) -> list[tuple[str, PairingKeys]]:
_, key_map = await self.load() _, key_map = await self.load()
return [(name, PairingKeys.from_dict(keys)) for (name, keys) in key_map.items()] return [(name, PairingKeys.from_dict(keys)) for (name, keys) in key_map.items()]
async def delete_all(self): async def delete_all(self) -> None:
db, key_map = await self.load() db, key_map = await self.load()
key_map.clear() key_map.clear()
await self.save(db) await self.save(db)

View File

@@ -322,3 +322,38 @@ class LmpNameRes(Packet):
name_offset: int = field(metadata=hci.metadata(2)) name_offset: int = field(metadata=hci.metadata(2))
name_length: int = field(metadata=hci.metadata(3)) name_length: int = field(metadata=hci.metadata(3))
name_fregment: bytes = field(metadata=hci.metadata('*')) name_fregment: bytes = field(metadata=hci.metadata('*'))
@Packet.subclass
@dataclass
class LmpFeaturesReq(Packet):
opcode = Opcode.LMP_FEATURES_REQ
features: bytes = field(metadata=hci.metadata(8))
@Packet.subclass
@dataclass
class LmpFeaturesRes(Packet):
opcode = Opcode.LMP_FEATURES_RES
features: bytes = field(metadata=hci.metadata(8))
@Packet.subclass
@dataclass
class LmpFeaturesReqExt(Packet):
opcode = Opcode.LMP_FEATURES_REQ_EXT
features_page: int = field(metadata=hci.metadata(1))
features: bytes = field(metadata=hci.metadata(8))
@Packet.subclass
@dataclass
class LmpFeaturesResExt(Packet):
opcode = Opcode.LMP_FEATURES_RES_EXT
features_page: int = field(metadata=hci.metadata(1))
max_features_page: int = field(metadata=hci.metadata(1))
features: bytes = field(metadata=hci.metadata(8))

View File

@@ -21,11 +21,12 @@ import asyncio
import logging import logging
import struct import struct
from collections.abc import Iterable, Sequence from collections.abc import Iterable, Sequence
from typing import TYPE_CHECKING, NewType from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, ClassVar, NewType, TypeVar
from typing_extensions import Self from typing_extensions import Self
from bumble import core, l2cap from bumble import core, hci, l2cap, utils
from bumble.colors import color from bumble.colors import color
from bumble.core import ( from bumble.core import (
InvalidArgumentError, InvalidArgumentError,
@@ -33,7 +34,6 @@ from bumble.core import (
InvalidStateError, InvalidStateError,
ProtocolError, ProtocolError,
) )
from bumble.hci import HCI_Object, key_with_value, name_or_number
if TYPE_CHECKING: if TYPE_CHECKING:
from bumble.device import Connection, Device from bumble.device import Connection, Device
@@ -54,39 +54,22 @@ SDP_CONTINUATION_WATCHDOG = 64 # Maximum number of continuations we're willing
SDP_PSM = 0x0001 SDP_PSM = 0x0001
SDP_ERROR_RESPONSE = 0x01 class PduId(hci.SpecableEnum):
SDP_SERVICE_SEARCH_REQUEST = 0x02 SDP_ERROR_RESPONSE = 0x01
SDP_SERVICE_SEARCH_RESPONSE = 0x03 SDP_SERVICE_SEARCH_REQUEST = 0x02
SDP_SERVICE_ATTRIBUTE_REQUEST = 0x04 SDP_SERVICE_SEARCH_RESPONSE = 0x03
SDP_SERVICE_ATTRIBUTE_RESPONSE = 0x05 SDP_SERVICE_ATTRIBUTE_REQUEST = 0x04
SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST = 0x06 SDP_SERVICE_ATTRIBUTE_RESPONSE = 0x05
SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE = 0x07 SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST = 0x06
SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE = 0x07
SDP_PDU_NAMES = { class ErrorCode(hci.SpecableEnum):
SDP_ERROR_RESPONSE: 'SDP_ERROR_RESPONSE', INVALID_SDP_VERSION = 0x0001
SDP_SERVICE_SEARCH_REQUEST: 'SDP_SERVICE_SEARCH_REQUEST', INVALID_SERVICE_RECORD_HANDLE = 0x0002
SDP_SERVICE_SEARCH_RESPONSE: 'SDP_SERVICE_SEARCH_RESPONSE', INVALID_REQUEST_SYNTAX = 0x0003
SDP_SERVICE_ATTRIBUTE_REQUEST: 'SDP_SERVICE_ATTRIBUTE_REQUEST', INVALID_PDU_SIZE = 0x0004
SDP_SERVICE_ATTRIBUTE_RESPONSE: 'SDP_SERVICE_ATTRIBUTE_RESPONSE', INVALID_CONTINUATION_STATE = 0x0005
SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST: 'SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST', INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST = 0x0006
SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE: 'SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE'
}
SDP_INVALID_SDP_VERSION_ERROR = 0x0001
SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR = 0x0002
SDP_INVALID_REQUEST_SYNTAX_ERROR = 0x0003
SDP_INVALID_PDU_SIZE_ERROR = 0x0004
SDP_INVALID_CONTINUATION_STATE_ERROR = 0x0005
SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR = 0x0006
SDP_ERROR_NAMES = {
SDP_INVALID_SDP_VERSION_ERROR: 'SDP_INVALID_SDP_VERSION_ERROR',
SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR: 'SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR',
SDP_INVALID_REQUEST_SYNTAX_ERROR: 'SDP_INVALID_REQUEST_SYNTAX_ERROR',
SDP_INVALID_PDU_SIZE_ERROR: 'SDP_INVALID_PDU_SIZE_ERROR',
SDP_INVALID_CONTINUATION_STATE_ERROR: 'SDP_INVALID_CONTINUATION_STATE_ERROR',
SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR: 'SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR'
}
SDP_SERVICE_NAME_ATTRIBUTE_ID_OFFSET = 0x0000 SDP_SERVICE_NAME_ATTRIBUTE_ID_OFFSET = 0x0000
SDP_SERVICE_DESCRIPTION_ATTRIBUTE_ID_OFFSET = 0x0001 SDP_SERVICE_DESCRIPTION_ATTRIBUTE_ID_OFFSET = 0x0001
@@ -141,30 +124,31 @@ SDP_ALL_ATTRIBUTES_RANGE = (0x0000, 0xFFFF)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@dataclass
class DataElement: class DataElement:
NIL = 0
UNSIGNED_INTEGER = 1
SIGNED_INTEGER = 2
UUID = 3
TEXT_STRING = 4
BOOLEAN = 5
SEQUENCE = 6
ALTERNATIVE = 7
URL = 8
TYPE_NAMES = { class Type(utils.OpenIntEnum):
NIL: 'NIL', NIL = 0
UNSIGNED_INTEGER: 'UNSIGNED_INTEGER', UNSIGNED_INTEGER = 1
SIGNED_INTEGER: 'SIGNED_INTEGER', SIGNED_INTEGER = 2
UUID: 'UUID', UUID = 3
TEXT_STRING: 'TEXT_STRING', TEXT_STRING = 4
BOOLEAN: 'BOOLEAN', BOOLEAN = 5
SEQUENCE: 'SEQUENCE', SEQUENCE = 6
ALTERNATIVE: 'ALTERNATIVE', ALTERNATIVE = 7
URL: 'URL', URL = 8
}
type_constructors = { NIL = Type.NIL
UNSIGNED_INTEGER = Type.UNSIGNED_INTEGER
SIGNED_INTEGER = Type.SIGNED_INTEGER
UUID = Type.UUID
TEXT_STRING = Type.TEXT_STRING
BOOLEAN = Type.BOOLEAN
SEQUENCE = Type.SEQUENCE
ALTERNATIVE = Type.ALTERNATIVE
URL = Type.URL
TYPE_CONSTRUCTORS = {
NIL: lambda x: DataElement(DataElement.NIL, None), NIL: lambda x: DataElement(DataElement.NIL, None),
UNSIGNED_INTEGER: lambda x, y: DataElement( UNSIGNED_INTEGER: lambda x, y: DataElement(
DataElement.UNSIGNED_INTEGER, DataElement.UNSIGNED_INTEGER,
@@ -190,14 +174,18 @@ class DataElement:
URL: lambda x: DataElement(DataElement.URL, x.decode('utf8')), URL: lambda x: DataElement(DataElement.URL, x.decode('utf8')),
} }
def __init__(self, element_type, value, value_size=None): type: Type
self.type = element_type value: Any
self.value = value value_size: int | None = None
self.value_size = value_size
def __post_init__(self) -> None:
# Used as a cache when parsing from bytes so we can emit a byte-for-byte replica # Used as a cache when parsing from bytes so we can emit a byte-for-byte replica
self.bytes = None self._bytes: bytes | None = None
if element_type in (DataElement.UNSIGNED_INTEGER, DataElement.SIGNED_INTEGER): if self.type in (
if value_size is None: DataElement.UNSIGNED_INTEGER,
DataElement.SIGNED_INTEGER,
):
if self.value_size is None:
raise InvalidArgumentError( raise InvalidArgumentError(
'integer types must have a value size specified' 'integer types must have a value size specified'
) )
@@ -337,7 +325,7 @@ class DataElement:
value_offset = 4 value_offset = 4
value_data = data[1 + value_offset : 1 + value_offset + value_size] value_data = data[1 + value_offset : 1 + value_offset + value_size]
constructor = DataElement.type_constructors.get(element_type) constructor = DataElement.TYPE_CONSTRUCTORS.get(element_type)
if constructor: if constructor:
if element_type in ( if element_type in (
DataElement.UNSIGNED_INTEGER, DataElement.UNSIGNED_INTEGER,
@@ -348,15 +336,15 @@ class DataElement:
result = constructor(value_data) result = constructor(value_data)
else: else:
result = DataElement(element_type, value_data) result = DataElement(element_type, value_data)
result.bytes = data[ result._bytes = data[
: 1 + value_offset + value_size : 1 + value_offset + value_size
] # Keep a copy so we can re-serialize to an exact replica ] # Keep a copy so we can re-serialize to an exact replica
return result return result
def __bytes__(self): def __bytes__(self):
# Return early if we have a cache # Return early if we have a cache
if self.bytes: if self._bytes:
return self.bytes return self._bytes
if self.type == DataElement.NIL: if self.type == DataElement.NIL:
data = b'' data = b''
@@ -443,12 +431,12 @@ class DataElement:
else: else:
raise RuntimeError("internal error - self.type not supported") raise RuntimeError("internal error - self.type not supported")
self.bytes = bytes([self.type << 3 | size_index]) + size_bytes + data self._bytes = bytes([self.type << 3 | size_index]) + size_bytes + data
return self.bytes return self._bytes
def to_string(self, pretty=False, indentation=0): def to_string(self, pretty=False, indentation=0):
prefix = ' ' * indentation prefix = ' ' * indentation
type_name = name_or_number(self.TYPE_NAMES, self.type) type_name = self.type.name
if self.type == DataElement.NIL: if self.type == DataElement.NIL:
value_string = '' value_string = ''
elif self.type in (DataElement.SEQUENCE, DataElement.ALTERNATIVE): elif self.type in (DataElement.SEQUENCE, DataElement.ALTERNATIVE):
@@ -476,10 +464,10 @@ class DataElement:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@dataclass
class ServiceAttribute: class ServiceAttribute:
def __init__(self, attribute_id: int, value: DataElement) -> None: id: int
self.id = attribute_id value: DataElement
self.value = value
@staticmethod @staticmethod
def list_from_data_elements( def list_from_data_elements(
@@ -510,7 +498,7 @@ class ServiceAttribute:
@staticmethod @staticmethod
def id_name(id_code): def id_name(id_code):
return name_or_number(SDP_ATTRIBUTE_ID_NAMES, id_code) return hci.name_or_number(SDP_ATTRIBUTE_ID_NAMES, id_code)
@staticmethod @staticmethod
def is_uuid_in_value(uuid: core.UUID, value: DataElement) -> bool: def is_uuid_in_value(uuid: core.UUID, value: DataElement) -> bool:
@@ -540,239 +528,228 @@ class ServiceAttribute:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def _parse_service_record_handle_list(
data: bytes, offset: int
) -> tuple[int, list[int]]:
count = struct.unpack_from('>H', data, offset)[0]
offset += 2
handle_list = [
struct.unpack_from('>I', data, offset + x * 4)[0] for x in range(count)
]
return offset + count * 4, handle_list
def _serialize_service_record_handle_list(
handles: list[int],
) -> bytes:
return struct.pack('>H', len(handles)) + b''.join(
struct.pack('>I', handle) for handle in handles
)
def _parse_bytes_preceded_by_length(data: bytes, offset: int) -> tuple[int, bytes]:
length = struct.unpack_from('>H', data, offset)[0]
offset += 2
return offset + length, data[offset : offset + length]
def _serialize_bytes_preceded_by_length(data: bytes) -> bytes:
return struct.pack('>H', len(data)) + data
_SERVICE_RECORD_HANDLE_LIST_METADATA = hci.metadata(
{
'parser': _parse_service_record_handle_list,
'serializer': _serialize_service_record_handle_list,
}
)
_BYTES_PRECEDED_BY_LENGTH_METADATA = hci.metadata(
{
'parser': _parse_bytes_preceded_by_length,
'serializer': _serialize_bytes_preceded_by_length,
}
)
# -----------------------------------------------------------------------------
@dataclass
class SDP_PDU: class SDP_PDU:
''' '''
See Bluetooth spec @ Vol 3, Part B - 4.2 PROTOCOL DATA UNIT FORMAT See Bluetooth spec @ Vol 3, Part B - 4.2 PROTOCOL DATA UNIT FORMAT
''' '''
RESPONSE_PDU_IDS = { RESPONSE_PDU_IDS = {
SDP_SERVICE_SEARCH_REQUEST: SDP_SERVICE_SEARCH_RESPONSE, PduId.SDP_SERVICE_SEARCH_REQUEST: PduId.SDP_SERVICE_SEARCH_RESPONSE,
SDP_SERVICE_ATTRIBUTE_REQUEST: SDP_SERVICE_ATTRIBUTE_RESPONSE, PduId.SDP_SERVICE_ATTRIBUTE_REQUEST: PduId.SDP_SERVICE_ATTRIBUTE_RESPONSE,
SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST: SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE, PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST: PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE,
} }
sdp_pdu_classes: dict[int, type[SDP_PDU]] = {} subclasses: ClassVar[dict[int, type[SDP_PDU]]] = {}
name = None pdu_id: ClassVar[PduId]
pdu_id = 0 fields: ClassVar[hci.Fields]
@staticmethod transaction_id: int
def from_bytes(pdu): _payload: bytes | None = field(init=False, repr=False, default=None)
pdu_id, transaction_id, _parameters_length = struct.unpack_from('>BHH', pdu, 0)
cls = SDP_PDU.sdp_pdu_classes.get(pdu_id) @classmethod
if cls is None: def from_bytes(cls, pdu: bytes) -> SDP_PDU:
instance = SDP_PDU(pdu) pdu_id, transaction_id, parameters_length = struct.unpack_from('>BHH', pdu, 0)
instance.name = SDP_PDU.pdu_name(pdu_id)
instance.pdu_id = pdu_id
instance.transaction_id = transaction_id
return instance
self = cls.__new__(cls)
SDP_PDU.__init__(self, pdu, transaction_id)
if hasattr(self, 'fields'):
self.init_from_bytes(pdu, 5)
return self
@staticmethod if len(pdu) != 5 + parameters_length:
def parse_service_record_handle_list_preceded_by_count( logger.warning("Expect %d bytes, got %d", 5 + parameters_length, len(pdu))
data: bytes, offset: int
) -> tuple[int, list[int]]:
count = struct.unpack_from('>H', data, offset - 2)[0]
handle_list = [
struct.unpack_from('>I', data, offset + x * 4)[0] for x in range(count)
]
return offset + count * 4, handle_list
@staticmethod subclass = cls.subclasses.get(pdu_id)
def parse_bytes_preceded_by_length(data, offset): if not (subclass := cls.subclasses.get(pdu_id)):
length = struct.unpack_from('>H', data, offset - 2)[0] raise InvalidPacketError(f"Unknown PDU type {pdu_id}")
return offset + length, data[offset : offset + length] instance = subclass(
transaction_id=transaction_id,
**hci.HCI_Object.dict_from_bytes(pdu, 5, subclass.fields),
)
instance._payload = pdu
return instance
@staticmethod _PDU = TypeVar('_PDU', bound='SDP_PDU')
def error_name(error_code):
return name_or_number(SDP_ERROR_NAMES, error_code)
@staticmethod @classmethod
def pdu_name(code): def subclass(cls, subclass: type[_PDU]) -> type[_PDU]:
return name_or_number(SDP_PDU_NAMES, code) subclass.fields = hci.HCI_Object.fields_from_dataclass(subclass)
cls.subclasses[subclass.pdu_id] = subclass
@staticmethod return subclass
def subclass(fields):
def inner(cls):
name = cls.__name__
# add a _ character before every uppercase letter, except the SDP_ prefix
location = len(name) - 1
while location > 4:
if not name[location].isupper():
location -= 1
continue
name = name[:location] + '_' + name[location:]
location -= 1
cls.name = name.upper()
cls.pdu_id = key_with_value(SDP_PDU_NAMES, cls.name)
if cls.pdu_id is None:
raise KeyError(f'PDU name {cls.name} not found in SDP_PDU_NAMES')
cls.fields = fields
# Register a factory for this class
SDP_PDU.sdp_pdu_classes[cls.pdu_id] = cls
return cls
return inner
def __init__(self, pdu=None, transaction_id=0, **kwargs):
if hasattr(self, 'fields') and kwargs:
HCI_Object.init_from_fields(self, self.fields, kwargs)
if pdu is None:
parameters = HCI_Object.dict_to_bytes(kwargs, self.fields)
pdu = (
struct.pack('>BHH', self.pdu_id, transaction_id, len(parameters))
+ parameters
)
self.pdu = pdu
self.transaction_id = transaction_id
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def __bytes__(self): def __bytes__(self):
return self.pdu if self._payload is None:
parameters = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields)
self._payload = (
struct.pack('>BHH', self.pdu_id, self.transaction_id, len(parameters))
+ parameters
)
return self._payload
@property
def name(self) -> str:
return self.pdu_id.name
def __str__(self): def __str__(self):
result = f'{color(self.name, "blue")} [TID={self.transaction_id}]' result = f'{color(self.name, "blue")} [TID={self.transaction_id}]'
if fields := getattr(self, 'fields', None): if fields := getattr(self, 'fields', None):
result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ') result += ':\n' + hci.HCI_Object.format_fields(self.__dict__, fields, ' ')
elif len(self.pdu) > 1: elif len(self.pdu) > 1:
result += f': {self.pdu.hex()}' result += f': {self.pdu.hex()}'
return result return result
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@SDP_PDU.subclass([('error_code', {'size': 2, 'mapper': SDP_PDU.error_name})]) @SDP_PDU.subclass
@dataclass
class SDP_ErrorResponse(SDP_PDU): class SDP_ErrorResponse(SDP_PDU):
''' '''
See Bluetooth spec @ Vol 3, Part B - 4.4.1 SDP_ErrorResponse PDU See Bluetooth spec @ Vol 3, Part B - 4.4.1 SDP_ErrorResponse PDU
''' '''
error_code: int pdu_id = PduId.SDP_ERROR_RESPONSE
error_code: ErrorCode = field(metadata=ErrorCode.type_metadata(2))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@SDP_PDU.subclass( @SDP_PDU.subclass
[ @dataclass
('service_search_pattern', DataElement.parse_from_bytes),
('maximum_service_record_count', '>2'),
('continuation_state', '*'),
]
)
class SDP_ServiceSearchRequest(SDP_PDU): class SDP_ServiceSearchRequest(SDP_PDU):
''' '''
See Bluetooth spec @ Vol 3, Part B - 4.5.1 SDP_ServiceSearchRequest PDU See Bluetooth spec @ Vol 3, Part B - 4.5.1 SDP_ServiceSearchRequest PDU
''' '''
service_search_pattern: DataElement pdu_id = PduId.SDP_SERVICE_SEARCH_REQUEST
maximum_service_record_count: int
continuation_state: bytes service_search_pattern: DataElement = field(
metadata=hci.metadata(DataElement.parse_from_bytes)
)
maximum_service_record_count: int = field(metadata=hci.metadata('>2'))
continuation_state: bytes = field(metadata=hci.metadata('*'))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@SDP_PDU.subclass( @SDP_PDU.subclass
[ @dataclass
('total_service_record_count', '>2'),
('current_service_record_count', '>2'),
(
'service_record_handle_list',
SDP_PDU.parse_service_record_handle_list_preceded_by_count,
),
('continuation_state', '*'),
]
)
class SDP_ServiceSearchResponse(SDP_PDU): class SDP_ServiceSearchResponse(SDP_PDU):
''' '''
See Bluetooth spec @ Vol 3, Part B - 4.5.2 SDP_ServiceSearchResponse PDU See Bluetooth spec @ Vol 3, Part B - 4.5.2 SDP_ServiceSearchResponse PDU
''' '''
service_record_handle_list: list[int] pdu_id = PduId.SDP_SERVICE_SEARCH_RESPONSE
total_service_record_count: int
current_service_record_count: int total_service_record_count: int = field(metadata=hci.metadata('>2'))
continuation_state: bytes service_record_handle_list: Sequence[int] = field(
metadata=_SERVICE_RECORD_HANDLE_LIST_METADATA
)
continuation_state: bytes = field(metadata=hci.metadata('*'))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@SDP_PDU.subclass( @SDP_PDU.subclass
[ @dataclass
('service_record_handle', '>4'),
('maximum_attribute_byte_count', '>2'),
('attribute_id_list', DataElement.parse_from_bytes),
('continuation_state', '*'),
]
)
class SDP_ServiceAttributeRequest(SDP_PDU): class SDP_ServiceAttributeRequest(SDP_PDU):
''' '''
See Bluetooth spec @ Vol 3, Part B - 4.6.1 SDP_ServiceAttributeRequest PDU See Bluetooth spec @ Vol 3, Part B - 4.6.1 SDP_ServiceAttributeRequest PDU
''' '''
service_record_handle: int pdu_id = PduId.SDP_SERVICE_ATTRIBUTE_REQUEST
maximum_attribute_byte_count: int
attribute_id_list: DataElement service_record_handle: int = field(metadata=hci.metadata('>4'))
continuation_state: bytes maximum_attribute_byte_count: int = field(metadata=hci.metadata('>2'))
attribute_id_list: DataElement = field(
metadata=hci.metadata(DataElement.parse_from_bytes)
)
continuation_state: bytes = field(metadata=hci.metadata('*'))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@SDP_PDU.subclass( @SDP_PDU.subclass
[ @dataclass
('attribute_list_byte_count', '>2'),
('attribute_list', SDP_PDU.parse_bytes_preceded_by_length),
('continuation_state', '*'),
]
)
class SDP_ServiceAttributeResponse(SDP_PDU): class SDP_ServiceAttributeResponse(SDP_PDU):
''' '''
See Bluetooth spec @ Vol 3, Part B - 4.6.2 SDP_ServiceAttributeResponse PDU See Bluetooth spec @ Vol 3, Part B - 4.6.2 SDP_ServiceAttributeResponse PDU
''' '''
attribute_list_byte_count: int pdu_id = PduId.SDP_SERVICE_ATTRIBUTE_RESPONSE
attribute_list: bytes
continuation_state: bytes attribute_list: bytes = field(metadata=_BYTES_PRECEDED_BY_LENGTH_METADATA)
continuation_state: bytes = field(metadata=hci.metadata('*'))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@SDP_PDU.subclass( @SDP_PDU.subclass
[ @dataclass
('service_search_pattern', DataElement.parse_from_bytes),
('maximum_attribute_byte_count', '>2'),
('attribute_id_list', DataElement.parse_from_bytes),
('continuation_state', '*'),
]
)
class SDP_ServiceSearchAttributeRequest(SDP_PDU): class SDP_ServiceSearchAttributeRequest(SDP_PDU):
''' '''
See Bluetooth spec @ Vol 3, Part B - 4.7.1 SDP_ServiceSearchAttributeRequest PDU See Bluetooth spec @ Vol 3, Part B - 4.7.1 SDP_ServiceSearchAttributeRequest PDU
''' '''
service_search_pattern: DataElement pdu_id = PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST
maximum_attribute_byte_count: int
attribute_id_list: DataElement service_search_pattern: DataElement = field(
continuation_state: bytes metadata=hci.metadata(DataElement.parse_from_bytes)
)
maximum_attribute_byte_count: int = field(metadata=hci.metadata('>2'))
attribute_id_list: DataElement = field(
metadata=hci.metadata(DataElement.parse_from_bytes)
)
continuation_state: bytes = field(metadata=hci.metadata('*'))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@SDP_PDU.subclass( @SDP_PDU.subclass
[ @dataclass
('attribute_lists_byte_count', '>2'),
('attribute_lists', SDP_PDU.parse_bytes_preceded_by_length),
('continuation_state', '*'),
]
)
class SDP_ServiceSearchAttributeResponse(SDP_PDU): class SDP_ServiceSearchAttributeResponse(SDP_PDU):
''' '''
See Bluetooth spec @ Vol 3, Part B - 4.7.2 SDP_ServiceSearchAttributeResponse PDU See Bluetooth spec @ Vol 3, Part B - 4.7.2 SDP_ServiceSearchAttributeResponse PDU
''' '''
attribute_lists_byte_count: int pdu_id = PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE
attribute_lists: bytes
continuation_state: bytes attribute_lists: bytes = field(metadata=_BYTES_PRECEDED_BY_LENGTH_METADATA)
continuation_state: bytes = field(metadata=hci.metadata('*'))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -873,7 +850,7 @@ class Client:
) )
# Request and accumulate until there's no more continuation # Request and accumulate until there's no more continuation
service_record_handle_list = [] service_record_handle_list: list[int] = []
continuation_state = bytes([0]) continuation_state = bytes([0])
watchdog = SDP_CONTINUATION_WATCHDOG watchdog = SDP_CONTINUATION_WATCHDOG
while watchdog > 0: while watchdog > 0:
@@ -1091,7 +1068,7 @@ class Server:
logger.exception(color('failed to parse SDP Request PDU', 'red')) logger.exception(color('failed to parse SDP Request PDU', 'red'))
self.send_response( self.send_response(
SDP_ErrorResponse( SDP_ErrorResponse(
transaction_id=0, error_code=SDP_INVALID_REQUEST_SYNTAX_ERROR transaction_id=0, error_code=ErrorCode.INVALID_REQUEST_SYNTAX
) )
) )
@@ -1108,7 +1085,7 @@ class Server:
self.send_response( self.send_response(
SDP_ErrorResponse( SDP_ErrorResponse(
transaction_id=sdp_pdu.transaction_id, transaction_id=sdp_pdu.transaction_id,
error_code=SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR, error_code=ErrorCode.INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST,
) )
) )
else: else:
@@ -1116,7 +1093,7 @@ class Server:
self.send_response( self.send_response(
SDP_ErrorResponse( SDP_ErrorResponse(
transaction_id=sdp_pdu.transaction_id, transaction_id=sdp_pdu.transaction_id,
error_code=SDP_INVALID_REQUEST_SYNTAX_ERROR, error_code=ErrorCode.INVALID_REQUEST_SYNTAX,
) )
) )
@@ -1134,7 +1111,7 @@ class Server:
self.send_response( self.send_response(
SDP_ErrorResponse( SDP_ErrorResponse(
transaction_id=transaction_id, transaction_id=transaction_id,
error_code=SDP_INVALID_CONTINUATION_STATE_ERROR, error_code=ErrorCode.INVALID_CONTINUATION_STATE,
) )
) )
return None return None
@@ -1228,15 +1205,11 @@ class Server:
if service_record_handles_remaining if service_record_handles_remaining
else bytes([0]) else bytes([0])
) )
service_record_handle_list = b''.join(
[struct.pack('>I', handle) for handle in service_record_handles]
)
self.send_response( self.send_response(
SDP_ServiceSearchResponse( SDP_ServiceSearchResponse(
transaction_id=request.transaction_id, transaction_id=request.transaction_id,
total_service_record_count=total_service_record_count, total_service_record_count=total_service_record_count,
current_service_record_count=len(service_record_handles), service_record_handle_list=service_record_handles,
service_record_handle_list=service_record_handle_list,
continuation_state=continuation_state, continuation_state=continuation_state,
) )
) )
@@ -1259,7 +1232,7 @@ class Server:
self.send_response( self.send_response(
SDP_ErrorResponse( SDP_ErrorResponse(
transaction_id=request.transaction_id, transaction_id=request.transaction_id,
error_code=SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR, error_code=ErrorCode.INVALID_SERVICE_RECORD_HANDLE,
) )
) )
return return
@@ -1284,7 +1257,6 @@ class Server:
self.send_response( self.send_response(
SDP_ServiceAttributeResponse( SDP_ServiceAttributeResponse(
transaction_id=request.transaction_id, transaction_id=request.transaction_id,
attribute_list_byte_count=len(attribute_list_response),
attribute_list=attribute_list_response, attribute_list=attribute_list_response,
continuation_state=continuation_state, continuation_state=continuation_state,
) )
@@ -1331,7 +1303,6 @@ class Server:
self.send_response( self.send_response(
SDP_ServiceSearchAttributeResponse( SDP_ServiceSearchAttributeResponse(
transaction_id=request.transaction_id, transaction_id=request.transaction_id,
attribute_lists_byte_count=len(attribute_lists_response),
attribute_lists=attribute_lists_response, attribute_lists=attribute_lists_response,
continuation_state=continuation_state, continuation_state=continuation_state,
) )

View File

@@ -36,6 +36,7 @@ from bumble.colors import color
from bumble.core import ( from bumble.core import (
AdvertisingData, AdvertisingData,
InvalidArgumentError, InvalidArgumentError,
InvalidPacketError,
PhysicalTransport, PhysicalTransport,
ProtocolError, ProtocolError,
) )
@@ -178,6 +179,16 @@ class AuthReq(hci.SpecableFlag):
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031') SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031')
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032') SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032')
# Diffie-Hellman private / public key pair in Debug Mode (Core - Vol. 3, Part H)
SMP_DEBUG_KEY_PRIVATE = bytes.fromhex(
'3f49f6d4 a3c55f38 74c9b3e3 d2103f50 4aff607b eb40b799 5899b8a6 cd3c1abd'
)
SMP_DEBUG_KEY_PUBLIC_X = bytes.fromhex(
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6'
)
SMP_DEBUG_KEY_PUBLIC_Y= bytes.fromhex(
'dc809c49 652aeb6d 63329abf 5a52155c 766345c2 8fed3024 741c8ed0 1589d28b'
)
# fmt: on # fmt: on
# pylint: enable=line-too-long # pylint: enable=line-too-long
# pylint: disable=invalid-name # pylint: disable=invalid-name
@@ -205,6 +216,8 @@ class SMP_Command:
@classmethod @classmethod
def from_bytes(cls, pdu: bytes) -> SMP_Command: def from_bytes(cls, pdu: bytes) -> SMP_Command:
if not pdu:
raise InvalidPacketError("Empty SMP PDU")
code = CommandCode(pdu[0]) code = CommandCode(pdu[0])
subclass = SMP_Command.smp_classes.get(code) subclass = SMP_Command.smp_classes.get(code)
@@ -1919,6 +1932,7 @@ class Manager(utils.EventEmitter):
self._ecc_key = None self._ecc_key = None
self.pairing_config_factory = pairing_config_factory self.pairing_config_factory = pairing_config_factory
self.session_proxy = Session self.session_proxy = Session
self.debug_mode = False
def send_command(self, connection: Connection, command: SMP_Command) -> None: def send_command(self, connection: Connection, command: SMP_Command) -> None:
logger.debug( logger.debug(
@@ -1965,6 +1979,13 @@ class Manager(utils.EventEmitter):
@property @property
def ecc_key(self) -> crypto.EccKey: def ecc_key(self) -> crypto.EccKey:
if self.debug_mode:
# Core - Vol 3, Part H:
# When the Security Manager is placed in a Debug mode it shall use the
# following Diffie-Hellman private / public key pair:
debug_key = crypto.EccKey.from_private_key_bytes(SMP_DEBUG_KEY_PRIVATE)
return debug_key
if self._ecc_key is None: if self._ecc_key is None:
self._ecc_key = crypto.EccKey.generate() self._ecc_key = crypto.EccKey.generate()
assert self._ecc_key assert self._ecc_key

View File

@@ -83,6 +83,7 @@ async def main() -> None:
GATT_DEVICE_INFORMATION_SERVICE, [manufacturer_name_characteristic] GATT_DEVICE_INFORMATION_SERVICE, [manufacturer_name_characteristic]
) )
server_device.add_service(device_info_service) server_device.add_service(device_info_service)
await server_device.start_advertising()
# Connect the client to the server # Connect the client to the server
connection = await client_device.connect(server_device.random_address) connection = await client_device.connect(server_device.random_address)

View File

@@ -13,13 +13,12 @@ authors = [{ name = "Google", email = "bumble-dev@google.com" }]
requires-python = ">=3.10" requires-python = ">=3.10"
dependencies = [ dependencies = [
"aiohttp ~= 3.8; platform_system!='Emscripten'", "aiohttp ~= 3.8; platform_system!='Emscripten'",
"appdirs >= 1.4; platform_system!='Emscripten'",
"click >= 8.1.3; platform_system!='Emscripten'", "click >= 8.1.3; platform_system!='Emscripten'",
"cryptography >= 44.0.3; platform_system!='Emscripten' and platform_system!='Android'", "cryptography >= 44.0.3; platform_system!='Emscripten' and platform_system!='Android'",
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the # Pyodide bundles a version of cryptography that is built for wasm, which may not match the
# versions available on PyPI. Relax the version requirement since it's better than being # versions available on PyPI. Relax the version requirement since it's better than being
# completely unable to import the package in case of version mismatch. # completely unable to import the package in case of version mismatch.
"cryptography >= 44.0.3; platform_system=='Emscripten'", "cryptography >= 39.0.0; platform_system=='Emscripten'",
# Android wheels for cryptography are not yet available on PyPI, so chaquopy uses # Android wheels for cryptography are not yet available on PyPI, so chaquopy uses
# the builds from https://chaquo.com/pypi-13.1/cryptography/. But these are not regually # the builds from https://chaquo.com/pypi-13.1/cryptography/. But these are not regually
# updated. Relax the version requirement since it's better than being completely unable # updated. Relax the version requirement since it's better than being completely unable

View File

@@ -73,6 +73,14 @@ def test_uuid_to_hex_str() -> None:
) )
# -----------------------------------------------------------------------------
def test_uuid_hash() -> None:
uuid = UUID("1234")
uuid_128_bytes = UUID.from_bytes(uuid.to_bytes(force_128=True))
assert uuid in {uuid_128_bytes}
assert uuid_128_bytes in {uuid}
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_appearance() -> None: def test_appearance() -> None:
a = Appearance(Appearance.Category.COMPUTER, Appearance.ComputerSubcategory.LAPTOP) a = Appearance(Appearance.Category.COMPUTER, Appearance.ComputerSubcategory.LAPTOP)

View File

@@ -826,6 +826,22 @@ async def test_remote_name_request():
assert actual_name == expected_name assert actual_name == expected_name
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_remote_classic_features():
devices = TwoDevices()
devices[0].classic_enabled = True
devices[1].classic_enabled = True
await devices[0].power_on()
await devices[1].power_on()
connection = await devices[0].connect_classic(devices[1].public_address)
assert (
await asyncio.wait_for(connection.get_remote_classic_features(), _TIMEOUT)
== devices.controllers[1].lmp_features
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def run_test_device(): async def run_test_device():
await test_device_connect_parallel() await test_device_connect_parallel()

View File

@@ -22,6 +22,7 @@ import unittest.mock
import pytest import pytest
from bumble import controller, hci
from bumble.controller import Controller from bumble.controller import Controller
from bumble.hci import ( from bumble.hci import (
HCI_AclDataPacket, HCI_AclDataPacket,
@@ -49,34 +50,27 @@ logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
@pytest.mark.parametrize( @pytest.mark.parametrize(
'supported_commands, lmp_features', 'supported_commands, max_lmp_features_page_number',
[ [
( (controller.Controller.supported_commands, 0),
# Default commands
'2000800000c000000000e4000000a822000000000000040000f7ffff7f000000'
'30f0f9ff01008004000000000000000000000000000000000000000000000000',
# Only LE LMP feature
'0000000060000000',
),
( (
# All commands # All commands
'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff' set(hci.HCI_Command.command_names.keys()),
'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff',
# 3 pages of LMP features # 3 pages of LMP features
'000102030405060708090A0B0C0D0E0F011112131415161718191A1B1C1D1E1F', 2,
), ),
], ],
) )
async def test_reset(supported_commands: str, lmp_features: str): async def test_reset(supported_commands: set[int], max_lmp_features_page_number: int):
controller = Controller('C') controller = Controller('C')
controller.supported_commands = bytes.fromhex(supported_commands) controller.supported_commands = supported_commands
controller.lmp_features = bytes.fromhex(lmp_features) controller.lmp_features_max_page_number = max_lmp_features_page_number
host = Host(controller, AsyncPipeSink(controller)) host = Host(controller, AsyncPipeSink(controller))
await host.reset() await host.reset()
assert host.local_lmp_features == int.from_bytes( assert host.local_lmp_features == (
bytes.fromhex(lmp_features), 'little' controller.lmp_features & ~(1 << (64 * max_lmp_features_page_number + 1))
) )
@@ -177,14 +171,15 @@ class Source:
class Sink: class Sink:
response: HCI_Event response: HCI_Event | None
def __init__(self, source: Source, response: HCI_Event) -> None: def __init__(self, source: Source, response: HCI_Event | None) -> None:
self.source = source self.source = source
self.response = response self.response = response
def on_packet(self, packet: bytes) -> None: def on_packet(self, packet: bytes) -> None:
self.source.sink.on_packet(bytes(self.response)) if self.response is not None:
self.source.sink.on_packet(bytes(self.response))
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -234,6 +229,23 @@ async def test_send_sync_command() -> None:
assert isinstance(response3.return_parameters, HCI_GenericReturnParameters) assert isinstance(response3.return_parameters, HCI_GenericReturnParameters)
@pytest.mark.asyncio
async def test_send_sync_command_timeout() -> None:
source = Source()
sink = Sink(source, None)
host = Host(source, sink)
host.ready = True
with pytest.raises(asyncio.TimeoutError):
await host.send_sync_command(HCI_Reset_Command(), response_timeout=0.01)
# The sending semaphore should have been released, so this should not block
# indefinitely
with pytest.raises(asyncio.TimeoutError):
await host.send_sync_command(hci.HCI_Reset_Command(), response_timeout=0.01)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_send_async_command() -> None: async def test_send_async_command() -> None:
source = Source() source = Source()

View File

@@ -21,6 +21,7 @@ import logging
import os import os
import pathlib import pathlib
import tempfile import tempfile
from unittest import mock
import pytest import pytest
@@ -179,11 +180,55 @@ async def test_default_namespace(temporary_file):
assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1') assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1')
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_no_filename(tmp_path):
import platformdirs
with mock.patch.object(platformdirs, 'user_data_path', return_value=tmp_path):
# Case 1: no namespace, no filename
keystore = JsonKeyStore(None, None)
expected_directory = tmp_path / 'Pairing'
expected_filename = expected_directory / 'keys.json'
assert keystore.directory_name == expected_directory
assert keystore.filename == expected_filename
# Save some data
keys = PairingKeys()
ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
keys.ltk = PairingKeys.Key(ltk)
await keystore.update('foo', keys)
assert expected_filename.exists()
# Load back
keystore2 = JsonKeyStore(None, None)
foo = await keystore2.get('foo')
assert foo is not None
assert foo.ltk.value == ltk
# Case 2: namespace, no filename
keystore3 = JsonKeyStore('my:namespace', None)
# safe_name = 'my-namespace' (lower is already 'my:namespace', then replace ':' with '-')
expected_filename3 = expected_directory / 'my-namespace.json'
assert keystore3.filename == expected_filename3
# Save some data
await keystore3.update('bar', keys)
assert expected_filename3.exists()
# Load back
keystore4 = JsonKeyStore('my:namespace', None)
bar = await keystore4.get('bar')
assert bar is not None
assert bar.ltk.value == ltk
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def run_tests(): async def run_tests():
await test_basic() await test_basic()
await test_parsing() await test_parsing()
await test_default_namespace() await test_default_namespace()
await test_no_filename()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -18,9 +18,11 @@
import asyncio import asyncio
import logging import logging
import os import os
import re
import pytest import pytest
from bumble import sdp
from bumble.core import BT_L2CAP_PROTOCOL_ID, UUID from bumble.core import BT_L2CAP_PROTOCOL_ID, UUID
from bumble.sdp import ( from bumble.sdp import (
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
@@ -206,6 +208,16 @@ def sdp_records(record_count=1):
} }
# -----------------------------------------------------------------------------
def test_pdu_parameter_length(caplog) -> None:
caplog.set_level(logging.WARNING)
pdu = sdp.SDP_ErrorResponse(
transaction_id=0, error_code=sdp.ErrorCode.INVALID_SDP_VERSION
)
assert sdp.SDP_PDU.from_bytes(bytes(pdu)) == pdu
assert not re.search("Expect \d+ bytes, got \d+", caplog.text)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_service_search(): async def test_service_search():

View File

@@ -24,7 +24,7 @@ import pytest
from bumble import crypto, pairing, smp from bumble import crypto, pairing, smp
from bumble.core import AdvertisingData from bumble.core import AdvertisingData
from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1 from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1
from bumble.device import Device from bumble.device import Device, DeviceConfiguration
from bumble.hci import Address from bumble.hci import Address
from bumble.pairing import LeRole, OobData, OobSharedData from bumble.pairing import LeRole, OobData, OobSharedData
@@ -312,3 +312,17 @@ async def test_send_identity_address_command(
actual_command = mock_method.call_args.args[0] actual_command = mock_method.call_args.args[0]
assert actual_command.addr_type == expected_identity_address.address_type assert actual_command.addr_type == expected_identity_address.address_type
assert actual_command.bd_addr == expected_identity_address assert actual_command.bd_addr == expected_identity_address
@pytest.mark.asyncio
async def test_smp_debug_mode():
config = DeviceConfiguration(smp_debug_mode=True)
device = Device(config=config)
assert device.smp_manager.ecc_key.x == smp.SMP_DEBUG_KEY_PUBLIC_X
assert device.smp_manager.ecc_key.y == smp.SMP_DEBUG_KEY_PUBLIC_Y
device.smp_manager.debug_mode = False
assert not device.smp_manager.ecc_key.x == smp.SMP_DEBUG_KEY_PUBLIC_X
assert not device.smp_manager.ecc_key.y == smp.SMP_DEBUG_KEY_PUBLIC_Y

View File

@@ -3,7 +3,7 @@
<head> <head>
<link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons"> <link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons">
<link rel="stylesheet" href="https://fonts.googleapis.com/css2?family=Material+Symbols+Outlined:opsz,wght,FILL,GRAD@24,400,0,0" /> <link rel="stylesheet" href="https://fonts.googleapis.com/css2?family=Material+Symbols+Outlined:opsz,wght,FILL,GRAD@24,400,0,0" />
<script src="https://cdn.jsdelivr.net/pyodide/v0.24.1/full/pyodide.js"></script> <script src="https://cdn.jsdelivr.net/pyodide/v0.26.2/full/pyodide.js"></script>
<script type="module" src="../ui.js"></script> <script type="module" src="../ui.js"></script>
<script type="module" src="heart_rate_monitor.js"></script> <script type="module" src="heart_rate_monitor.js"></script>
<style> <style>

View File

@@ -3,7 +3,7 @@
<head> <head>
<link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons"> <link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons">
<link rel="stylesheet" href="scanner.css"> <link rel="stylesheet" href="scanner.css">
<script src="https://cdn.jsdelivr.net/pyodide/v0.24.1/full/pyodide.js"></script> <script src="https://cdn.jsdelivr.net/pyodide/v0.26.2/full/pyodide.js"></script>
<script type="module" src="../ui.js"></script> <script type="module" src="../ui.js"></script>
<script type="module" src="scanner.js"></script> <script type="module" src="scanner.js"></script>
</style> </style>

View File

@@ -4,7 +4,7 @@
<title>Bumble Speaker</title> <title>Bumble Speaker</title>
<link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons"> <link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons">
<link rel="stylesheet" href="speaker.css"> <link rel="stylesheet" href="speaker.css">
<script src="https://cdn.jsdelivr.net/pyodide/v0.24.1/full/pyodide.js"></script> <script src="https://cdn.jsdelivr.net/pyodide/v0.26.2/full/pyodide.js"></script>
<script type="module" src="speaker.js"></script> <script type="module" src="speaker.js"></script>
<script type="module" src="../ui.js"></script> <script type="module" src="../ui.js"></script>
</head> </head>