Compare commits

..

49 Commits

Author SHA1 Message Date
Josh Wu
8988a85245 Merge pull request #919 from zxzxwu/sdp
SDP: Move parser functions to parser class
2026-04-29 13:21:13 +08:00
Josh Wu
0813da2278 SDP: Move parser functions to parser class 2026-04-28 13:27:50 +08:00
Gilles Boccon-Gibod
a1ff183d44 Merge pull request #915 from dlech/notify-subscribers-type-hints
improve type hints for notify/indicate subscriber(s) methods
2026-04-27 21:45:38 +02:00
Gilles Boccon-Gibod
7adf44eddf Merge pull request #916 from dlech/fix-crash-in-attribute-repr
fix crash in `bumble.att.Attribute.__repr__`
2026-04-27 21:41:41 +02:00
Josh Wu
05accbf805 Merge pull request #918 from ibondarenko1/fix/avdtp-empty-pdu-guard
avdtp: bound message assembler to drop truncated PDUs (DoS prevention)
2026-04-27 10:01:51 +08:00
Josh Wu
80f54f2a09 Merge pull request #917 from dlech/fix-regex-with-backslash
Fix regex syntax warning in sdp_test.py.
2026-04-27 09:55:36 +08:00
ibondarenko1
07b5e33e09 avdtp: address review nits — use truthy checks
Per @zxzxwu review on #918:
- bumble/avdtp.py: replace `if len(pdu) < 1:` with `if not pdu:`
- tests/avdtp_test.py: replace `assert completed == []` with
  `assert not completed`

Both are idiomatic Python truthy checks; behavior identical.
2026-04-26 18:49:55 -07:00
ibondarenko1
b874e26a4f avdtp: bound message assembler to drop truncated PDUs (DoS prevention)
A remote peer can send an AVDTP frame shorter than the assembler expects.
The current MessageAssembler.on_pdu() unconditionally accesses pdu[0],
pdu[1], and (for START packets) pdu[2], so a 0-, 1-, or 2-byte frame
raises IndexError. The exception propagates up through L2CAP's read loop
and tears down the channel — same DoS class as #912 (empty ATT PDU) and
#914 (unbounded SDP recursion).

Fix: validate length before each access. Empty PDUs and packets shorter
than the type-specific minimum are logged and dropped; the assembler
stays alive so the L2CAP channel is not torn down.

- bumble/avdtp.py: length guards in MessageAssembler.on_pdu before
  accessing pdu[0], pdu[1], pdu[2].
- tests/avdtp_test.py: regression test covering empty PDU, 1-byte SINGLE,
  1-byte START, 2-byte START — all four would have raised IndexError
  pre-fix; assembler now drops without raising.
2026-04-26 18:16:15 -07:00
David Lechner
baa5257780 improve type hints for notify/indicate subscriber(s) methods
Pyright expects generic type parameters to be specified for the
Attribute class, otherwise it treats the type as Unknown which can
trigger reportUnknownMemberType errors.

This can be solved by using a generic type parameter for these methods
which also has the benefit of making sure that the value parameter has
the correct type for the attribute.

In some cases, a new local `value_as_bytes` variable is needed to avoid
type errors and makes the code less confusing by not overwriting the
original `value` variable.
2026-04-26 09:43:40 -05:00
David Lechner
a91ea9110c Fix regex syntax warning in sdp_test.py.
Change regex match string to raw string to avoid syntax warning:

    tests/sdp_test.py:218: SyntaxWarning: invalid escape sequence '\d'
    assert not re.search("Expect \d+ bytes, got \d+", caplog.text)

In the future, this will become an error, so we should fix it now.
2026-04-26 09:31:18 -05:00
Josh Wu
1686c5b11b Merge pull request #914 from ibondarenko1/fix/sdp-recursion-depth-limit
sdp: bound DataElement parse recursion to prevent RecursionError DoS
2026-04-26 17:22:59 +08:00
David Lechner
d9481992bb fix crash in bumble.att.Attribute.__repr__
If an attribute does not contains a bytes value, it would crash with
something like:

    AttributeError: 'NoneType' object has no attribute 'hex'

Clearly, the intention here was to use `value_str` to avoid this
possibility.
2026-04-25 17:01:25 -05:00
ibondarenko1
16d0ed56cf sdp: address review nits (import at top, InvalidPacketError)
- bumble/sdp.py: replace raise ValueError with raise InvalidPacketError
  in DataElement.list_from_bytes depth guard. InvalidPacketError
  already imported at line 34 and extends ValueError so the existing
  regression test continues to match.
- tests/sdp_test.py: remove duplicate 'import pytest' inside
  test_nested_sequence_recursion_guard; pytest already imported at
  module top (line 23).

Threading.local counter left as-is per zxzxwu's 'leave it here and
refactor later' comment on the PR.
2026-04-24 11:42:49 -07:00
Ievgen Bondarenko
c55eb156b8 sdp: fix lint formatting (black: blank line after import pytest) 2026-04-24 00:06:56 -07:00
ibondarenko1
8614881fb3 sdp: bound DataElement parse recursion to prevent RecursionError DoS
DataElement.from_bytes -> list_from_bytes -> (SEQUENCE/ALTERNATIVE
constructor dispatches back to list_from_bytes) had no depth limit. A
malicious SDP peer could send a PDU of a few kilobytes containing ~1000
nested SEQUENCE tags and exhaust the Python recursion stack, crashing the
host with an unhandled RecursionError propagating out of the SDP handler.

Reachable via: any remote Bluetooth device that Bumble performs SDP
service discovery against (default during Classic connection setup).

Same family as PR #912 (ATT_PDU.from_bytes empty PDU IndexError) - remote
unchecked-input parser crash in the Bluetooth stack.

Fix: thread-local depth counter, cap nesting at 32 (well above anything a
legitimate service record uses). Added two regression tests covering the
deep-nesting reject path and normal 16-level-nested SEQUENCE parsing.

Reproducer (4.5 KB payload, deterministic crash on 0.0.228):

    from bumble.sdp import DataElement
    inner = b"\x35\x00"
    for _ in range(1500):
        size = len(inner)
        if size < 65535:
            inner = bytes([0x36, (size >> 8) & 0xFF, size & 0xFF]) + inner
    DataElement.from_bytes(inner)  # RecursionError before fix

Signed-off-by: ibondarenko1 <ibondarenko1@users.noreply.github.com>
2026-04-23 00:53:06 -07:00
Josh Wu
27d02ef18d Merge pull request #913 from zxzxwu/sdp
SDP: Fix wrong parameter size
2026-04-20 16:32:37 +08:00
Josh Wu
c0725e2a4a SDP: Fix wrong parameter size 2026-04-20 16:23:19 +08:00
Josh Wu
bf0784dde4 Merge pull request #912 from ibondarenko1/fix/empty-pdu-crash
fix: add input validation to prevent remote crash from empty/malforme…
2026-04-20 14:36:48 +08:00
Ievgen Bondarenko
444f43f6a3 fix: address review feedback - use InvalidPacketError and abort on buffer overflow
- att.py: raise core.InvalidPacketError instead of generic ValueError
- smp.py: raise core.InvalidPacketError instead of generic ValueError
- hfp.py: add MAX_BUFFER_SIZE class constant (64KB)
- hfp.py: drop incoming data when it would overflow buffer instead of
  truncating, preserving existing partial-packet state

Per review comments on PR #912 by @zxzxwu.
2026-04-16 11:24:09 -07:00
Gilles Boccon-Gibod
2420c47cf1 Merge pull request #911 from google/gbg/issue-910
release command semaphore after timeout
2026-04-16 18:11:57 +02:00
Ievgen Bondarenko
0a78e7506b fix: add input validation to prevent remote crash from empty/malformed PDUs
Add length checks in from_bytes() for ATT and SMP protocol parsers
to prevent IndexError crashes from empty PDUs sent by remote Bluetooth
devices. Also add buffer size limit and UTF-8 error handling in HFP
protocol to prevent memory exhaustion and decode crashes.

- bumble/att.py: validate PDU is non-empty before accessing pdu[0]
- bumble/smp.py: validate PDU is non-empty before accessing pdu[0]
- bumble/hfp.py: limit buffer to 64KB, handle invalid UTF-8 gracefully

These issues can be triggered by a remote Bluetooth device sending
malformed packets, causing denial of service on the host.
2026-04-16 01:43:41 -07:00
Gilles Boccon-Gibod
f7cc6f6657 release command semaphore after timeout 2026-04-15 16:54:54 +02:00
Josh Wu
f2824ee6b8 Merge pull request #907 from zxzxwu/example-gatt-client-and-server
Advertise in run_gatt_client_and_server
2026-04-13 16:31:19 +08:00
Josh Wu
7188ef08de Advertise in run_gatt_client_and_server 2026-04-13 15:31:32 +08:00
Josh Wu
3ded9014d3 Merge pull request #905 from markusjellitsch/feature/debug-keys
Feature  - Add SMP Debug Mode  (Core Vol.3, Part H)
2026-04-09 15:36:42 +08:00
Josh Wu
b6125bdfb1 Merge pull request #904 from zxzxwu/keys
Keys: Remove appdirs and improve typing
2026-04-09 15:30:39 +08:00
Markus Jellitsch
dc17f4f1ca remove asserts 2026-04-08 20:58:47 +02:00
Markus Jellitsch
3f65380c20 remove comment 2026-04-03 23:19:43 +02:00
Markus Jellitsch
25a0056ecc remove uncommented line 2026-04-03 23:08:16 +02:00
Markus Jellitsch
85f6b10983 run formatter 2026-04-03 23:06:24 +02:00
Markus Jellitsch
e85f041e9d add test for smp debug mode 2026-04-03 23:04:48 +02:00
Markus Jellitsch
ee09e6f10d add smp_debug_mode config flag to enable debug keys during device init 2026-04-03 23:03:51 +02:00
Markus Jellitsch
c3daf4a7e1 implement debug mode for smp manager using defined private / public key pair 2026-04-03 23:02:15 +02:00
Josh Wu
3af623be7e Keys: Remove appdirs and improve typing 2026-03-31 16:25:15 +08:00
Gilles Boccon-Gibod
4e76d3057b Merge pull request #903 from sameer/micropip-install-crypto-issue
Fix Hive demo install failure
2026-03-28 15:35:32 -04:00
Sameer Puri
eda7360222 Upgrade pyodide in web fixes import error
Prior to this, these web pages fail to load with
`ImportError: cannot import name 'TypeIs' from 'typing_extensions'
(/lib/python3.11/site-packages/typing_extensions.py)`
2026-03-26 18:39:07 +00:00
Sameer Puri
a4c15c00de Downgrade cryptography, fixes micropip failure
Prior to this, these web pages fail to load with

`ValueError: Can't find a pure Python 3 wheel for 'cryptography>=44.0.3;
platform_system == "Emscripten"'.`
2026-03-26 18:38:12 +00:00
Josh Wu
cba4df4aef Merge pull request #900 from zxzxwu/lmp-feat
Add read classic remote features support
2026-03-24 14:03:29 +08:00
Josh Wu
ceb8b448e9 Merge pull request #901 from zxzxwu/rust
Add --locked to allow installing cargo-all-features
2026-03-21 03:45:47 +08:00
Josh Wu
311b716d5c Add --locked to allow installing cargo-all-features 2026-03-20 18:44:49 +08:00
Josh Wu
0ba9e5c317 Add read classic remote features support 2026-03-20 18:32:52 +08:00
Josh Wu
3517225b62 Merge pull request #898 from zxzxwu/phy
Make ConnectionPHY dataclass
2026-03-13 12:04:45 +08:00
Josh Wu
ad4bb1578b Make ConnectionPHY dataclass 2026-03-11 21:41:48 +08:00
Josh Wu
4af65b381b Merge pull request #820 from zxzxwu/sdp
SDP: Migrate to dataclasses
2026-03-04 13:45:39 +08:00
Josh Wu
a5cd3365ae Merge pull request #895 from zxzxwu/uuid
Hash and cache 128 bytes of UUID
2026-03-04 00:29:43 +08:00
Josh Wu
2915cb8bb6 Add test for UUID hash 2026-03-04 00:22:50 +08:00
Josh Wu
28e485b7b3 Hash and cache 128 bytes of UUID 2026-03-03 17:54:27 +08:00
Josh Wu
1198f2c3f5 SDP: Make PDU dataclasses 2026-03-03 02:07:08 +08:00
Josh Wu
80aaf6a2b9 SDP: Make DataElement and ServiceAttribute dataclasses 2026-03-03 01:28:40 +08:00
25 changed files with 1207 additions and 620 deletions

View File

@@ -69,7 +69,7 @@ jobs:
components: clippy,rustfmt
toolchain: ${{ matrix.rust-version }}
- name: Install Rust dependencies
run: cargo install cargo-all-features --version 1.11.0 # allows building/testing combinations of features
run: cargo install cargo-all-features --version 1.11.0 --locked # allows building/testing combinations of features
- name: Check License Headers
run: cd rust && cargo run --features dev-tools --bin file-header check-all
- name: Rust Build

View File

@@ -42,7 +42,7 @@ from typing_extensions import TypeIs
from bumble import hci, l2cap, utils
from bumble.colors import color
from bumble.core import UUID, InvalidOperationError, ProtocolError
from bumble.core import UUID, InvalidOperationError, InvalidPacketError, ProtocolError
from bumble.hci import HCI_Object
# -----------------------------------------------------------------------------
@@ -249,6 +249,8 @@ class ATT_PDU:
@classmethod
def from_bytes(cls, pdu: bytes) -> ATT_PDU:
if not pdu:
raise InvalidPacketError("Empty ATT PDU")
op_code = pdu[0]
subclass = ATT_PDU.pdu_classes.get(op_code)
@@ -1081,7 +1083,7 @@ class Attribute(utils.EventEmitter, Generic[_T]):
else:
value_str = str(self.value)
if value_str:
value_string = f', value={self.value.hex()}'
value_string = f', value={value_str}'
else:
value_string = ''
return (

View File

@@ -311,6 +311,13 @@ class MessageAssembler:
def on_pdu(self, pdu: bytes) -> None:
self.packet_count += 1
# Drop empty PDUs sent by remote — accessing pdu[0] below would
# raise IndexError, propagating up to the L2CAP read loop and
# tearing down the channel. Same class as #912 (ATT empty PDU).
if not pdu:
logger.warning('AVDTP message assembler: empty PDU dropped')
return
transaction_label = pdu[0] >> 4
packet_type = Protocol.PacketType((pdu[0] >> 2) & 3)
message_type = Message.MessageType(pdu[0] & 3)
@@ -324,6 +331,23 @@ class MessageAssembler:
Protocol.PacketType.SINGLE_PACKET,
Protocol.PacketType.START_PACKET,
):
# Both single and start packets carry the signal identifier in
# pdu[1]; start packets additionally carry the packet count in
# pdu[2]. Guard each access so a malformed remote frame can't
# crash the message assembler.
if len(pdu) < 2:
logger.warning(
'AVDTP %s packet too short (%d bytes); dropped',
packet_type.name,
len(pdu),
)
return
if packet_type == Protocol.PacketType.START_PACKET and len(pdu) < 3:
logger.warning(
'AVDTP START packet missing signal-packet count; dropped'
)
return
if self.message is not None:
# The previous message has not been terminated
logger.warning(

View File

@@ -238,9 +238,12 @@ class Controller:
hci_revision: int = 0
lmp_version: int = hci.HCI_VERSION_BLUETOOTH_CORE_5_0
lmp_subversion: int = 0
lmp_features: bytes = bytes.fromhex(
'0000000060000000'
) # BR/EDR Not Supported, LE Supported (Controller)
lmp_features: hci.LmpFeatureMask = (
hci.LmpFeatureMask.LE_SUPPORTED_CONTROLLER
| hci.LmpFeatureMask.BR_EDR_NOT_SUPPORTED
| hci.LmpFeatureMask.EXTENDED_FEATURES
)
lmp_features_max_page_number: int = 3
manufacturer_company_identifier: int = 0xFFFF
acl_data_packet_length: int = 27
total_num_acl_data_packets: int = 64
@@ -250,10 +253,78 @@ class Controller:
total_num_iso_data_packets: int = 64
event_mask: int = 0
event_mask_page_2: int = 0
supported_commands: bytes = bytes.fromhex(
'2000800000c000000000e4000000a822000000000000040000f7ffff7f000000'
'30f0f9ff01008004002000000000000000000000000000000000000000000000'
)
supported_commands: set[int] = {
hci.HCI_DISCONNECT_COMMAND,
hci.HCI_READ_REMOTE_VERSION_INFORMATION_COMMAND,
hci.HCI_READ_CLOCK_OFFSET_COMMAND,
hci.HCI_READ_LMP_HANDLE_COMMAND,
hci.HCI_SET_EVENT_MASK_COMMAND,
hci.HCI_RESET_COMMAND,
hci.HCI_READ_TRANSMIT_POWER_LEVEL_COMMAND,
hci.HCI_SET_CONTROLLER_TO_HOST_FLOW_CONTROL_COMMAND,
hci.HCI_HOST_BUFFER_SIZE_COMMAND,
hci.HCI_HOST_NUMBER_OF_COMPLETED_PACKETS_COMMAND,
hci.HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND,
hci.HCI_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
hci.HCI_READ_BUFFER_SIZE_COMMAND,
hci.HCI_READ_BD_ADDR_COMMAND,
hci.HCI_READ_RSSI_COMMAND,
hci.HCI_SET_EVENT_MASK_PAGE_2_COMMAND,
hci.HCI_LE_SET_EVENT_MASK_COMMAND,
hci.HCI_LE_READ_BUFFER_SIZE_COMMAND,
hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
hci.HCI_LE_SET_RANDOM_ADDRESS_COMMAND,
hci.HCI_LE_SET_ADVERTISING_PARAMETERS_COMMAND,
hci.HCI_LE_READ_ADVERTISING_PHYSICAL_CHANNEL_TX_POWER_COMMAND,
hci.HCI_LE_SET_ADVERTISING_DATA_COMMAND,
hci.HCI_LE_SET_SCAN_RESPONSE_DATA_COMMAND,
hci.HCI_LE_SET_ADVERTISING_ENABLE_COMMAND,
hci.HCI_LE_SET_SCAN_PARAMETERS_COMMAND,
hci.HCI_LE_SET_SCAN_ENABLE_COMMAND,
hci.HCI_LE_CREATE_CONNECTION_COMMAND,
hci.HCI_LE_CREATE_CONNECTION_CANCEL_COMMAND,
hci.HCI_LE_READ_FILTER_ACCEPT_LIST_SIZE_COMMAND,
hci.HCI_LE_CLEAR_FILTER_ACCEPT_LIST_COMMAND,
hci.HCI_LE_ADD_DEVICE_TO_FILTER_ACCEPT_LIST_COMMAND,
hci.HCI_LE_REMOVE_DEVICE_FROM_FILTER_ACCEPT_LIST_COMMAND,
hci.HCI_LE_CONNECTION_UPDATE_COMMAND,
hci.HCI_LE_SET_HOST_CHANNEL_CLASSIFICATION_COMMAND,
hci.HCI_LE_READ_CHANNEL_MAP_COMMAND,
hci.HCI_LE_READ_REMOTE_FEATURES_COMMAND,
hci.HCI_LE_ENCRYPT_COMMAND,
hci.HCI_LE_RAND_COMMAND,
hci.HCI_LE_ENABLE_ENCRYPTION_COMMAND,
hci.HCI_LE_LONG_TERM_KEY_REQUEST_REPLY_COMMAND,
hci.HCI_LE_LONG_TERM_KEY_REQUEST_NEGATIVE_REPLY_COMMAND,
hci.HCI_LE_READ_SUPPORTED_STATES_COMMAND,
hci.HCI_LE_RECEIVER_TEST_COMMAND,
hci.HCI_LE_TRANSMITTER_TEST_COMMAND,
hci.HCI_LE_TEST_END_COMMAND,
hci.HCI_READ_AUTHENTICATED_PAYLOAD_TIMEOUT_COMMAND,
hci.HCI_WRITE_AUTHENTICATED_PAYLOAD_TIMEOUT_COMMAND,
hci.HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_REPLY_COMMAND,
hci.HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_NEGATIVE_REPLY_COMMAND,
hci.HCI_LE_SET_DATA_LENGTH_COMMAND,
hci.HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
hci.HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
hci.HCI_LE_ADD_DEVICE_TO_RESOLVING_LIST_COMMAND,
hci.HCI_LE_REMOVE_DEVICE_FROM_RESOLVING_LIST_COMMAND,
hci.HCI_LE_CLEAR_RESOLVING_LIST_COMMAND,
hci.HCI_LE_READ_RESOLVING_LIST_SIZE_COMMAND,
hci.HCI_LE_READ_PEER_RESOLVABLE_ADDRESS_COMMAND,
hci.HCI_LE_READ_LOCAL_RESOLVABLE_ADDRESS_COMMAND,
hci.HCI_LE_SET_ADDRESS_RESOLUTION_ENABLE_COMMAND,
hci.HCI_LE_SET_RESOLVABLE_PRIVATE_ADDRESS_TIMEOUT_COMMAND,
hci.HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
hci.HCI_LE_READ_PHY_COMMAND,
hci.HCI_LE_SET_DEFAULT_PHY_COMMAND,
hci.HCI_LE_SET_PHY_COMMAND,
hci.HCI_LE_RECEIVER_TEST_V2_COMMAND,
hci.HCI_LE_TRANSMITTER_TEST_V2_COMMAND,
hci.HCI_LE_READ_TRANSMIT_POWER_COMMAND,
hci.HCI_LE_SET_PRIVACY_MODE_COMMAND,
hci.HCI_LE_READ_BUFFER_SIZE_V2_COMMAND,
}
le_event_mask: int = 0
le_features: hci.LeFeatureMask = (
hci.LeFeatureMask.LE_ENCRYPTION
@@ -392,6 +463,12 @@ class Controller:
if self.link:
self.link.on_address_changed(self)
@property
def lmp_features_bytes(self) -> bytes:
return self.lmp_features.to_bytes(
(self.lmp_features_max_page_number + 1) * 8, 'little'
)
# Packet Sink protocol (packets coming from the host via HCI)
def on_packet(self, packet: bytes) -> None:
self.on_hci_packet(hci.HCI_Packet.from_bytes(packet))
@@ -968,6 +1045,51 @@ class Controller:
packet.name_length,
packet.name_fregment,
)
case lmp.LmpFeaturesReq(features):
self.send_lmp_packet(
sender_address,
lmp.LmpFeaturesRes(features=self.lmp_features_bytes[:8]),
)
case lmp.LmpFeaturesRes(features):
if connection := self.classic_connections.get(sender_address):
self.send_hci_packet(
hci.HCI_Read_Remote_Supported_Features_Complete_Event(
status=hci.HCI_ErrorCode.SUCCESS,
connection_handle=connection.handle,
lmp_features=features,
)
)
case lmp.LmpFeaturesReqExt(features_page, features):
# Calculate start/end of page
page_start = features_page * 8
page_end = page_start + 8
features_bytes = self.lmp_features_bytes
if page_start < len(features_bytes):
page_features = features_bytes[page_start:page_end].ljust(
8, b'\x00'
)
else:
page_features = b'\x00' * 8
self.send_lmp_packet(
sender_address,
lmp.LmpFeaturesResExt(
features_page=features_page,
max_features_page=len(features_bytes) // 8 - 1,
features=page_features,
),
)
case lmp.LmpFeaturesResExt(features_page, max_features_page, features):
if connection := self.classic_connections.get(sender_address):
self.send_hci_packet(
hci.HCI_Read_Remote_Extended_Features_Complete_Event(
status=hci.HCI_ErrorCode.SUCCESS,
connection_handle=connection.handle,
page_number=features_page,
maximum_page_number=max_features_page,
extended_lmp_features=features,
)
)
case _:
logger.error("!!! Unhandled packet: %s", packet)
@@ -1349,6 +1471,53 @@ class Controller:
return None
def on_hci_read_remote_supported_features_command(
self, command: hci.HCI_Read_Remote_Supported_Features_Command
) -> None:
'''
See Bluetooth spec Vol 4, Part E - 7.1.20 Read Remote Supported Features command
'''
handle = command.connection_handle
if not (connection := self.find_classic_connection_by_handle(handle)):
self._send_hci_command_status(
hci.HCI_ErrorCode.UNKNOWN_CONNECTION_IDENTIFIER_ERROR, command.op_code
)
return None
self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code)
self.send_lmp_packet(
connection.peer_address,
lmp.LmpFeaturesReq(self.lmp_features_bytes[:8]),
)
return None
def on_hci_read_remote_extended_features_command(
self, command: hci.HCI_Read_Remote_Extended_Features_Command
) -> None:
'''
See Bluetooth spec Vol 4, Part E - 7.1.21 Read Remote Extended Features command
'''
handle = command.connection_handle
if not (connection := self.find_classic_connection_by_handle(handle)):
self._send_hci_command_status(
hci.HCI_ErrorCode.UNKNOWN_CONNECTION_IDENTIFIER_ERROR, command.op_code
)
return None
self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code)
self.send_lmp_packet(
connection.peer_address,
lmp.LmpFeaturesReqExt(
features_page=command.page_number,
features=self.lmp_features_bytes[
command.page_number * 8 : (command.page_number + 1) * 8
],
),
)
return None
def on_hci_enhanced_setup_synchronous_connection_command(
self, command: hci.HCI_Enhanced_Setup_Synchronous_Connection_Command
) -> None:
@@ -1645,11 +1814,15 @@ class Controller:
return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS)
def on_hci_write_simple_pairing_mode_command(
self, _command: hci.HCI_Write_Simple_Pairing_Mode_Command
self, command: hci.HCI_Write_Simple_Pairing_Mode_Command
) -> hci.HCI_StatusReturnParameters:
'''
See Bluetooth spec Vol 4, Part E - 7.3.59 Write Simple Pairing Mode Command
'''
if command.simple_pairing_mode:
self.lmp_features |= hci.LmpFeatureMask.SECURE_SIMPLE_PAIRING_HOST_SUPPORT
else:
self.lmp_features &= ~hci.LmpFeatureMask.SECURE_SIMPLE_PAIRING_HOST_SUPPORT
return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS)
def on_hci_set_event_mask_page_2_command(
@@ -1670,16 +1843,23 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.3.78 Write LE Host Support Command
'''
return hci.HCI_Read_LE_Host_Support_ReturnParameters(
status=hci.HCI_ErrorCode.SUCCESS, le_supported_host=1, unused=0
status=hci.HCI_ErrorCode.SUCCESS,
le_supported_host=(
1 if self.lmp_features & hci.LmpFeatureMask.LE_SUPPORTED_HOST else 0
),
unused=0,
)
def on_hci_write_le_host_support_command(
self, _command: hci.HCI_Write_LE_Host_Support_Command
self, command: hci.HCI_Write_LE_Host_Support_Command
) -> hci.HCI_StatusReturnParameters:
'''
See Bluetooth spec Vol 4, Part E - 7.3.79 Write LE Host Support Command
'''
# TODO / Just ignore for now
if command.le_supported_host:
self.lmp_features |= hci.LmpFeatureMask.LE_SUPPORTED_HOST
else:
self.lmp_features &= ~hci.LmpFeatureMask.LE_SUPPORTED_HOST
return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS)
def on_hci_write_authenticated_payload_timeout_command(
@@ -1716,7 +1896,11 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.4.2 Read Local Supported Commands Command
'''
return hci.HCI_Read_Local_Supported_Commands_ReturnParameters(
hci.HCI_ErrorCode.SUCCESS, supported_commands=self.supported_commands
hci.HCI_ErrorCode.SUCCESS,
supported_commands=sum(
hci.HCI_SUPPORTED_COMMANDS_MASKS.get(opcode, 0)
for opcode in self.supported_commands
).to_bytes(64, 'little'),
)
def on_hci_read_local_supported_features_command(
@@ -1726,7 +1910,8 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.4.3 Read Local Supported Features Command
'''
return hci.HCI_Read_Local_Supported_Features_ReturnParameters(
hci.HCI_ErrorCode.SUCCESS, lmp_features=self.lmp_features[:8]
hci.HCI_ErrorCode.SUCCESS,
lmp_features=self.lmp_features_bytes[:8],
)
def on_hci_read_local_extended_features_command(
@@ -1735,18 +1920,19 @@ class Controller:
'''
See Bluetooth spec Vol 4, Part E - 7.4.4 Read Local Extended Features Command
'''
if command.page_number * 8 > len(self.lmp_features):
feature_bytes = self.lmp_features_bytes
if command.page_number * 8 > len(feature_bytes):
return hci.HCI_Read_Local_Extended_Features_ReturnParameters(
status=hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR,
page_number=command.page_number,
maximum_page_number=len(self.lmp_features) // 8 - 1,
maximum_page_number=len(feature_bytes) // 8 - 1,
extended_lmp_features=bytes(8),
)
return hci.HCI_Read_Local_Extended_Features_ReturnParameters(
status=hci.HCI_ErrorCode.SUCCESS,
page_number=command.page_number,
maximum_page_number=len(self.lmp_features) // 8 - 1,
extended_lmp_features=self.lmp_features[
maximum_page_number=len(feature_bytes) // 8 - 1,
extended_lmp_features=feature_bytes[
command.page_number * 8 : (command.page_number + 1) * 8
],
)

View File

@@ -19,6 +19,7 @@ from __future__ import annotations
import dataclasses
import enum
import functools
import struct
from collections.abc import Iterable
from typing import (
@@ -273,13 +274,8 @@ class UUID:
def parse_uuid_2(cls, uuid_as_bytes: bytes, offset: int) -> tuple[int, UUID]:
return offset + 2, cls.from_bytes(uuid_as_bytes[offset : offset + 2])
def to_bytes(self, force_128: bool = False) -> bytes:
'''
Serialize UUID in little-endian byte-order
'''
if not force_128:
return self.uuid_bytes
@functools.cached_property
def uuid_128_bytes(self) -> bytes:
match len(self.uuid_bytes):
case 2:
return self.BASE_UUID + self.uuid_bytes + bytes([0, 0])
@@ -290,6 +286,15 @@ class UUID:
case _:
assert False, "unreachable"
def to_bytes(self, force_128: bool = False) -> bytes:
'''
Serialize UUID in little-endian byte-order
'''
if not force_128:
return self.uuid_bytes
return self.uuid_128_bytes
def to_pdu_bytes(self) -> bytes:
'''
Convert to bytes for use in an ATT PDU.
@@ -318,7 +323,7 @@ class UUID:
def __eq__(self, other: object) -> bool:
if isinstance(other, UUID):
return self.to_bytes(force_128=True) == other.to_bytes(force_128=True)
return self.uuid_128_bytes == other.uuid_128_bytes
if isinstance(other, str):
return UUID(other) == self
@@ -326,7 +331,7 @@ class UUID:
return False
def __hash__(self) -> int:
return hash(self.uuid_bytes)
return hash(self.uuid_128_bytes)
def __str__(self) -> str:
result = self.to_hex_str(separator='-')
@@ -2111,13 +2116,10 @@ class AdvertisingData:
# -----------------------------------------------------------------------------
# Connection PHY
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class ConnectionPHY:
def __init__(self, tx_phy, rx_phy):
self.tx_phy = tx_phy
self.rx_phy = rx_phy
def __str__(self):
return f'ConnectionPHY(tx_phy={self.tx_phy}, rx_phy={self.rx_phy})'
tx_phy: int
rx_phy: int
# -----------------------------------------------------------------------------

View File

@@ -1837,6 +1837,7 @@ class Connection(utils.CompositeEventEmitter):
self.pairing_peer_io_capability = None
self.pairing_peer_authentication_requirements = None
self.peer_le_features = hci.LeFeatureMask(0)
self.peer_classic_features = hci.LmpFeatureMask(0)
self.cs_configs = {}
self.cs_procedures = {}
@@ -2054,6 +2055,15 @@ class Connection(utils.CompositeEventEmitter):
self.peer_le_features = await self.device.get_remote_le_features(self)
return self.peer_le_features
async def get_remote_classic_features(self) -> hci.LmpFeatureMask:
"""[Classic Only] Reads remote LMP supported features.
Returns:
LMP features supported by the remote device.
"""
self.peer_classic_features = await self.device.get_remote_classic_features(self)
return self.peer_classic_features
def on_att_mtu_update(self, mtu: int):
logger.debug(
f'*** Connection ATT MTU Update: [0x{self.handle:04X}] '
@@ -2149,6 +2159,7 @@ class DeviceConfiguration:
)
eatt_enabled: bool = False
gatt_services: list[dict[str, Any]] = field(init=False)
smp_debug_mode: bool = False
def __post_init__(self) -> None:
self.gatt_services = []
@@ -2561,6 +2572,7 @@ class Device(utils.CompositeEventEmitter):
),
),
)
self.smp_manager.debug_mode = self.config.smp_debug_mode
self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)
@@ -5281,6 +5293,77 @@ class Device(utils.CompositeEventEmitter):
)
return await read_feature_future
async def get_remote_classic_features(
self, connection: Connection
) -> hci.LmpFeatureMask:
"""[Classic Only] Reads remote LE supported features.
Args:
handle: connection handle to read LMP features.
Returns:
LMP features supported by the remote device.
"""
with closing(utils.EventWatcher()) as watcher:
read_feature_future: asyncio.Future[tuple[int, int]] = (
asyncio.get_running_loop().create_future()
)
read_features = hci.LmpFeatureMask(0)
current_page_number = 0
@watcher.on(self.host, 'classic_remote_features')
def on_classic_remote_features(
handle: int,
status: int,
features: int,
page_number: int,
max_page_number: int,
) -> None:
if handle != connection.handle:
logger.warning(
"Received classic_remote_features for wrong handle, expected=0x%04X, got=0x%04X",
connection.handle,
handle,
)
return
if page_number != current_page_number:
logger.warning(
"Received classic_remote_features for wrong page, expected=%d, got=%d",
current_page_number,
page_number,
)
return
if status == hci.HCI_ErrorCode.SUCCESS:
read_feature_future.set_result((features, max_page_number))
else:
read_feature_future.set_exception(hci.HCI_Error(status))
await self.send_async_command(
hci.HCI_Read_Remote_Supported_Features_Command(
connection_handle=connection.handle
)
)
new_features, max_page_number = await read_feature_future
read_features |= new_features
if not (read_features & hci.LmpFeatureMask.EXTENDED_FEATURES):
return read_features
while current_page_number <= max_page_number:
read_feature_future = asyncio.get_running_loop().create_future()
await self.send_async_command(
hci.HCI_Read_Remote_Extended_Features_Command(
connection_handle=connection.handle,
page_number=current_page_number,
)
)
new_features, max_page_number = await read_feature_future
read_features |= new_features << (current_page_number * 64)
current_page_number += 1
return read_features
@utils.experimental('Only for testing.')
async def get_remote_cs_capabilities(
self, connection: Connection
@@ -5535,8 +5618,8 @@ class Device(utils.CompositeEventEmitter):
async def notify_subscriber(
self,
connection: Connection,
attribute: Attribute,
value: Any | None = None,
attribute: Attribute[_T],
value: _T | None = None,
force: bool = False,
) -> None:
"""
@@ -5555,7 +5638,7 @@ class Device(utils.CompositeEventEmitter):
await self.gatt_server.notify_subscriber(connection, attribute, value, force)
async def notify_subscribers(
self, attribute: Attribute, value: Any | None = None, force: bool = False
self, attribute: Attribute[_T], value: _T | None = None, force: bool = False
) -> None:
"""
Send a notification to all the subscribers of an attribute.
@@ -5574,8 +5657,8 @@ class Device(utils.CompositeEventEmitter):
async def indicate_subscriber(
self,
connection: Connection,
attribute: Attribute,
value: Any | None = None,
attribute: Attribute[_T],
value: _T | None = None,
force: bool = False,
):
"""
@@ -5596,7 +5679,7 @@ class Device(utils.CompositeEventEmitter):
await self.gatt_server.indicate_subscriber(connection, attribute, value, force)
async def indicate_subscribers(
self, attribute: Attribute, value: Any | None = None, force: bool = False
self, attribute: Attribute[_T], value: _T | None = None, force: bool = False
):
"""
Send an indication to all the subscribers of an attribute.

View File

@@ -67,6 +67,8 @@ GATT_SERVER_DEFAULT_MAX_MTU = 517
# Helpers
# -----------------------------------------------------------------------------
_T = TypeVar('_T')
def _bearer_id(bearer: att.Bearer) -> str:
if att.is_enhanced_bearer(bearer):
@@ -369,8 +371,8 @@ class Server(utils.EventEmitter):
async def notify_subscriber(
self,
bearer: att.Bearer,
attribute: att.Attribute,
value: bytes | None = None,
attribute: att.Attribute[_T],
value: _T | None = None,
force: bool = False,
) -> None:
if att.is_enhanced_bearer(bearer) or force:
@@ -390,8 +392,8 @@ class Server(utils.EventEmitter):
async def _notify_single_subscriber(
self,
bearer: att.Bearer,
attribute: att.Attribute,
value: bytes | None,
attribute: att.Attribute[_T],
value: _T | None,
force: bool,
) -> None:
# Check if there's a subscriber
@@ -411,19 +413,19 @@ class Server(utils.EventEmitter):
return
# Get or encode the value
value = (
value_as_bytes = (
await attribute.read_value(bearer)
if value is None
else attribute.encode_value(value)
)
# Truncate if needed
if len(value) > bearer.att_mtu - 3:
value = value[: bearer.att_mtu - 3]
if len(value_as_bytes) > bearer.att_mtu - 3:
value_as_bytes = value_as_bytes[: bearer.att_mtu - 3]
# Notify
notification = att.ATT_Handle_Value_Notification(
attribute_handle=attribute.handle, attribute_value=value
attribute_handle=attribute.handle, attribute_value=value_as_bytes
)
logger.debug(f'GATT Notify from server: {_bearer_id(bearer)} {notification}')
self.send_gatt_pdu(bearer, bytes(notification))
@@ -431,8 +433,8 @@ class Server(utils.EventEmitter):
async def indicate_subscriber(
self,
bearer: att.Bearer,
attribute: att.Attribute,
value: bytes | None = None,
attribute: att.Attribute[_T],
value: _T | None = None,
force: bool = False,
) -> None:
if att.is_enhanced_bearer(bearer) or force:
@@ -452,8 +454,8 @@ class Server(utils.EventEmitter):
async def _indicate_single_bearer(
self,
bearer: att.Bearer,
attribute: att.Attribute,
value: bytes | None,
attribute: att.Attribute[_T],
value: _T | None,
force: bool,
) -> None:
# Check if there's a subscriber
@@ -473,19 +475,19 @@ class Server(utils.EventEmitter):
return
# Get or encode the value
value = (
value_as_bytes = (
await attribute.read_value(bearer)
if value is None
else attribute.encode_value(value)
)
# Truncate if needed
if len(value) > bearer.att_mtu - 3:
value = value[: bearer.att_mtu - 3]
if len(value_as_bytes) > bearer.att_mtu - 3:
value_as_bytes = value_as_bytes[: bearer.att_mtu - 3]
# Indicate
indication = att.ATT_Handle_Value_Indication(
attribute_handle=attribute.handle, attribute_value=value
attribute_handle=attribute.handle, attribute_value=value_as_bytes
)
logger.debug(f'GATT Indicate from server: {_bearer_id(bearer)} {indication}')
@@ -510,8 +512,8 @@ class Server(utils.EventEmitter):
async def _notify_or_indicate_subscribers(
self,
indicate: bool,
attribute: att.Attribute,
value: bytes | None = None,
attribute: att.Attribute[_T],
value: _T | None = None,
force: bool = False,
) -> None:
# Get all the bearers for which there's at least one subscription
@@ -537,8 +539,8 @@ class Server(utils.EventEmitter):
async def notify_subscribers(
self,
attribute: att.Attribute,
value: bytes | None = None,
attribute: att.Attribute[_T],
value: _T | None = None,
force: bool = False,
):
return await self._notify_or_indicate_subscribers(
@@ -547,8 +549,8 @@ class Server(utils.EventEmitter):
async def indicate_subscribers(
self,
attribute: att.Attribute,
value: bytes | None = None,
attribute: att.Attribute[_T],
value: _T | None = None,
force: bool = False,
):
return await self._notify_or_indicate_subscribers(True, attribute, value, force)

View File

@@ -68,6 +68,8 @@ class HfpProtocolError(ProtocolError):
# -----------------------------------------------------------------------------
class HfpProtocol:
MAX_BUFFER_SIZE: ClassVar[int] = 65536
dlc: rfcomm.DLC
buffer: str
lines: collections.deque
@@ -84,10 +86,19 @@ class HfpProtocol:
def feed(self, data: bytes | str) -> None:
# Convert the data to a string if needed
if isinstance(data, bytes):
data = data.decode('utf-8')
data = data.decode('utf-8', errors='replace')
logger.debug(f'<<< Data received: {data}')
# Drop incoming data if it would overflow the buffer; keep existing
# partial packet state intact so a future clean packet can still parse.
if len(self.buffer) + len(data) > self.MAX_BUFFER_SIZE:
logger.warning(
'HFP buffer overflow (>%d bytes), dropping incoming data',
self.MAX_BUFFER_SIZE,
)
return
# Add to the buffer and look for lines
self.buffer += data
while (separator := self.buffer.find('\r')) >= 0:

View File

@@ -692,10 +692,8 @@ class Host(utils.EventEmitter):
finally:
self.pending_command = None
self.pending_response = None
if (
response is not None
and response.num_hci_command_packets
and self.command_semaphore.locked()
if response is None or (
response.num_hci_command_packets and self.command_semaphore.locked()
):
self.command_semaphore.release()
@@ -1660,6 +1658,19 @@ class Host(utils.EventEmitter):
'connection_encryption_failure', event.connection_handle, event.status
)
def on_hci_read_remote_supported_features_complete_event(
self, event: hci.HCI_Read_Remote_Supported_Features_Complete_Event
) -> None:
# Notify the client
self.emit(
'classic_remote_features',
event.connection_handle,
event.status,
int.from_bytes(event.lmp_features, 'little'),
0, # page number
0, # max page number
)
def on_hci_encryption_change_v2_event(
self, event: hci.HCI_Encryption_Change_V2_Event
):
@@ -1816,6 +1827,18 @@ class Host(utils.EventEmitter):
rssi,
)
def on_hci_read_remote_extended_features_complete_event(
self, event: hci.HCI_Read_Remote_Extended_Features_Complete_Event
):
self.emit(
'classic_remote_features',
event.connection_handle,
event.status,
int.from_bytes(event.extended_lmp_features, 'little'),
event.page_number,
event.maximum_page_number,
)
def on_hci_extended_inquiry_result_event(
self, event: hci.HCI_Extended_Inquiry_Result_Event
):

View File

@@ -27,6 +27,7 @@ import dataclasses
import json
import logging
import os
import pathlib
from typing import TYPE_CHECKING, Any
from typing_extensions import Self
@@ -248,29 +249,26 @@ class JsonKeyStore(KeyStore):
DEFAULT_NAMESPACE = '__DEFAULT__'
DEFAULT_BASE_NAME = "keys"
def __init__(self, namespace, filename=None):
self.namespace = namespace if namespace is not None else self.DEFAULT_NAMESPACE
def __init__(
self, namespace: str | None = None, filename: str | None = None
) -> None:
self.namespace = namespace or self.DEFAULT_NAMESPACE
if filename is None:
# Use a default for the current user
# Import here because this may not exist on all platforms
# pylint: disable=import-outside-toplevel
import appdirs
self.directory_name = os.path.join(
appdirs.user_data_dir(self.APP_NAME, self.APP_AUTHOR), self.KEYS_DIR
)
base_name = self.DEFAULT_BASE_NAME if namespace is None else self.namespace
json_filename = (
f'{base_name}.json'.lower().replace(':', '-').replace('/p', '-p')
)
self.filename = os.path.join(self.directory_name, json_filename)
if filename:
self.filename = pathlib.Path(filename).resolve()
self.directory_name = self.filename.parent
else:
self.filename = filename
self.directory_name = os.path.dirname(os.path.abspath(self.filename))
import platformdirs # Deferred import
logger.debug(f'JSON keystore: {self.filename}')
base_dir = platformdirs.user_data_path(self.APP_NAME, self.APP_AUTHOR)
self.directory_name = base_dir / self.KEYS_DIR
base_name = self.namespace if namespace else self.DEFAULT_BASE_NAME
safe_name = base_name.lower().replace(':', '-').replace('/', '-')
self.filename = self.directory_name / f"{safe_name}.json"
logger.debug('JSON keystore: %s', self.filename)
@classmethod
def from_device(
@@ -293,7 +291,9 @@ class JsonKeyStore(KeyStore):
return cls(namespace, filename)
async def load(self):
async def load(
self,
) -> tuple[dict[str, dict[str, dict[str, Any]]], dict[str, dict[str, Any]]]:
# Try to open the file, without failing. If the file does not exist, it
# will be created upon saving.
try:
@@ -312,17 +312,17 @@ class JsonKeyStore(KeyStore):
return next(iter(db.items()))
# Finally, just create an empty key map for the namespace
key_map = {}
key_map: dict[str, dict[str, Any]] = {}
db[self.namespace] = key_map
return (db, key_map)
async def save(self, db):
async def save(self, db: dict[str, dict[str, dict[str, Any]]]) -> None:
# Create the directory if it doesn't exist
if not os.path.exists(self.directory_name):
os.makedirs(self.directory_name, exist_ok=True)
if not self.directory_name.exists():
self.directory_name.mkdir(parents=True, exist_ok=True)
# Save to a temporary file
temp_filename = self.filename + '.tmp'
temp_filename = self.filename.with_name(self.filename.name + ".tmp")
with open(temp_filename, 'w', encoding='utf-8') as output:
json.dump(db, output, sort_keys=True, indent=4)
@@ -334,16 +334,16 @@ class JsonKeyStore(KeyStore):
del key_map[name]
await self.save(db)
async def update(self, name, keys):
async def update(self, name: str, keys: PairingKeys) -> None:
db, key_map = await self.load()
key_map.setdefault(name, {}).update(keys.to_dict())
await self.save(db)
async def get_all(self):
async def get_all(self) -> list[tuple[str, PairingKeys]]:
_, key_map = await self.load()
return [(name, PairingKeys.from_dict(keys)) for (name, keys) in key_map.items()]
async def delete_all(self):
async def delete_all(self) -> None:
db, key_map = await self.load()
key_map.clear()
await self.save(db)

View File

@@ -322,3 +322,38 @@ class LmpNameRes(Packet):
name_offset: int = field(metadata=hci.metadata(2))
name_length: int = field(metadata=hci.metadata(3))
name_fregment: bytes = field(metadata=hci.metadata('*'))
@Packet.subclass
@dataclass
class LmpFeaturesReq(Packet):
opcode = Opcode.LMP_FEATURES_REQ
features: bytes = field(metadata=hci.metadata(8))
@Packet.subclass
@dataclass
class LmpFeaturesRes(Packet):
opcode = Opcode.LMP_FEATURES_RES
features: bytes = field(metadata=hci.metadata(8))
@Packet.subclass
@dataclass
class LmpFeaturesReqExt(Packet):
opcode = Opcode.LMP_FEATURES_REQ_EXT
features_page: int = field(metadata=hci.metadata(1))
features: bytes = field(metadata=hci.metadata(8))
@Packet.subclass
@dataclass
class LmpFeaturesResExt(Packet):
opcode = Opcode.LMP_FEATURES_RES_EXT
features_page: int = field(metadata=hci.metadata(1))
max_features_page: int = field(metadata=hci.metadata(1))
features: bytes = field(metadata=hci.metadata(8))

File diff suppressed because it is too large Load Diff

View File

@@ -36,6 +36,7 @@ from bumble.colors import color
from bumble.core import (
AdvertisingData,
InvalidArgumentError,
InvalidPacketError,
PhysicalTransport,
ProtocolError,
)
@@ -178,6 +179,16 @@ class AuthReq(hci.SpecableFlag):
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031')
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032')
# Diffie-Hellman private / public key pair in Debug Mode (Core - Vol. 3, Part H)
SMP_DEBUG_KEY_PRIVATE = bytes.fromhex(
'3f49f6d4 a3c55f38 74c9b3e3 d2103f50 4aff607b eb40b799 5899b8a6 cd3c1abd'
)
SMP_DEBUG_KEY_PUBLIC_X = bytes.fromhex(
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6'
)
SMP_DEBUG_KEY_PUBLIC_Y= bytes.fromhex(
'dc809c49 652aeb6d 63329abf 5a52155c 766345c2 8fed3024 741c8ed0 1589d28b'
)
# fmt: on
# pylint: enable=line-too-long
# pylint: disable=invalid-name
@@ -205,6 +216,8 @@ class SMP_Command:
@classmethod
def from_bytes(cls, pdu: bytes) -> SMP_Command:
if not pdu:
raise InvalidPacketError("Empty SMP PDU")
code = CommandCode(pdu[0])
subclass = SMP_Command.smp_classes.get(code)
@@ -1919,6 +1932,7 @@ class Manager(utils.EventEmitter):
self._ecc_key = None
self.pairing_config_factory = pairing_config_factory
self.session_proxy = Session
self.debug_mode = False
def send_command(self, connection: Connection, command: SMP_Command) -> None:
logger.debug(
@@ -1965,6 +1979,13 @@ class Manager(utils.EventEmitter):
@property
def ecc_key(self) -> crypto.EccKey:
if self.debug_mode:
# Core - Vol 3, Part H:
# When the Security Manager is placed in a Debug mode it shall use the
# following Diffie-Hellman private / public key pair:
debug_key = crypto.EccKey.from_private_key_bytes(SMP_DEBUG_KEY_PRIVATE)
return debug_key
if self._ecc_key is None:
self._ecc_key = crypto.EccKey.generate()
assert self._ecc_key

View File

@@ -83,6 +83,7 @@ async def main() -> None:
GATT_DEVICE_INFORMATION_SERVICE, [manufacturer_name_characteristic]
)
server_device.add_service(device_info_service)
await server_device.start_advertising()
# Connect the client to the server
connection = await client_device.connect(server_device.random_address)

View File

@@ -13,13 +13,12 @@ authors = [{ name = "Google", email = "bumble-dev@google.com" }]
requires-python = ">=3.10"
dependencies = [
"aiohttp ~= 3.8; platform_system!='Emscripten'",
"appdirs >= 1.4; platform_system!='Emscripten'",
"click >= 8.1.3; platform_system!='Emscripten'",
"cryptography >= 44.0.3; platform_system!='Emscripten' and platform_system!='Android'",
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the
# versions available on PyPI. Relax the version requirement since it's better than being
# completely unable to import the package in case of version mismatch.
"cryptography >= 44.0.3; platform_system=='Emscripten'",
"cryptography >= 39.0.0; platform_system=='Emscripten'",
# Android wheels for cryptography are not yet available on PyPI, so chaquopy uses
# the builds from https://chaquo.com/pypi-13.1/cryptography/. But these are not regually
# updated. Relax the version requirement since it's better than being completely unable

View File

@@ -120,6 +120,31 @@ def test_messages(message: avdtp.Message):
assert message.payload == parsed.payload
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'pdu',
(
b'', # empty PDU — would IndexError on pdu[0]
b'\x00', # 1-byte SINGLE_PACKET — would IndexError on pdu[1]
b'\x04', # 1-byte START_PACKET — would IndexError on pdu[1]
b'\x44\x10', # 2-byte START_PACKET — would IndexError on pdu[2]
),
)
def test_message_assembler_truncated_pdu(pdu: bytes):
"""Truncated AVDTP PDUs from a remote peer must NOT raise IndexError —
same DoS class as #912 (ATT empty PDU). The assembler is required to
log + drop and stay alive so the L2CAP channel survives."""
completed = []
def callback(transaction_label, message):
completed.append((transaction_label, message))
assembler = avdtp.MessageAssembler(callback)
# Must not raise; nothing should be delivered to callback either.
assembler.on_pdu(pdu)
assert not completed
# -----------------------------------------------------------------------------
def test_rtp():
packet = bytes.fromhex(

View File

@@ -73,6 +73,14 @@ def test_uuid_to_hex_str() -> None:
)
# -----------------------------------------------------------------------------
def test_uuid_hash() -> None:
uuid = UUID("1234")
uuid_128_bytes = UUID.from_bytes(uuid.to_bytes(force_128=True))
assert uuid in {uuid_128_bytes}
assert uuid_128_bytes in {uuid}
# -----------------------------------------------------------------------------
def test_appearance() -> None:
a = Appearance(Appearance.Category.COMPUTER, Appearance.ComputerSubcategory.LAPTOP)

View File

@@ -826,6 +826,22 @@ async def test_remote_name_request():
assert actual_name == expected_name
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_remote_classic_features():
devices = TwoDevices()
devices[0].classic_enabled = True
devices[1].classic_enabled = True
await devices[0].power_on()
await devices[1].power_on()
connection = await devices[0].connect_classic(devices[1].public_address)
assert (
await asyncio.wait_for(connection.get_remote_classic_features(), _TIMEOUT)
== devices.controllers[1].lmp_features
)
# -----------------------------------------------------------------------------
async def run_test_device():
await test_device_connect_parallel()

View File

@@ -22,6 +22,7 @@ import unittest.mock
import pytest
from bumble import controller, hci
from bumble.controller import Controller
from bumble.hci import (
HCI_AclDataPacket,
@@ -49,34 +50,27 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
'supported_commands, lmp_features',
'supported_commands, max_lmp_features_page_number',
[
(
# Default commands
'2000800000c000000000e4000000a822000000000000040000f7ffff7f000000'
'30f0f9ff01008004000000000000000000000000000000000000000000000000',
# Only LE LMP feature
'0000000060000000',
),
(controller.Controller.supported_commands, 0),
(
# All commands
'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff'
'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff',
set(hci.HCI_Command.command_names.keys()),
# 3 pages of LMP features
'000102030405060708090A0B0C0D0E0F011112131415161718191A1B1C1D1E1F',
2,
),
],
)
async def test_reset(supported_commands: str, lmp_features: str):
async def test_reset(supported_commands: set[int], max_lmp_features_page_number: int):
controller = Controller('C')
controller.supported_commands = bytes.fromhex(supported_commands)
controller.lmp_features = bytes.fromhex(lmp_features)
controller.supported_commands = supported_commands
controller.lmp_features_max_page_number = max_lmp_features_page_number
host = Host(controller, AsyncPipeSink(controller))
await host.reset()
assert host.local_lmp_features == int.from_bytes(
bytes.fromhex(lmp_features), 'little'
assert host.local_lmp_features == (
controller.lmp_features & ~(1 << (64 * max_lmp_features_page_number + 1))
)
@@ -177,14 +171,15 @@ class Source:
class Sink:
response: HCI_Event
response: HCI_Event | None
def __init__(self, source: Source, response: HCI_Event) -> None:
def __init__(self, source: Source, response: HCI_Event | None) -> None:
self.source = source
self.response = response
def on_packet(self, packet: bytes) -> None:
self.source.sink.on_packet(bytes(self.response))
if self.response is not None:
self.source.sink.on_packet(bytes(self.response))
@pytest.mark.asyncio
@@ -234,6 +229,23 @@ async def test_send_sync_command() -> None:
assert isinstance(response3.return_parameters, HCI_GenericReturnParameters)
@pytest.mark.asyncio
async def test_send_sync_command_timeout() -> None:
source = Source()
sink = Sink(source, None)
host = Host(source, sink)
host.ready = True
with pytest.raises(asyncio.TimeoutError):
await host.send_sync_command(HCI_Reset_Command(), response_timeout=0.01)
# The sending semaphore should have been released, so this should not block
# indefinitely
with pytest.raises(asyncio.TimeoutError):
await host.send_sync_command(hci.HCI_Reset_Command(), response_timeout=0.01)
@pytest.mark.asyncio
async def test_send_async_command() -> None:
source = Source()

View File

@@ -21,6 +21,7 @@ import logging
import os
import pathlib
import tempfile
from unittest import mock
import pytest
@@ -179,11 +180,55 @@ async def test_default_namespace(temporary_file):
assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1')
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_no_filename(tmp_path):
import platformdirs
with mock.patch.object(platformdirs, 'user_data_path', return_value=tmp_path):
# Case 1: no namespace, no filename
keystore = JsonKeyStore(None, None)
expected_directory = tmp_path / 'Pairing'
expected_filename = expected_directory / 'keys.json'
assert keystore.directory_name == expected_directory
assert keystore.filename == expected_filename
# Save some data
keys = PairingKeys()
ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
keys.ltk = PairingKeys.Key(ltk)
await keystore.update('foo', keys)
assert expected_filename.exists()
# Load back
keystore2 = JsonKeyStore(None, None)
foo = await keystore2.get('foo')
assert foo is not None
assert foo.ltk.value == ltk
# Case 2: namespace, no filename
keystore3 = JsonKeyStore('my:namespace', None)
# safe_name = 'my-namespace' (lower is already 'my:namespace', then replace ':' with '-')
expected_filename3 = expected_directory / 'my-namespace.json'
assert keystore3.filename == expected_filename3
# Save some data
await keystore3.update('bar', keys)
assert expected_filename3.exists()
# Load back
keystore4 = JsonKeyStore('my:namespace', None)
bar = await keystore4.get('bar')
assert bar is not None
assert bar.ltk.value == ltk
# -----------------------------------------------------------------------------
async def run_tests():
await test_basic()
await test_parsing()
await test_default_namespace()
await test_no_filename()
# -----------------------------------------------------------------------------

View File

@@ -18,9 +18,11 @@
import asyncio
import logging
import os
import re
import pytest
from bumble import sdp
from bumble.core import BT_L2CAP_PROTOCOL_ID, UUID
from bumble.sdp import (
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
@@ -206,6 +208,16 @@ def sdp_records(record_count=1):
}
# -----------------------------------------------------------------------------
def test_pdu_parameter_length(caplog) -> None:
caplog.set_level(logging.WARNING)
pdu = sdp.SDP_ErrorResponse(
transaction_id=0, error_code=sdp.ErrorCode.INVALID_SDP_VERSION
)
assert sdp.SDP_PDU.from_bytes(bytes(pdu)) == pdu
assert not re.search(r"Expect \d+ bytes, got \d+", caplog.text)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_service_search():
@@ -428,3 +440,43 @@ async def run():
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run())
# -----------------------------------------------------------------------------
def test_nested_sequence_recursion_guard():
"""Regression test: deeply-nested SDP SEQUENCE/ALTERNATIVE must not crash
the parser with RecursionError. Instead a ValueError is raised once the
configured nesting limit is exceeded.
Root cause: DataElement.from_bytes -> list_from_bytes -> (constructor
dispatching back to list_from_bytes for SEQUENCE/ALTERNATIVE) recursed
without a depth limit. A malicious SDP peer could craft a PDU exceeding
Pythons default recursion limit (~1000 frames) and crash the host.
"""
# Build nested SEQUENCE payload with tag 0x36 (SEQUENCE, 2-byte length).
inner = b"\x35\x00" # empty SEQUENCE terminator
for _ in range(1500):
size = len(inner)
if size >= 65535:
break
inner = bytes([0x36, (size >> 8) & 0xFF, size & 0xFF]) + inner
with pytest.raises(ValueError, match="nesting exceeds max depth"):
DataElement.from_bytes(inner)
def test_nested_sequence_within_limit_still_works():
"""Nested-but-reasonable SDP SEQUENCEs must still parse correctly."""
leaf = DataElement.unsigned_integer(1, value_size=2)
payload = leaf
for _ in range(16): # under the 32-depth limit
payload = DataElement.sequence([payload])
raw = bytes(payload)
parsed = DataElement.from_bytes(raw)
# Walk back down to confirm structural integrity preserved
cur = parsed
for _ in range(16):
assert cur.type == DataElement.SEQUENCE
cur = cur.value[0]
assert cur.type == DataElement.UNSIGNED_INTEGER
assert cur.value == 1

View File

@@ -24,7 +24,7 @@ import pytest
from bumble import crypto, pairing, smp
from bumble.core import AdvertisingData
from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1
from bumble.device import Device
from bumble.device import Device, DeviceConfiguration
from bumble.hci import Address
from bumble.pairing import LeRole, OobData, OobSharedData
@@ -312,3 +312,17 @@ async def test_send_identity_address_command(
actual_command = mock_method.call_args.args[0]
assert actual_command.addr_type == expected_identity_address.address_type
assert actual_command.bd_addr == expected_identity_address
@pytest.mark.asyncio
async def test_smp_debug_mode():
config = DeviceConfiguration(smp_debug_mode=True)
device = Device(config=config)
assert device.smp_manager.ecc_key.x == smp.SMP_DEBUG_KEY_PUBLIC_X
assert device.smp_manager.ecc_key.y == smp.SMP_DEBUG_KEY_PUBLIC_Y
device.smp_manager.debug_mode = False
assert not device.smp_manager.ecc_key.x == smp.SMP_DEBUG_KEY_PUBLIC_X
assert not device.smp_manager.ecc_key.y == smp.SMP_DEBUG_KEY_PUBLIC_Y

View File

@@ -3,7 +3,7 @@
<head>
<link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons">
<link rel="stylesheet" href="https://fonts.googleapis.com/css2?family=Material+Symbols+Outlined:opsz,wght,FILL,GRAD@24,400,0,0" />
<script src="https://cdn.jsdelivr.net/pyodide/v0.24.1/full/pyodide.js"></script>
<script src="https://cdn.jsdelivr.net/pyodide/v0.26.2/full/pyodide.js"></script>
<script type="module" src="../ui.js"></script>
<script type="module" src="heart_rate_monitor.js"></script>
<style>

View File

@@ -3,7 +3,7 @@
<head>
<link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons">
<link rel="stylesheet" href="scanner.css">
<script src="https://cdn.jsdelivr.net/pyodide/v0.24.1/full/pyodide.js"></script>
<script src="https://cdn.jsdelivr.net/pyodide/v0.26.2/full/pyodide.js"></script>
<script type="module" src="../ui.js"></script>
<script type="module" src="scanner.js"></script>
</style>

View File

@@ -4,7 +4,7 @@
<title>Bumble Speaker</title>
<link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons">
<link rel="stylesheet" href="speaker.css">
<script src="https://cdn.jsdelivr.net/pyodide/v0.24.1/full/pyodide.js"></script>
<script src="https://cdn.jsdelivr.net/pyodide/v0.26.2/full/pyodide.js"></script>
<script type="module" src="speaker.js"></script>
<script type="module" src="../ui.js"></script>
</head>