Compare commits

...

34 Commits

Author SHA1 Message Date
Josh Wu
27d02ef18d Merge pull request #913 from zxzxwu/sdp
SDP: Fix wrong parameter size
2026-04-20 16:32:37 +08:00
Josh Wu
c0725e2a4a SDP: Fix wrong parameter size 2026-04-20 16:23:19 +08:00
Josh Wu
bf0784dde4 Merge pull request #912 from ibondarenko1/fix/empty-pdu-crash
fix: add input validation to prevent remote crash from empty/malforme…
2026-04-20 14:36:48 +08:00
Ievgen Bondarenko
444f43f6a3 fix: address review feedback - use InvalidPacketError and abort on buffer overflow
- att.py: raise core.InvalidPacketError instead of generic ValueError
- smp.py: raise core.InvalidPacketError instead of generic ValueError
- hfp.py: add MAX_BUFFER_SIZE class constant (64KB)
- hfp.py: drop incoming data when it would overflow buffer instead of
  truncating, preserving existing partial-packet state

Per review comments on PR #912 by @zxzxwu.
2026-04-16 11:24:09 -07:00
Gilles Boccon-Gibod
2420c47cf1 Merge pull request #911 from google/gbg/issue-910
release command semaphore after timeout
2026-04-16 18:11:57 +02:00
Ievgen Bondarenko
0a78e7506b fix: add input validation to prevent remote crash from empty/malformed PDUs
Add length checks in from_bytes() for ATT and SMP protocol parsers
to prevent IndexError crashes from empty PDUs sent by remote Bluetooth
devices. Also add buffer size limit and UTF-8 error handling in HFP
protocol to prevent memory exhaustion and decode crashes.

- bumble/att.py: validate PDU is non-empty before accessing pdu[0]
- bumble/smp.py: validate PDU is non-empty before accessing pdu[0]
- bumble/hfp.py: limit buffer to 64KB, handle invalid UTF-8 gracefully

These issues can be triggered by a remote Bluetooth device sending
malformed packets, causing denial of service on the host.
2026-04-16 01:43:41 -07:00
Gilles Boccon-Gibod
f7cc6f6657 release command semaphore after timeout 2026-04-15 16:54:54 +02:00
Josh Wu
f2824ee6b8 Merge pull request #907 from zxzxwu/example-gatt-client-and-server
Advertise in run_gatt_client_and_server
2026-04-13 16:31:19 +08:00
Josh Wu
7188ef08de Advertise in run_gatt_client_and_server 2026-04-13 15:31:32 +08:00
Josh Wu
3ded9014d3 Merge pull request #905 from markusjellitsch/feature/debug-keys
Feature  - Add SMP Debug Mode  (Core Vol.3, Part H)
2026-04-09 15:36:42 +08:00
Josh Wu
b6125bdfb1 Merge pull request #904 from zxzxwu/keys
Keys: Remove appdirs and improve typing
2026-04-09 15:30:39 +08:00
Markus Jellitsch
dc17f4f1ca remove asserts 2026-04-08 20:58:47 +02:00
Markus Jellitsch
3f65380c20 remove comment 2026-04-03 23:19:43 +02:00
Markus Jellitsch
25a0056ecc remove uncommented line 2026-04-03 23:08:16 +02:00
Markus Jellitsch
85f6b10983 run formatter 2026-04-03 23:06:24 +02:00
Markus Jellitsch
e85f041e9d add test for smp debug mode 2026-04-03 23:04:48 +02:00
Markus Jellitsch
ee09e6f10d add smp_debug_mode config flag to enable debug keys during device init 2026-04-03 23:03:51 +02:00
Markus Jellitsch
c3daf4a7e1 implement debug mode for smp manager using defined private / public key pair 2026-04-03 23:02:15 +02:00
Josh Wu
3af623be7e Keys: Remove appdirs and improve typing 2026-03-31 16:25:15 +08:00
Gilles Boccon-Gibod
4e76d3057b Merge pull request #903 from sameer/micropip-install-crypto-issue
Fix Hive demo install failure
2026-03-28 15:35:32 -04:00
Sameer Puri
eda7360222 Upgrade pyodide in web fixes import error
Prior to this, these web pages fail to load with
`ImportError: cannot import name 'TypeIs' from 'typing_extensions'
(/lib/python3.11/site-packages/typing_extensions.py)`
2026-03-26 18:39:07 +00:00
Sameer Puri
a4c15c00de Downgrade cryptography, fixes micropip failure
Prior to this, these web pages fail to load with

`ValueError: Can't find a pure Python 3 wheel for 'cryptography>=44.0.3;
platform_system == "Emscripten"'.`
2026-03-26 18:38:12 +00:00
Josh Wu
cba4df4aef Merge pull request #900 from zxzxwu/lmp-feat
Add read classic remote features support
2026-03-24 14:03:29 +08:00
Josh Wu
ceb8b448e9 Merge pull request #901 from zxzxwu/rust
Add --locked to allow installing cargo-all-features
2026-03-21 03:45:47 +08:00
Josh Wu
311b716d5c Add --locked to allow installing cargo-all-features 2026-03-20 18:44:49 +08:00
Josh Wu
0ba9e5c317 Add read classic remote features support 2026-03-20 18:32:52 +08:00
Josh Wu
3517225b62 Merge pull request #898 from zxzxwu/phy
Make ConnectionPHY dataclass
2026-03-13 12:04:45 +08:00
Josh Wu
ad4bb1578b Make ConnectionPHY dataclass 2026-03-11 21:41:48 +08:00
Josh Wu
4af65b381b Merge pull request #820 from zxzxwu/sdp
SDP: Migrate to dataclasses
2026-03-04 13:45:39 +08:00
Josh Wu
a5cd3365ae Merge pull request #895 from zxzxwu/uuid
Hash and cache 128 bytes of UUID
2026-03-04 00:29:43 +08:00
Josh Wu
2915cb8bb6 Add test for UUID hash 2026-03-04 00:22:50 +08:00
Josh Wu
28e485b7b3 Hash and cache 128 bytes of UUID 2026-03-03 17:54:27 +08:00
Josh Wu
1198f2c3f5 SDP: Make PDU dataclasses 2026-03-03 02:07:08 +08:00
Josh Wu
80aaf6a2b9 SDP: Make DataElement and ServiceAttribute dataclasses 2026-03-03 01:28:40 +08:00
22 changed files with 776 additions and 335 deletions

View File

@@ -69,7 +69,7 @@ jobs:
components: clippy,rustfmt
toolchain: ${{ matrix.rust-version }}
- name: Install Rust dependencies
run: cargo install cargo-all-features --version 1.11.0 # allows building/testing combinations of features
run: cargo install cargo-all-features --version 1.11.0 --locked # allows building/testing combinations of features
- name: Check License Headers
run: cd rust && cargo run --features dev-tools --bin file-header check-all
- name: Rust Build

View File

@@ -42,7 +42,7 @@ from typing_extensions import TypeIs
from bumble import hci, l2cap, utils
from bumble.colors import color
from bumble.core import UUID, InvalidOperationError, ProtocolError
from bumble.core import UUID, InvalidOperationError, InvalidPacketError, ProtocolError
from bumble.hci import HCI_Object
# -----------------------------------------------------------------------------
@@ -249,6 +249,8 @@ class ATT_PDU:
@classmethod
def from_bytes(cls, pdu: bytes) -> ATT_PDU:
if not pdu:
raise InvalidPacketError("Empty ATT PDU")
op_code = pdu[0]
subclass = ATT_PDU.pdu_classes.get(op_code)

View File

@@ -238,9 +238,12 @@ class Controller:
hci_revision: int = 0
lmp_version: int = hci.HCI_VERSION_BLUETOOTH_CORE_5_0
lmp_subversion: int = 0
lmp_features: bytes = bytes.fromhex(
'0000000060000000'
) # BR/EDR Not Supported, LE Supported (Controller)
lmp_features: hci.LmpFeatureMask = (
hci.LmpFeatureMask.LE_SUPPORTED_CONTROLLER
| hci.LmpFeatureMask.BR_EDR_NOT_SUPPORTED
| hci.LmpFeatureMask.EXTENDED_FEATURES
)
lmp_features_max_page_number: int = 3
manufacturer_company_identifier: int = 0xFFFF
acl_data_packet_length: int = 27
total_num_acl_data_packets: int = 64
@@ -250,10 +253,78 @@ class Controller:
total_num_iso_data_packets: int = 64
event_mask: int = 0
event_mask_page_2: int = 0
supported_commands: bytes = bytes.fromhex(
'2000800000c000000000e4000000a822000000000000040000f7ffff7f000000'
'30f0f9ff01008004002000000000000000000000000000000000000000000000'
)
supported_commands: set[int] = {
hci.HCI_DISCONNECT_COMMAND,
hci.HCI_READ_REMOTE_VERSION_INFORMATION_COMMAND,
hci.HCI_READ_CLOCK_OFFSET_COMMAND,
hci.HCI_READ_LMP_HANDLE_COMMAND,
hci.HCI_SET_EVENT_MASK_COMMAND,
hci.HCI_RESET_COMMAND,
hci.HCI_READ_TRANSMIT_POWER_LEVEL_COMMAND,
hci.HCI_SET_CONTROLLER_TO_HOST_FLOW_CONTROL_COMMAND,
hci.HCI_HOST_BUFFER_SIZE_COMMAND,
hci.HCI_HOST_NUMBER_OF_COMPLETED_PACKETS_COMMAND,
hci.HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND,
hci.HCI_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
hci.HCI_READ_BUFFER_SIZE_COMMAND,
hci.HCI_READ_BD_ADDR_COMMAND,
hci.HCI_READ_RSSI_COMMAND,
hci.HCI_SET_EVENT_MASK_PAGE_2_COMMAND,
hci.HCI_LE_SET_EVENT_MASK_COMMAND,
hci.HCI_LE_READ_BUFFER_SIZE_COMMAND,
hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
hci.HCI_LE_SET_RANDOM_ADDRESS_COMMAND,
hci.HCI_LE_SET_ADVERTISING_PARAMETERS_COMMAND,
hci.HCI_LE_READ_ADVERTISING_PHYSICAL_CHANNEL_TX_POWER_COMMAND,
hci.HCI_LE_SET_ADVERTISING_DATA_COMMAND,
hci.HCI_LE_SET_SCAN_RESPONSE_DATA_COMMAND,
hci.HCI_LE_SET_ADVERTISING_ENABLE_COMMAND,
hci.HCI_LE_SET_SCAN_PARAMETERS_COMMAND,
hci.HCI_LE_SET_SCAN_ENABLE_COMMAND,
hci.HCI_LE_CREATE_CONNECTION_COMMAND,
hci.HCI_LE_CREATE_CONNECTION_CANCEL_COMMAND,
hci.HCI_LE_READ_FILTER_ACCEPT_LIST_SIZE_COMMAND,
hci.HCI_LE_CLEAR_FILTER_ACCEPT_LIST_COMMAND,
hci.HCI_LE_ADD_DEVICE_TO_FILTER_ACCEPT_LIST_COMMAND,
hci.HCI_LE_REMOVE_DEVICE_FROM_FILTER_ACCEPT_LIST_COMMAND,
hci.HCI_LE_CONNECTION_UPDATE_COMMAND,
hci.HCI_LE_SET_HOST_CHANNEL_CLASSIFICATION_COMMAND,
hci.HCI_LE_READ_CHANNEL_MAP_COMMAND,
hci.HCI_LE_READ_REMOTE_FEATURES_COMMAND,
hci.HCI_LE_ENCRYPT_COMMAND,
hci.HCI_LE_RAND_COMMAND,
hci.HCI_LE_ENABLE_ENCRYPTION_COMMAND,
hci.HCI_LE_LONG_TERM_KEY_REQUEST_REPLY_COMMAND,
hci.HCI_LE_LONG_TERM_KEY_REQUEST_NEGATIVE_REPLY_COMMAND,
hci.HCI_LE_READ_SUPPORTED_STATES_COMMAND,
hci.HCI_LE_RECEIVER_TEST_COMMAND,
hci.HCI_LE_TRANSMITTER_TEST_COMMAND,
hci.HCI_LE_TEST_END_COMMAND,
hci.HCI_READ_AUTHENTICATED_PAYLOAD_TIMEOUT_COMMAND,
hci.HCI_WRITE_AUTHENTICATED_PAYLOAD_TIMEOUT_COMMAND,
hci.HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_REPLY_COMMAND,
hci.HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_NEGATIVE_REPLY_COMMAND,
hci.HCI_LE_SET_DATA_LENGTH_COMMAND,
hci.HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
hci.HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
hci.HCI_LE_ADD_DEVICE_TO_RESOLVING_LIST_COMMAND,
hci.HCI_LE_REMOVE_DEVICE_FROM_RESOLVING_LIST_COMMAND,
hci.HCI_LE_CLEAR_RESOLVING_LIST_COMMAND,
hci.HCI_LE_READ_RESOLVING_LIST_SIZE_COMMAND,
hci.HCI_LE_READ_PEER_RESOLVABLE_ADDRESS_COMMAND,
hci.HCI_LE_READ_LOCAL_RESOLVABLE_ADDRESS_COMMAND,
hci.HCI_LE_SET_ADDRESS_RESOLUTION_ENABLE_COMMAND,
hci.HCI_LE_SET_RESOLVABLE_PRIVATE_ADDRESS_TIMEOUT_COMMAND,
hci.HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
hci.HCI_LE_READ_PHY_COMMAND,
hci.HCI_LE_SET_DEFAULT_PHY_COMMAND,
hci.HCI_LE_SET_PHY_COMMAND,
hci.HCI_LE_RECEIVER_TEST_V2_COMMAND,
hci.HCI_LE_TRANSMITTER_TEST_V2_COMMAND,
hci.HCI_LE_READ_TRANSMIT_POWER_COMMAND,
hci.HCI_LE_SET_PRIVACY_MODE_COMMAND,
hci.HCI_LE_READ_BUFFER_SIZE_V2_COMMAND,
}
le_event_mask: int = 0
le_features: hci.LeFeatureMask = (
hci.LeFeatureMask.LE_ENCRYPTION
@@ -392,6 +463,12 @@ class Controller:
if self.link:
self.link.on_address_changed(self)
@property
def lmp_features_bytes(self) -> bytes:
return self.lmp_features.to_bytes(
(self.lmp_features_max_page_number + 1) * 8, 'little'
)
# Packet Sink protocol (packets coming from the host via HCI)
def on_packet(self, packet: bytes) -> None:
self.on_hci_packet(hci.HCI_Packet.from_bytes(packet))
@@ -968,6 +1045,51 @@ class Controller:
packet.name_length,
packet.name_fregment,
)
case lmp.LmpFeaturesReq(features):
self.send_lmp_packet(
sender_address,
lmp.LmpFeaturesRes(features=self.lmp_features_bytes[:8]),
)
case lmp.LmpFeaturesRes(features):
if connection := self.classic_connections.get(sender_address):
self.send_hci_packet(
hci.HCI_Read_Remote_Supported_Features_Complete_Event(
status=hci.HCI_ErrorCode.SUCCESS,
connection_handle=connection.handle,
lmp_features=features,
)
)
case lmp.LmpFeaturesReqExt(features_page, features):
# Calculate start/end of page
page_start = features_page * 8
page_end = page_start + 8
features_bytes = self.lmp_features_bytes
if page_start < len(features_bytes):
page_features = features_bytes[page_start:page_end].ljust(
8, b'\x00'
)
else:
page_features = b'\x00' * 8
self.send_lmp_packet(
sender_address,
lmp.LmpFeaturesResExt(
features_page=features_page,
max_features_page=len(features_bytes) // 8 - 1,
features=page_features,
),
)
case lmp.LmpFeaturesResExt(features_page, max_features_page, features):
if connection := self.classic_connections.get(sender_address):
self.send_hci_packet(
hci.HCI_Read_Remote_Extended_Features_Complete_Event(
status=hci.HCI_ErrorCode.SUCCESS,
connection_handle=connection.handle,
page_number=features_page,
maximum_page_number=max_features_page,
extended_lmp_features=features,
)
)
case _:
logger.error("!!! Unhandled packet: %s", packet)
@@ -1349,6 +1471,53 @@ class Controller:
return None
def on_hci_read_remote_supported_features_command(
self, command: hci.HCI_Read_Remote_Supported_Features_Command
) -> None:
'''
See Bluetooth spec Vol 4, Part E - 7.1.20 Read Remote Supported Features command
'''
handle = command.connection_handle
if not (connection := self.find_classic_connection_by_handle(handle)):
self._send_hci_command_status(
hci.HCI_ErrorCode.UNKNOWN_CONNECTION_IDENTIFIER_ERROR, command.op_code
)
return None
self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code)
self.send_lmp_packet(
connection.peer_address,
lmp.LmpFeaturesReq(self.lmp_features_bytes[:8]),
)
return None
def on_hci_read_remote_extended_features_command(
self, command: hci.HCI_Read_Remote_Extended_Features_Command
) -> None:
'''
See Bluetooth spec Vol 4, Part E - 7.1.21 Read Remote Extended Features command
'''
handle = command.connection_handle
if not (connection := self.find_classic_connection_by_handle(handle)):
self._send_hci_command_status(
hci.HCI_ErrorCode.UNKNOWN_CONNECTION_IDENTIFIER_ERROR, command.op_code
)
return None
self._send_hci_command_status(hci.HCI_COMMAND_STATUS_PENDING, command.op_code)
self.send_lmp_packet(
connection.peer_address,
lmp.LmpFeaturesReqExt(
features_page=command.page_number,
features=self.lmp_features_bytes[
command.page_number * 8 : (command.page_number + 1) * 8
],
),
)
return None
def on_hci_enhanced_setup_synchronous_connection_command(
self, command: hci.HCI_Enhanced_Setup_Synchronous_Connection_Command
) -> None:
@@ -1645,11 +1814,15 @@ class Controller:
return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS)
def on_hci_write_simple_pairing_mode_command(
self, _command: hci.HCI_Write_Simple_Pairing_Mode_Command
self, command: hci.HCI_Write_Simple_Pairing_Mode_Command
) -> hci.HCI_StatusReturnParameters:
'''
See Bluetooth spec Vol 4, Part E - 7.3.59 Write Simple Pairing Mode Command
'''
if command.simple_pairing_mode:
self.lmp_features |= hci.LmpFeatureMask.SECURE_SIMPLE_PAIRING_HOST_SUPPORT
else:
self.lmp_features &= ~hci.LmpFeatureMask.SECURE_SIMPLE_PAIRING_HOST_SUPPORT
return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS)
def on_hci_set_event_mask_page_2_command(
@@ -1670,16 +1843,23 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.3.78 Write LE Host Support Command
'''
return hci.HCI_Read_LE_Host_Support_ReturnParameters(
status=hci.HCI_ErrorCode.SUCCESS, le_supported_host=1, unused=0
status=hci.HCI_ErrorCode.SUCCESS,
le_supported_host=(
1 if self.lmp_features & hci.LmpFeatureMask.LE_SUPPORTED_HOST else 0
),
unused=0,
)
def on_hci_write_le_host_support_command(
self, _command: hci.HCI_Write_LE_Host_Support_Command
self, command: hci.HCI_Write_LE_Host_Support_Command
) -> hci.HCI_StatusReturnParameters:
'''
See Bluetooth spec Vol 4, Part E - 7.3.79 Write LE Host Support Command
'''
# TODO / Just ignore for now
if command.le_supported_host:
self.lmp_features |= hci.LmpFeatureMask.LE_SUPPORTED_HOST
else:
self.lmp_features &= ~hci.LmpFeatureMask.LE_SUPPORTED_HOST
return hci.HCI_StatusReturnParameters(hci.HCI_ErrorCode.SUCCESS)
def on_hci_write_authenticated_payload_timeout_command(
@@ -1716,7 +1896,11 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.4.2 Read Local Supported Commands Command
'''
return hci.HCI_Read_Local_Supported_Commands_ReturnParameters(
hci.HCI_ErrorCode.SUCCESS, supported_commands=self.supported_commands
hci.HCI_ErrorCode.SUCCESS,
supported_commands=sum(
hci.HCI_SUPPORTED_COMMANDS_MASKS.get(opcode, 0)
for opcode in self.supported_commands
).to_bytes(64, 'little'),
)
def on_hci_read_local_supported_features_command(
@@ -1726,7 +1910,8 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.4.3 Read Local Supported Features Command
'''
return hci.HCI_Read_Local_Supported_Features_ReturnParameters(
hci.HCI_ErrorCode.SUCCESS, lmp_features=self.lmp_features[:8]
hci.HCI_ErrorCode.SUCCESS,
lmp_features=self.lmp_features_bytes[:8],
)
def on_hci_read_local_extended_features_command(
@@ -1735,18 +1920,19 @@ class Controller:
'''
See Bluetooth spec Vol 4, Part E - 7.4.4 Read Local Extended Features Command
'''
if command.page_number * 8 > len(self.lmp_features):
feature_bytes = self.lmp_features_bytes
if command.page_number * 8 > len(feature_bytes):
return hci.HCI_Read_Local_Extended_Features_ReturnParameters(
status=hci.HCI_ErrorCode.INVALID_COMMAND_PARAMETERS_ERROR,
page_number=command.page_number,
maximum_page_number=len(self.lmp_features) // 8 - 1,
maximum_page_number=len(feature_bytes) // 8 - 1,
extended_lmp_features=bytes(8),
)
return hci.HCI_Read_Local_Extended_Features_ReturnParameters(
status=hci.HCI_ErrorCode.SUCCESS,
page_number=command.page_number,
maximum_page_number=len(self.lmp_features) // 8 - 1,
extended_lmp_features=self.lmp_features[
maximum_page_number=len(feature_bytes) // 8 - 1,
extended_lmp_features=feature_bytes[
command.page_number * 8 : (command.page_number + 1) * 8
],
)

View File

@@ -19,6 +19,7 @@ from __future__ import annotations
import dataclasses
import enum
import functools
import struct
from collections.abc import Iterable
from typing import (
@@ -273,13 +274,8 @@ class UUID:
def parse_uuid_2(cls, uuid_as_bytes: bytes, offset: int) -> tuple[int, UUID]:
return offset + 2, cls.from_bytes(uuid_as_bytes[offset : offset + 2])
def to_bytes(self, force_128: bool = False) -> bytes:
'''
Serialize UUID in little-endian byte-order
'''
if not force_128:
return self.uuid_bytes
@functools.cached_property
def uuid_128_bytes(self) -> bytes:
match len(self.uuid_bytes):
case 2:
return self.BASE_UUID + self.uuid_bytes + bytes([0, 0])
@@ -290,6 +286,15 @@ class UUID:
case _:
assert False, "unreachable"
def to_bytes(self, force_128: bool = False) -> bytes:
'''
Serialize UUID in little-endian byte-order
'''
if not force_128:
return self.uuid_bytes
return self.uuid_128_bytes
def to_pdu_bytes(self) -> bytes:
'''
Convert to bytes for use in an ATT PDU.
@@ -318,7 +323,7 @@ class UUID:
def __eq__(self, other: object) -> bool:
if isinstance(other, UUID):
return self.to_bytes(force_128=True) == other.to_bytes(force_128=True)
return self.uuid_128_bytes == other.uuid_128_bytes
if isinstance(other, str):
return UUID(other) == self
@@ -326,7 +331,7 @@ class UUID:
return False
def __hash__(self) -> int:
return hash(self.uuid_bytes)
return hash(self.uuid_128_bytes)
def __str__(self) -> str:
result = self.to_hex_str(separator='-')
@@ -2111,13 +2116,10 @@ class AdvertisingData:
# -----------------------------------------------------------------------------
# Connection PHY
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class ConnectionPHY:
def __init__(self, tx_phy, rx_phy):
self.tx_phy = tx_phy
self.rx_phy = rx_phy
def __str__(self):
return f'ConnectionPHY(tx_phy={self.tx_phy}, rx_phy={self.rx_phy})'
tx_phy: int
rx_phy: int
# -----------------------------------------------------------------------------

View File

@@ -1837,6 +1837,7 @@ class Connection(utils.CompositeEventEmitter):
self.pairing_peer_io_capability = None
self.pairing_peer_authentication_requirements = None
self.peer_le_features = hci.LeFeatureMask(0)
self.peer_classic_features = hci.LmpFeatureMask(0)
self.cs_configs = {}
self.cs_procedures = {}
@@ -2054,6 +2055,15 @@ class Connection(utils.CompositeEventEmitter):
self.peer_le_features = await self.device.get_remote_le_features(self)
return self.peer_le_features
async def get_remote_classic_features(self) -> hci.LmpFeatureMask:
"""[Classic Only] Reads remote LMP supported features.
Returns:
LMP features supported by the remote device.
"""
self.peer_classic_features = await self.device.get_remote_classic_features(self)
return self.peer_classic_features
def on_att_mtu_update(self, mtu: int):
logger.debug(
f'*** Connection ATT MTU Update: [0x{self.handle:04X}] '
@@ -2149,6 +2159,7 @@ class DeviceConfiguration:
)
eatt_enabled: bool = False
gatt_services: list[dict[str, Any]] = field(init=False)
smp_debug_mode: bool = False
def __post_init__(self) -> None:
self.gatt_services = []
@@ -2561,6 +2572,7 @@ class Device(utils.CompositeEventEmitter):
),
),
)
self.smp_manager.debug_mode = self.config.smp_debug_mode
self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)
@@ -5281,6 +5293,77 @@ class Device(utils.CompositeEventEmitter):
)
return await read_feature_future
async def get_remote_classic_features(
self, connection: Connection
) -> hci.LmpFeatureMask:
"""[Classic Only] Reads remote LE supported features.
Args:
handle: connection handle to read LMP features.
Returns:
LMP features supported by the remote device.
"""
with closing(utils.EventWatcher()) as watcher:
read_feature_future: asyncio.Future[tuple[int, int]] = (
asyncio.get_running_loop().create_future()
)
read_features = hci.LmpFeatureMask(0)
current_page_number = 0
@watcher.on(self.host, 'classic_remote_features')
def on_classic_remote_features(
handle: int,
status: int,
features: int,
page_number: int,
max_page_number: int,
) -> None:
if handle != connection.handle:
logger.warning(
"Received classic_remote_features for wrong handle, expected=0x%04X, got=0x%04X",
connection.handle,
handle,
)
return
if page_number != current_page_number:
logger.warning(
"Received classic_remote_features for wrong page, expected=%d, got=%d",
current_page_number,
page_number,
)
return
if status == hci.HCI_ErrorCode.SUCCESS:
read_feature_future.set_result((features, max_page_number))
else:
read_feature_future.set_exception(hci.HCI_Error(status))
await self.send_async_command(
hci.HCI_Read_Remote_Supported_Features_Command(
connection_handle=connection.handle
)
)
new_features, max_page_number = await read_feature_future
read_features |= new_features
if not (read_features & hci.LmpFeatureMask.EXTENDED_FEATURES):
return read_features
while current_page_number <= max_page_number:
read_feature_future = asyncio.get_running_loop().create_future()
await self.send_async_command(
hci.HCI_Read_Remote_Extended_Features_Command(
connection_handle=connection.handle,
page_number=current_page_number,
)
)
new_features, max_page_number = await read_feature_future
read_features |= new_features << (current_page_number * 64)
current_page_number += 1
return read_features
@utils.experimental('Only for testing.')
async def get_remote_cs_capabilities(
self, connection: Connection

View File

@@ -68,6 +68,8 @@ class HfpProtocolError(ProtocolError):
# -----------------------------------------------------------------------------
class HfpProtocol:
MAX_BUFFER_SIZE: ClassVar[int] = 65536
dlc: rfcomm.DLC
buffer: str
lines: collections.deque
@@ -84,10 +86,19 @@ class HfpProtocol:
def feed(self, data: bytes | str) -> None:
# Convert the data to a string if needed
if isinstance(data, bytes):
data = data.decode('utf-8')
data = data.decode('utf-8', errors='replace')
logger.debug(f'<<< Data received: {data}')
# Drop incoming data if it would overflow the buffer; keep existing
# partial packet state intact so a future clean packet can still parse.
if len(self.buffer) + len(data) > self.MAX_BUFFER_SIZE:
logger.warning(
'HFP buffer overflow (>%d bytes), dropping incoming data',
self.MAX_BUFFER_SIZE,
)
return
# Add to the buffer and look for lines
self.buffer += data
while (separator := self.buffer.find('\r')) >= 0:

View File

@@ -692,10 +692,8 @@ class Host(utils.EventEmitter):
finally:
self.pending_command = None
self.pending_response = None
if (
response is not None
and response.num_hci_command_packets
and self.command_semaphore.locked()
if response is None or (
response.num_hci_command_packets and self.command_semaphore.locked()
):
self.command_semaphore.release()
@@ -1660,6 +1658,19 @@ class Host(utils.EventEmitter):
'connection_encryption_failure', event.connection_handle, event.status
)
def on_hci_read_remote_supported_features_complete_event(
self, event: hci.HCI_Read_Remote_Supported_Features_Complete_Event
) -> None:
# Notify the client
self.emit(
'classic_remote_features',
event.connection_handle,
event.status,
int.from_bytes(event.lmp_features, 'little'),
0, # page number
0, # max page number
)
def on_hci_encryption_change_v2_event(
self, event: hci.HCI_Encryption_Change_V2_Event
):
@@ -1816,6 +1827,18 @@ class Host(utils.EventEmitter):
rssi,
)
def on_hci_read_remote_extended_features_complete_event(
self, event: hci.HCI_Read_Remote_Extended_Features_Complete_Event
):
self.emit(
'classic_remote_features',
event.connection_handle,
event.status,
int.from_bytes(event.extended_lmp_features, 'little'),
event.page_number,
event.maximum_page_number,
)
def on_hci_extended_inquiry_result_event(
self, event: hci.HCI_Extended_Inquiry_Result_Event
):

View File

@@ -27,6 +27,7 @@ import dataclasses
import json
import logging
import os
import pathlib
from typing import TYPE_CHECKING, Any
from typing_extensions import Self
@@ -248,29 +249,26 @@ class JsonKeyStore(KeyStore):
DEFAULT_NAMESPACE = '__DEFAULT__'
DEFAULT_BASE_NAME = "keys"
def __init__(self, namespace, filename=None):
self.namespace = namespace if namespace is not None else self.DEFAULT_NAMESPACE
def __init__(
self, namespace: str | None = None, filename: str | None = None
) -> None:
self.namespace = namespace or self.DEFAULT_NAMESPACE
if filename is None:
# Use a default for the current user
# Import here because this may not exist on all platforms
# pylint: disable=import-outside-toplevel
import appdirs
self.directory_name = os.path.join(
appdirs.user_data_dir(self.APP_NAME, self.APP_AUTHOR), self.KEYS_DIR
)
base_name = self.DEFAULT_BASE_NAME if namespace is None else self.namespace
json_filename = (
f'{base_name}.json'.lower().replace(':', '-').replace('/p', '-p')
)
self.filename = os.path.join(self.directory_name, json_filename)
if filename:
self.filename = pathlib.Path(filename).resolve()
self.directory_name = self.filename.parent
else:
self.filename = filename
self.directory_name = os.path.dirname(os.path.abspath(self.filename))
import platformdirs # Deferred import
logger.debug(f'JSON keystore: {self.filename}')
base_dir = platformdirs.user_data_path(self.APP_NAME, self.APP_AUTHOR)
self.directory_name = base_dir / self.KEYS_DIR
base_name = self.namespace if namespace else self.DEFAULT_BASE_NAME
safe_name = base_name.lower().replace(':', '-').replace('/', '-')
self.filename = self.directory_name / f"{safe_name}.json"
logger.debug('JSON keystore: %s', self.filename)
@classmethod
def from_device(
@@ -293,7 +291,9 @@ class JsonKeyStore(KeyStore):
return cls(namespace, filename)
async def load(self):
async def load(
self,
) -> tuple[dict[str, dict[str, dict[str, Any]]], dict[str, dict[str, Any]]]:
# Try to open the file, without failing. If the file does not exist, it
# will be created upon saving.
try:
@@ -312,17 +312,17 @@ class JsonKeyStore(KeyStore):
return next(iter(db.items()))
# Finally, just create an empty key map for the namespace
key_map = {}
key_map: dict[str, dict[str, Any]] = {}
db[self.namespace] = key_map
return (db, key_map)
async def save(self, db):
async def save(self, db: dict[str, dict[str, dict[str, Any]]]) -> None:
# Create the directory if it doesn't exist
if not os.path.exists(self.directory_name):
os.makedirs(self.directory_name, exist_ok=True)
if not self.directory_name.exists():
self.directory_name.mkdir(parents=True, exist_ok=True)
# Save to a temporary file
temp_filename = self.filename + '.tmp'
temp_filename = self.filename.with_name(self.filename.name + ".tmp")
with open(temp_filename, 'w', encoding='utf-8') as output:
json.dump(db, output, sort_keys=True, indent=4)
@@ -334,16 +334,16 @@ class JsonKeyStore(KeyStore):
del key_map[name]
await self.save(db)
async def update(self, name, keys):
async def update(self, name: str, keys: PairingKeys) -> None:
db, key_map = await self.load()
key_map.setdefault(name, {}).update(keys.to_dict())
await self.save(db)
async def get_all(self):
async def get_all(self) -> list[tuple[str, PairingKeys]]:
_, key_map = await self.load()
return [(name, PairingKeys.from_dict(keys)) for (name, keys) in key_map.items()]
async def delete_all(self):
async def delete_all(self) -> None:
db, key_map = await self.load()
key_map.clear()
await self.save(db)

View File

@@ -322,3 +322,38 @@ class LmpNameRes(Packet):
name_offset: int = field(metadata=hci.metadata(2))
name_length: int = field(metadata=hci.metadata(3))
name_fregment: bytes = field(metadata=hci.metadata('*'))
@Packet.subclass
@dataclass
class LmpFeaturesReq(Packet):
opcode = Opcode.LMP_FEATURES_REQ
features: bytes = field(metadata=hci.metadata(8))
@Packet.subclass
@dataclass
class LmpFeaturesRes(Packet):
opcode = Opcode.LMP_FEATURES_RES
features: bytes = field(metadata=hci.metadata(8))
@Packet.subclass
@dataclass
class LmpFeaturesReqExt(Packet):
opcode = Opcode.LMP_FEATURES_REQ_EXT
features_page: int = field(metadata=hci.metadata(1))
features: bytes = field(metadata=hci.metadata(8))
@Packet.subclass
@dataclass
class LmpFeaturesResExt(Packet):
opcode = Opcode.LMP_FEATURES_RES_EXT
features_page: int = field(metadata=hci.metadata(1))
max_features_page: int = field(metadata=hci.metadata(1))
features: bytes = field(metadata=hci.metadata(8))

View File

@@ -21,11 +21,12 @@ import asyncio
import logging
import struct
from collections.abc import Iterable, Sequence
from typing import TYPE_CHECKING, NewType
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, ClassVar, NewType, TypeVar
from typing_extensions import Self
from bumble import core, l2cap
from bumble import core, hci, l2cap, utils
from bumble.colors import color
from bumble.core import (
InvalidArgumentError,
@@ -33,7 +34,6 @@ from bumble.core import (
InvalidStateError,
ProtocolError,
)
from bumble.hci import HCI_Object, key_with_value, name_or_number
if TYPE_CHECKING:
from bumble.device import Connection, Device
@@ -54,39 +54,22 @@ SDP_CONTINUATION_WATCHDOG = 64 # Maximum number of continuations we're willing
SDP_PSM = 0x0001
SDP_ERROR_RESPONSE = 0x01
SDP_SERVICE_SEARCH_REQUEST = 0x02
SDP_SERVICE_SEARCH_RESPONSE = 0x03
SDP_SERVICE_ATTRIBUTE_REQUEST = 0x04
SDP_SERVICE_ATTRIBUTE_RESPONSE = 0x05
SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST = 0x06
SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE = 0x07
class PduId(hci.SpecableEnum):
SDP_ERROR_RESPONSE = 0x01
SDP_SERVICE_SEARCH_REQUEST = 0x02
SDP_SERVICE_SEARCH_RESPONSE = 0x03
SDP_SERVICE_ATTRIBUTE_REQUEST = 0x04
SDP_SERVICE_ATTRIBUTE_RESPONSE = 0x05
SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST = 0x06
SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE = 0x07
SDP_PDU_NAMES = {
SDP_ERROR_RESPONSE: 'SDP_ERROR_RESPONSE',
SDP_SERVICE_SEARCH_REQUEST: 'SDP_SERVICE_SEARCH_REQUEST',
SDP_SERVICE_SEARCH_RESPONSE: 'SDP_SERVICE_SEARCH_RESPONSE',
SDP_SERVICE_ATTRIBUTE_REQUEST: 'SDP_SERVICE_ATTRIBUTE_REQUEST',
SDP_SERVICE_ATTRIBUTE_RESPONSE: 'SDP_SERVICE_ATTRIBUTE_RESPONSE',
SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST: 'SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST',
SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE: 'SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE'
}
SDP_INVALID_SDP_VERSION_ERROR = 0x0001
SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR = 0x0002
SDP_INVALID_REQUEST_SYNTAX_ERROR = 0x0003
SDP_INVALID_PDU_SIZE_ERROR = 0x0004
SDP_INVALID_CONTINUATION_STATE_ERROR = 0x0005
SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR = 0x0006
SDP_ERROR_NAMES = {
SDP_INVALID_SDP_VERSION_ERROR: 'SDP_INVALID_SDP_VERSION_ERROR',
SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR: 'SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR',
SDP_INVALID_REQUEST_SYNTAX_ERROR: 'SDP_INVALID_REQUEST_SYNTAX_ERROR',
SDP_INVALID_PDU_SIZE_ERROR: 'SDP_INVALID_PDU_SIZE_ERROR',
SDP_INVALID_CONTINUATION_STATE_ERROR: 'SDP_INVALID_CONTINUATION_STATE_ERROR',
SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR: 'SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR'
}
class ErrorCode(hci.SpecableEnum):
INVALID_SDP_VERSION = 0x0001
INVALID_SERVICE_RECORD_HANDLE = 0x0002
INVALID_REQUEST_SYNTAX = 0x0003
INVALID_PDU_SIZE = 0x0004
INVALID_CONTINUATION_STATE = 0x0005
INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST = 0x0006
SDP_SERVICE_NAME_ATTRIBUTE_ID_OFFSET = 0x0000
SDP_SERVICE_DESCRIPTION_ATTRIBUTE_ID_OFFSET = 0x0001
@@ -141,30 +124,31 @@ SDP_ALL_ATTRIBUTES_RANGE = (0x0000, 0xFFFF)
# -----------------------------------------------------------------------------
@dataclass
class DataElement:
NIL = 0
UNSIGNED_INTEGER = 1
SIGNED_INTEGER = 2
UUID = 3
TEXT_STRING = 4
BOOLEAN = 5
SEQUENCE = 6
ALTERNATIVE = 7
URL = 8
TYPE_NAMES = {
NIL: 'NIL',
UNSIGNED_INTEGER: 'UNSIGNED_INTEGER',
SIGNED_INTEGER: 'SIGNED_INTEGER',
UUID: 'UUID',
TEXT_STRING: 'TEXT_STRING',
BOOLEAN: 'BOOLEAN',
SEQUENCE: 'SEQUENCE',
ALTERNATIVE: 'ALTERNATIVE',
URL: 'URL',
}
class Type(utils.OpenIntEnum):
NIL = 0
UNSIGNED_INTEGER = 1
SIGNED_INTEGER = 2
UUID = 3
TEXT_STRING = 4
BOOLEAN = 5
SEQUENCE = 6
ALTERNATIVE = 7
URL = 8
type_constructors = {
NIL = Type.NIL
UNSIGNED_INTEGER = Type.UNSIGNED_INTEGER
SIGNED_INTEGER = Type.SIGNED_INTEGER
UUID = Type.UUID
TEXT_STRING = Type.TEXT_STRING
BOOLEAN = Type.BOOLEAN
SEQUENCE = Type.SEQUENCE
ALTERNATIVE = Type.ALTERNATIVE
URL = Type.URL
TYPE_CONSTRUCTORS = {
NIL: lambda x: DataElement(DataElement.NIL, None),
UNSIGNED_INTEGER: lambda x, y: DataElement(
DataElement.UNSIGNED_INTEGER,
@@ -190,14 +174,18 @@ class DataElement:
URL: lambda x: DataElement(DataElement.URL, x.decode('utf8')),
}
def __init__(self, element_type, value, value_size=None):
self.type = element_type
self.value = value
self.value_size = value_size
type: Type
value: Any
value_size: int | None = None
def __post_init__(self) -> None:
# Used as a cache when parsing from bytes so we can emit a byte-for-byte replica
self.bytes = None
if element_type in (DataElement.UNSIGNED_INTEGER, DataElement.SIGNED_INTEGER):
if value_size is None:
self._bytes: bytes | None = None
if self.type in (
DataElement.UNSIGNED_INTEGER,
DataElement.SIGNED_INTEGER,
):
if self.value_size is None:
raise InvalidArgumentError(
'integer types must have a value size specified'
)
@@ -337,7 +325,7 @@ class DataElement:
value_offset = 4
value_data = data[1 + value_offset : 1 + value_offset + value_size]
constructor = DataElement.type_constructors.get(element_type)
constructor = DataElement.TYPE_CONSTRUCTORS.get(element_type)
if constructor:
if element_type in (
DataElement.UNSIGNED_INTEGER,
@@ -348,15 +336,15 @@ class DataElement:
result = constructor(value_data)
else:
result = DataElement(element_type, value_data)
result.bytes = data[
result._bytes = data[
: 1 + value_offset + value_size
] # Keep a copy so we can re-serialize to an exact replica
return result
def __bytes__(self):
# Return early if we have a cache
if self.bytes:
return self.bytes
if self._bytes:
return self._bytes
if self.type == DataElement.NIL:
data = b''
@@ -443,12 +431,12 @@ class DataElement:
else:
raise RuntimeError("internal error - self.type not supported")
self.bytes = bytes([self.type << 3 | size_index]) + size_bytes + data
return self.bytes
self._bytes = bytes([self.type << 3 | size_index]) + size_bytes + data
return self._bytes
def to_string(self, pretty=False, indentation=0):
prefix = ' ' * indentation
type_name = name_or_number(self.TYPE_NAMES, self.type)
type_name = self.type.name
if self.type == DataElement.NIL:
value_string = ''
elif self.type in (DataElement.SEQUENCE, DataElement.ALTERNATIVE):
@@ -476,10 +464,10 @@ class DataElement:
# -----------------------------------------------------------------------------
@dataclass
class ServiceAttribute:
def __init__(self, attribute_id: int, value: DataElement) -> None:
self.id = attribute_id
self.value = value
id: int
value: DataElement
@staticmethod
def list_from_data_elements(
@@ -510,7 +498,7 @@ class ServiceAttribute:
@staticmethod
def id_name(id_code):
return name_or_number(SDP_ATTRIBUTE_ID_NAMES, id_code)
return hci.name_or_number(SDP_ATTRIBUTE_ID_NAMES, id_code)
@staticmethod
def is_uuid_in_value(uuid: core.UUID, value: DataElement) -> bool:
@@ -540,239 +528,228 @@ class ServiceAttribute:
# -----------------------------------------------------------------------------
def _parse_service_record_handle_list(
data: bytes, offset: int
) -> tuple[int, list[int]]:
count = struct.unpack_from('>H', data, offset)[0]
offset += 2
handle_list = [
struct.unpack_from('>I', data, offset + x * 4)[0] for x in range(count)
]
return offset + count * 4, handle_list
def _serialize_service_record_handle_list(
handles: list[int],
) -> bytes:
return struct.pack('>H', len(handles)) + b''.join(
struct.pack('>I', handle) for handle in handles
)
def _parse_bytes_preceded_by_length(data: bytes, offset: int) -> tuple[int, bytes]:
length = struct.unpack_from('>H', data, offset)[0]
offset += 2
return offset + length, data[offset : offset + length]
def _serialize_bytes_preceded_by_length(data: bytes) -> bytes:
return struct.pack('>H', len(data)) + data
_SERVICE_RECORD_HANDLE_LIST_METADATA = hci.metadata(
{
'parser': _parse_service_record_handle_list,
'serializer': _serialize_service_record_handle_list,
}
)
_BYTES_PRECEDED_BY_LENGTH_METADATA = hci.metadata(
{
'parser': _parse_bytes_preceded_by_length,
'serializer': _serialize_bytes_preceded_by_length,
}
)
# -----------------------------------------------------------------------------
@dataclass
class SDP_PDU:
'''
See Bluetooth spec @ Vol 3, Part B - 4.2 PROTOCOL DATA UNIT FORMAT
'''
RESPONSE_PDU_IDS = {
SDP_SERVICE_SEARCH_REQUEST: SDP_SERVICE_SEARCH_RESPONSE,
SDP_SERVICE_ATTRIBUTE_REQUEST: SDP_SERVICE_ATTRIBUTE_RESPONSE,
SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST: SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE,
PduId.SDP_SERVICE_SEARCH_REQUEST: PduId.SDP_SERVICE_SEARCH_RESPONSE,
PduId.SDP_SERVICE_ATTRIBUTE_REQUEST: PduId.SDP_SERVICE_ATTRIBUTE_RESPONSE,
PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST: PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE,
}
sdp_pdu_classes: dict[int, type[SDP_PDU]] = {}
name = None
pdu_id = 0
subclasses: ClassVar[dict[int, type[SDP_PDU]]] = {}
pdu_id: ClassVar[PduId]
fields: ClassVar[hci.Fields]
@staticmethod
def from_bytes(pdu):
pdu_id, transaction_id, _parameters_length = struct.unpack_from('>BHH', pdu, 0)
transaction_id: int
_payload: bytes | None = field(init=False, repr=False, default=None)
cls = SDP_PDU.sdp_pdu_classes.get(pdu_id)
if cls is None:
instance = SDP_PDU(pdu)
instance.name = SDP_PDU.pdu_name(pdu_id)
instance.pdu_id = pdu_id
instance.transaction_id = transaction_id
return instance
self = cls.__new__(cls)
SDP_PDU.__init__(self, pdu, transaction_id)
if hasattr(self, 'fields'):
self.init_from_bytes(pdu, 5)
return self
@classmethod
def from_bytes(cls, pdu: bytes) -> SDP_PDU:
pdu_id, transaction_id, parameters_length = struct.unpack_from('>BHH', pdu, 0)
@staticmethod
def parse_service_record_handle_list_preceded_by_count(
data: bytes, offset: int
) -> tuple[int, list[int]]:
count = struct.unpack_from('>H', data, offset - 2)[0]
handle_list = [
struct.unpack_from('>I', data, offset + x * 4)[0] for x in range(count)
]
return offset + count * 4, handle_list
if len(pdu) != 5 + parameters_length:
logger.warning("Expect %d bytes, got %d", 5 + parameters_length, len(pdu))
@staticmethod
def parse_bytes_preceded_by_length(data, offset):
length = struct.unpack_from('>H', data, offset - 2)[0]
return offset + length, data[offset : offset + length]
subclass = cls.subclasses.get(pdu_id)
if not (subclass := cls.subclasses.get(pdu_id)):
raise InvalidPacketError(f"Unknown PDU type {pdu_id}")
instance = subclass(
transaction_id=transaction_id,
**hci.HCI_Object.dict_from_bytes(pdu, 5, subclass.fields),
)
instance._payload = pdu
return instance
@staticmethod
def error_name(error_code):
return name_or_number(SDP_ERROR_NAMES, error_code)
_PDU = TypeVar('_PDU', bound='SDP_PDU')
@staticmethod
def pdu_name(code):
return name_or_number(SDP_PDU_NAMES, code)
@staticmethod
def subclass(fields):
def inner(cls):
name = cls.__name__
# add a _ character before every uppercase letter, except the SDP_ prefix
location = len(name) - 1
while location > 4:
if not name[location].isupper():
location -= 1
continue
name = name[:location] + '_' + name[location:]
location -= 1
cls.name = name.upper()
cls.pdu_id = key_with_value(SDP_PDU_NAMES, cls.name)
if cls.pdu_id is None:
raise KeyError(f'PDU name {cls.name} not found in SDP_PDU_NAMES')
cls.fields = fields
# Register a factory for this class
SDP_PDU.sdp_pdu_classes[cls.pdu_id] = cls
return cls
return inner
def __init__(self, pdu=None, transaction_id=0, **kwargs):
if hasattr(self, 'fields') and kwargs:
HCI_Object.init_from_fields(self, self.fields, kwargs)
if pdu is None:
parameters = HCI_Object.dict_to_bytes(kwargs, self.fields)
pdu = (
struct.pack('>BHH', self.pdu_id, transaction_id, len(parameters))
+ parameters
)
self.pdu = pdu
self.transaction_id = transaction_id
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
@classmethod
def subclass(cls, subclass: type[_PDU]) -> type[_PDU]:
subclass.fields = hci.HCI_Object.fields_from_dataclass(subclass)
cls.subclasses[subclass.pdu_id] = subclass
return subclass
def __bytes__(self):
return self.pdu
if self._payload is None:
parameters = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields)
self._payload = (
struct.pack('>BHH', self.pdu_id, self.transaction_id, len(parameters))
+ parameters
)
return self._payload
@property
def name(self) -> str:
return self.pdu_id.name
def __str__(self):
result = f'{color(self.name, "blue")} [TID={self.transaction_id}]'
if fields := getattr(self, 'fields', None):
result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ')
result += ':\n' + hci.HCI_Object.format_fields(self.__dict__, fields, ' ')
elif len(self.pdu) > 1:
result += f': {self.pdu.hex()}'
return result
# -----------------------------------------------------------------------------
@SDP_PDU.subclass([('error_code', {'size': 2, 'mapper': SDP_PDU.error_name})])
@SDP_PDU.subclass
@dataclass
class SDP_ErrorResponse(SDP_PDU):
'''
See Bluetooth spec @ Vol 3, Part B - 4.4.1 SDP_ErrorResponse PDU
'''
error_code: int
pdu_id = PduId.SDP_ERROR_RESPONSE
error_code: ErrorCode = field(metadata=ErrorCode.type_metadata(2))
# -----------------------------------------------------------------------------
@SDP_PDU.subclass(
[
('service_search_pattern', DataElement.parse_from_bytes),
('maximum_service_record_count', '>2'),
('continuation_state', '*'),
]
)
@SDP_PDU.subclass
@dataclass
class SDP_ServiceSearchRequest(SDP_PDU):
'''
See Bluetooth spec @ Vol 3, Part B - 4.5.1 SDP_ServiceSearchRequest PDU
'''
service_search_pattern: DataElement
maximum_service_record_count: int
continuation_state: bytes
pdu_id = PduId.SDP_SERVICE_SEARCH_REQUEST
service_search_pattern: DataElement = field(
metadata=hci.metadata(DataElement.parse_from_bytes)
)
maximum_service_record_count: int = field(metadata=hci.metadata('>2'))
continuation_state: bytes = field(metadata=hci.metadata('*'))
# -----------------------------------------------------------------------------
@SDP_PDU.subclass(
[
('total_service_record_count', '>2'),
('current_service_record_count', '>2'),
(
'service_record_handle_list',
SDP_PDU.parse_service_record_handle_list_preceded_by_count,
),
('continuation_state', '*'),
]
)
@SDP_PDU.subclass
@dataclass
class SDP_ServiceSearchResponse(SDP_PDU):
'''
See Bluetooth spec @ Vol 3, Part B - 4.5.2 SDP_ServiceSearchResponse PDU
'''
service_record_handle_list: list[int]
total_service_record_count: int
current_service_record_count: int
continuation_state: bytes
pdu_id = PduId.SDP_SERVICE_SEARCH_RESPONSE
total_service_record_count: int = field(metadata=hci.metadata('>2'))
service_record_handle_list: Sequence[int] = field(
metadata=_SERVICE_RECORD_HANDLE_LIST_METADATA
)
continuation_state: bytes = field(metadata=hci.metadata('*'))
# -----------------------------------------------------------------------------
@SDP_PDU.subclass(
[
('service_record_handle', '>4'),
('maximum_attribute_byte_count', '>2'),
('attribute_id_list', DataElement.parse_from_bytes),
('continuation_state', '*'),
]
)
@SDP_PDU.subclass
@dataclass
class SDP_ServiceAttributeRequest(SDP_PDU):
'''
See Bluetooth spec @ Vol 3, Part B - 4.6.1 SDP_ServiceAttributeRequest PDU
'''
service_record_handle: int
maximum_attribute_byte_count: int
attribute_id_list: DataElement
continuation_state: bytes
pdu_id = PduId.SDP_SERVICE_ATTRIBUTE_REQUEST
service_record_handle: int = field(metadata=hci.metadata('>4'))
maximum_attribute_byte_count: int = field(metadata=hci.metadata('>2'))
attribute_id_list: DataElement = field(
metadata=hci.metadata(DataElement.parse_from_bytes)
)
continuation_state: bytes = field(metadata=hci.metadata('*'))
# -----------------------------------------------------------------------------
@SDP_PDU.subclass(
[
('attribute_list_byte_count', '>2'),
('attribute_list', SDP_PDU.parse_bytes_preceded_by_length),
('continuation_state', '*'),
]
)
@SDP_PDU.subclass
@dataclass
class SDP_ServiceAttributeResponse(SDP_PDU):
'''
See Bluetooth spec @ Vol 3, Part B - 4.6.2 SDP_ServiceAttributeResponse PDU
'''
attribute_list_byte_count: int
attribute_list: bytes
continuation_state: bytes
pdu_id = PduId.SDP_SERVICE_ATTRIBUTE_RESPONSE
attribute_list: bytes = field(metadata=_BYTES_PRECEDED_BY_LENGTH_METADATA)
continuation_state: bytes = field(metadata=hci.metadata('*'))
# -----------------------------------------------------------------------------
@SDP_PDU.subclass(
[
('service_search_pattern', DataElement.parse_from_bytes),
('maximum_attribute_byte_count', '>2'),
('attribute_id_list', DataElement.parse_from_bytes),
('continuation_state', '*'),
]
)
@SDP_PDU.subclass
@dataclass
class SDP_ServiceSearchAttributeRequest(SDP_PDU):
'''
See Bluetooth spec @ Vol 3, Part B - 4.7.1 SDP_ServiceSearchAttributeRequest PDU
'''
service_search_pattern: DataElement
maximum_attribute_byte_count: int
attribute_id_list: DataElement
continuation_state: bytes
pdu_id = PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_REQUEST
service_search_pattern: DataElement = field(
metadata=hci.metadata(DataElement.parse_from_bytes)
)
maximum_attribute_byte_count: int = field(metadata=hci.metadata('>2'))
attribute_id_list: DataElement = field(
metadata=hci.metadata(DataElement.parse_from_bytes)
)
continuation_state: bytes = field(metadata=hci.metadata('*'))
# -----------------------------------------------------------------------------
@SDP_PDU.subclass(
[
('attribute_lists_byte_count', '>2'),
('attribute_lists', SDP_PDU.parse_bytes_preceded_by_length),
('continuation_state', '*'),
]
)
@SDP_PDU.subclass
@dataclass
class SDP_ServiceSearchAttributeResponse(SDP_PDU):
'''
See Bluetooth spec @ Vol 3, Part B - 4.7.2 SDP_ServiceSearchAttributeResponse PDU
'''
attribute_lists_byte_count: int
attribute_lists: bytes
continuation_state: bytes
pdu_id = PduId.SDP_SERVICE_SEARCH_ATTRIBUTE_RESPONSE
attribute_lists: bytes = field(metadata=_BYTES_PRECEDED_BY_LENGTH_METADATA)
continuation_state: bytes = field(metadata=hci.metadata('*'))
# -----------------------------------------------------------------------------
@@ -873,7 +850,7 @@ class Client:
)
# Request and accumulate until there's no more continuation
service_record_handle_list = []
service_record_handle_list: list[int] = []
continuation_state = bytes([0])
watchdog = SDP_CONTINUATION_WATCHDOG
while watchdog > 0:
@@ -1091,7 +1068,7 @@ class Server:
logger.exception(color('failed to parse SDP Request PDU', 'red'))
self.send_response(
SDP_ErrorResponse(
transaction_id=0, error_code=SDP_INVALID_REQUEST_SYNTAX_ERROR
transaction_id=0, error_code=ErrorCode.INVALID_REQUEST_SYNTAX
)
)
@@ -1108,7 +1085,7 @@ class Server:
self.send_response(
SDP_ErrorResponse(
transaction_id=sdp_pdu.transaction_id,
error_code=SDP_INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST_ERROR,
error_code=ErrorCode.INSUFFICIENT_RESOURCES_TO_SATISFY_REQUEST,
)
)
else:
@@ -1116,7 +1093,7 @@ class Server:
self.send_response(
SDP_ErrorResponse(
transaction_id=sdp_pdu.transaction_id,
error_code=SDP_INVALID_REQUEST_SYNTAX_ERROR,
error_code=ErrorCode.INVALID_REQUEST_SYNTAX,
)
)
@@ -1134,7 +1111,7 @@ class Server:
self.send_response(
SDP_ErrorResponse(
transaction_id=transaction_id,
error_code=SDP_INVALID_CONTINUATION_STATE_ERROR,
error_code=ErrorCode.INVALID_CONTINUATION_STATE,
)
)
return None
@@ -1228,15 +1205,11 @@ class Server:
if service_record_handles_remaining
else bytes([0])
)
service_record_handle_list = b''.join(
[struct.pack('>I', handle) for handle in service_record_handles]
)
self.send_response(
SDP_ServiceSearchResponse(
transaction_id=request.transaction_id,
total_service_record_count=total_service_record_count,
current_service_record_count=len(service_record_handles),
service_record_handle_list=service_record_handle_list,
service_record_handle_list=service_record_handles,
continuation_state=continuation_state,
)
)
@@ -1259,7 +1232,7 @@ class Server:
self.send_response(
SDP_ErrorResponse(
transaction_id=request.transaction_id,
error_code=SDP_INVALID_SERVICE_RECORD_HANDLE_ERROR,
error_code=ErrorCode.INVALID_SERVICE_RECORD_HANDLE,
)
)
return
@@ -1284,7 +1257,6 @@ class Server:
self.send_response(
SDP_ServiceAttributeResponse(
transaction_id=request.transaction_id,
attribute_list_byte_count=len(attribute_list_response),
attribute_list=attribute_list_response,
continuation_state=continuation_state,
)
@@ -1331,7 +1303,6 @@ class Server:
self.send_response(
SDP_ServiceSearchAttributeResponse(
transaction_id=request.transaction_id,
attribute_lists_byte_count=len(attribute_lists_response),
attribute_lists=attribute_lists_response,
continuation_state=continuation_state,
)

View File

@@ -36,6 +36,7 @@ from bumble.colors import color
from bumble.core import (
AdvertisingData,
InvalidArgumentError,
InvalidPacketError,
PhysicalTransport,
ProtocolError,
)
@@ -178,6 +179,16 @@ class AuthReq(hci.SpecableFlag):
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031')
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032')
# Diffie-Hellman private / public key pair in Debug Mode (Core - Vol. 3, Part H)
SMP_DEBUG_KEY_PRIVATE = bytes.fromhex(
'3f49f6d4 a3c55f38 74c9b3e3 d2103f50 4aff607b eb40b799 5899b8a6 cd3c1abd'
)
SMP_DEBUG_KEY_PUBLIC_X = bytes.fromhex(
'20b003d2 f297be2c 5e2c83a7 e9f9a5b9 eff49111 acf4fddb cc030148 0e359de6'
)
SMP_DEBUG_KEY_PUBLIC_Y= bytes.fromhex(
'dc809c49 652aeb6d 63329abf 5a52155c 766345c2 8fed3024 741c8ed0 1589d28b'
)
# fmt: on
# pylint: enable=line-too-long
# pylint: disable=invalid-name
@@ -205,6 +216,8 @@ class SMP_Command:
@classmethod
def from_bytes(cls, pdu: bytes) -> SMP_Command:
if not pdu:
raise InvalidPacketError("Empty SMP PDU")
code = CommandCode(pdu[0])
subclass = SMP_Command.smp_classes.get(code)
@@ -1919,6 +1932,7 @@ class Manager(utils.EventEmitter):
self._ecc_key = None
self.pairing_config_factory = pairing_config_factory
self.session_proxy = Session
self.debug_mode = False
def send_command(self, connection: Connection, command: SMP_Command) -> None:
logger.debug(
@@ -1965,6 +1979,13 @@ class Manager(utils.EventEmitter):
@property
def ecc_key(self) -> crypto.EccKey:
if self.debug_mode:
# Core - Vol 3, Part H:
# When the Security Manager is placed in a Debug mode it shall use the
# following Diffie-Hellman private / public key pair:
debug_key = crypto.EccKey.from_private_key_bytes(SMP_DEBUG_KEY_PRIVATE)
return debug_key
if self._ecc_key is None:
self._ecc_key = crypto.EccKey.generate()
assert self._ecc_key

View File

@@ -83,6 +83,7 @@ async def main() -> None:
GATT_DEVICE_INFORMATION_SERVICE, [manufacturer_name_characteristic]
)
server_device.add_service(device_info_service)
await server_device.start_advertising()
# Connect the client to the server
connection = await client_device.connect(server_device.random_address)

View File

@@ -13,13 +13,12 @@ authors = [{ name = "Google", email = "bumble-dev@google.com" }]
requires-python = ">=3.10"
dependencies = [
"aiohttp ~= 3.8; platform_system!='Emscripten'",
"appdirs >= 1.4; platform_system!='Emscripten'",
"click >= 8.1.3; platform_system!='Emscripten'",
"cryptography >= 44.0.3; platform_system!='Emscripten' and platform_system!='Android'",
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the
# versions available on PyPI. Relax the version requirement since it's better than being
# completely unable to import the package in case of version mismatch.
"cryptography >= 44.0.3; platform_system=='Emscripten'",
"cryptography >= 39.0.0; platform_system=='Emscripten'",
# Android wheels for cryptography are not yet available on PyPI, so chaquopy uses
# the builds from https://chaquo.com/pypi-13.1/cryptography/. But these are not regually
# updated. Relax the version requirement since it's better than being completely unable

View File

@@ -73,6 +73,14 @@ def test_uuid_to_hex_str() -> None:
)
# -----------------------------------------------------------------------------
def test_uuid_hash() -> None:
uuid = UUID("1234")
uuid_128_bytes = UUID.from_bytes(uuid.to_bytes(force_128=True))
assert uuid in {uuid_128_bytes}
assert uuid_128_bytes in {uuid}
# -----------------------------------------------------------------------------
def test_appearance() -> None:
a = Appearance(Appearance.Category.COMPUTER, Appearance.ComputerSubcategory.LAPTOP)

View File

@@ -826,6 +826,22 @@ async def test_remote_name_request():
assert actual_name == expected_name
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_remote_classic_features():
devices = TwoDevices()
devices[0].classic_enabled = True
devices[1].classic_enabled = True
await devices[0].power_on()
await devices[1].power_on()
connection = await devices[0].connect_classic(devices[1].public_address)
assert (
await asyncio.wait_for(connection.get_remote_classic_features(), _TIMEOUT)
== devices.controllers[1].lmp_features
)
# -----------------------------------------------------------------------------
async def run_test_device():
await test_device_connect_parallel()

View File

@@ -22,6 +22,7 @@ import unittest.mock
import pytest
from bumble import controller, hci
from bumble.controller import Controller
from bumble.hci import (
HCI_AclDataPacket,
@@ -49,34 +50,27 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
'supported_commands, lmp_features',
'supported_commands, max_lmp_features_page_number',
[
(
# Default commands
'2000800000c000000000e4000000a822000000000000040000f7ffff7f000000'
'30f0f9ff01008004000000000000000000000000000000000000000000000000',
# Only LE LMP feature
'0000000060000000',
),
(controller.Controller.supported_commands, 0),
(
# All commands
'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff'
'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff',
set(hci.HCI_Command.command_names.keys()),
# 3 pages of LMP features
'000102030405060708090A0B0C0D0E0F011112131415161718191A1B1C1D1E1F',
2,
),
],
)
async def test_reset(supported_commands: str, lmp_features: str):
async def test_reset(supported_commands: set[int], max_lmp_features_page_number: int):
controller = Controller('C')
controller.supported_commands = bytes.fromhex(supported_commands)
controller.lmp_features = bytes.fromhex(lmp_features)
controller.supported_commands = supported_commands
controller.lmp_features_max_page_number = max_lmp_features_page_number
host = Host(controller, AsyncPipeSink(controller))
await host.reset()
assert host.local_lmp_features == int.from_bytes(
bytes.fromhex(lmp_features), 'little'
assert host.local_lmp_features == (
controller.lmp_features & ~(1 << (64 * max_lmp_features_page_number + 1))
)
@@ -177,14 +171,15 @@ class Source:
class Sink:
response: HCI_Event
response: HCI_Event | None
def __init__(self, source: Source, response: HCI_Event) -> None:
def __init__(self, source: Source, response: HCI_Event | None) -> None:
self.source = source
self.response = response
def on_packet(self, packet: bytes) -> None:
self.source.sink.on_packet(bytes(self.response))
if self.response is not None:
self.source.sink.on_packet(bytes(self.response))
@pytest.mark.asyncio
@@ -234,6 +229,23 @@ async def test_send_sync_command() -> None:
assert isinstance(response3.return_parameters, HCI_GenericReturnParameters)
@pytest.mark.asyncio
async def test_send_sync_command_timeout() -> None:
source = Source()
sink = Sink(source, None)
host = Host(source, sink)
host.ready = True
with pytest.raises(asyncio.TimeoutError):
await host.send_sync_command(HCI_Reset_Command(), response_timeout=0.01)
# The sending semaphore should have been released, so this should not block
# indefinitely
with pytest.raises(asyncio.TimeoutError):
await host.send_sync_command(hci.HCI_Reset_Command(), response_timeout=0.01)
@pytest.mark.asyncio
async def test_send_async_command() -> None:
source = Source()

View File

@@ -21,6 +21,7 @@ import logging
import os
import pathlib
import tempfile
from unittest import mock
import pytest
@@ -179,11 +180,55 @@ async def test_default_namespace(temporary_file):
assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1')
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_no_filename(tmp_path):
import platformdirs
with mock.patch.object(platformdirs, 'user_data_path', return_value=tmp_path):
# Case 1: no namespace, no filename
keystore = JsonKeyStore(None, None)
expected_directory = tmp_path / 'Pairing'
expected_filename = expected_directory / 'keys.json'
assert keystore.directory_name == expected_directory
assert keystore.filename == expected_filename
# Save some data
keys = PairingKeys()
ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
keys.ltk = PairingKeys.Key(ltk)
await keystore.update('foo', keys)
assert expected_filename.exists()
# Load back
keystore2 = JsonKeyStore(None, None)
foo = await keystore2.get('foo')
assert foo is not None
assert foo.ltk.value == ltk
# Case 2: namespace, no filename
keystore3 = JsonKeyStore('my:namespace', None)
# safe_name = 'my-namespace' (lower is already 'my:namespace', then replace ':' with '-')
expected_filename3 = expected_directory / 'my-namespace.json'
assert keystore3.filename == expected_filename3
# Save some data
await keystore3.update('bar', keys)
assert expected_filename3.exists()
# Load back
keystore4 = JsonKeyStore('my:namespace', None)
bar = await keystore4.get('bar')
assert bar is not None
assert bar.ltk.value == ltk
# -----------------------------------------------------------------------------
async def run_tests():
await test_basic()
await test_parsing()
await test_default_namespace()
await test_no_filename()
# -----------------------------------------------------------------------------

View File

@@ -18,9 +18,11 @@
import asyncio
import logging
import os
import re
import pytest
from bumble import sdp
from bumble.core import BT_L2CAP_PROTOCOL_ID, UUID
from bumble.sdp import (
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
@@ -206,6 +208,16 @@ def sdp_records(record_count=1):
}
# -----------------------------------------------------------------------------
def test_pdu_parameter_length(caplog) -> None:
caplog.set_level(logging.WARNING)
pdu = sdp.SDP_ErrorResponse(
transaction_id=0, error_code=sdp.ErrorCode.INVALID_SDP_VERSION
)
assert sdp.SDP_PDU.from_bytes(bytes(pdu)) == pdu
assert not re.search("Expect \d+ bytes, got \d+", caplog.text)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_service_search():

View File

@@ -24,7 +24,7 @@ import pytest
from bumble import crypto, pairing, smp
from bumble.core import AdvertisingData
from bumble.crypto import EccKey, aes_cmac, ah, c1, f4, f5, f6, g2, h6, h7, s1
from bumble.device import Device
from bumble.device import Device, DeviceConfiguration
from bumble.hci import Address
from bumble.pairing import LeRole, OobData, OobSharedData
@@ -312,3 +312,17 @@ async def test_send_identity_address_command(
actual_command = mock_method.call_args.args[0]
assert actual_command.addr_type == expected_identity_address.address_type
assert actual_command.bd_addr == expected_identity_address
@pytest.mark.asyncio
async def test_smp_debug_mode():
config = DeviceConfiguration(smp_debug_mode=True)
device = Device(config=config)
assert device.smp_manager.ecc_key.x == smp.SMP_DEBUG_KEY_PUBLIC_X
assert device.smp_manager.ecc_key.y == smp.SMP_DEBUG_KEY_PUBLIC_Y
device.smp_manager.debug_mode = False
assert not device.smp_manager.ecc_key.x == smp.SMP_DEBUG_KEY_PUBLIC_X
assert not device.smp_manager.ecc_key.y == smp.SMP_DEBUG_KEY_PUBLIC_Y

View File

@@ -3,7 +3,7 @@
<head>
<link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons">
<link rel="stylesheet" href="https://fonts.googleapis.com/css2?family=Material+Symbols+Outlined:opsz,wght,FILL,GRAD@24,400,0,0" />
<script src="https://cdn.jsdelivr.net/pyodide/v0.24.1/full/pyodide.js"></script>
<script src="https://cdn.jsdelivr.net/pyodide/v0.26.2/full/pyodide.js"></script>
<script type="module" src="../ui.js"></script>
<script type="module" src="heart_rate_monitor.js"></script>
<style>

View File

@@ -3,7 +3,7 @@
<head>
<link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons">
<link rel="stylesheet" href="scanner.css">
<script src="https://cdn.jsdelivr.net/pyodide/v0.24.1/full/pyodide.js"></script>
<script src="https://cdn.jsdelivr.net/pyodide/v0.26.2/full/pyodide.js"></script>
<script type="module" src="../ui.js"></script>
<script type="module" src="scanner.js"></script>
</style>

View File

@@ -4,7 +4,7 @@
<title>Bumble Speaker</title>
<link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons">
<link rel="stylesheet" href="speaker.css">
<script src="https://cdn.jsdelivr.net/pyodide/v0.24.1/full/pyodide.js"></script>
<script src="https://cdn.jsdelivr.net/pyodide/v0.26.2/full/pyodide.js"></script>
<script type="module" src="speaker.js"></script>
<script type="module" src="../ui.js"></script>
</head>