avdtp: bound message assembler to drop truncated PDUs (DoS prevention)

A remote peer can send an AVDTP frame shorter than the assembler expects.
The current MessageAssembler.on_pdu() unconditionally accesses pdu[0],
pdu[1], and (for START packets) pdu[2], so a 0-, 1-, or 2-byte frame
raises IndexError. The exception propagates up through L2CAP's read loop
and tears down the channel — same DoS class as #912 (empty ATT PDU) and
#914 (unbounded SDP recursion).

Fix: validate length before each access. Empty PDUs and packets shorter
than the type-specific minimum are logged and dropped; the assembler
stays alive so the L2CAP channel is not torn down.

- bumble/avdtp.py: length guards in MessageAssembler.on_pdu before
  accessing pdu[0], pdu[1], pdu[2].
- tests/avdtp_test.py: regression test covering empty PDU, 1-byte SINGLE,
  1-byte START, 2-byte START — all four would have raised IndexError
  pre-fix; assembler now drops without raising.
This commit is contained in:
ibondarenko1
2026-04-26 18:16:15 -07:00
parent 1686c5b11b
commit b874e26a4f
2 changed files with 49 additions and 0 deletions

View File

@@ -311,6 +311,13 @@ class MessageAssembler:
def on_pdu(self, pdu: bytes) -> None: def on_pdu(self, pdu: bytes) -> None:
self.packet_count += 1 self.packet_count += 1
# Drop empty PDUs sent by remote — accessing pdu[0] below would
# raise IndexError, propagating up to the L2CAP read loop and
# tearing down the channel. Same class as #912 (ATT empty PDU).
if len(pdu) < 1:
logger.warning('AVDTP message assembler: empty PDU dropped')
return
transaction_label = pdu[0] >> 4 transaction_label = pdu[0] >> 4
packet_type = Protocol.PacketType((pdu[0] >> 2) & 3) packet_type = Protocol.PacketType((pdu[0] >> 2) & 3)
message_type = Message.MessageType(pdu[0] & 3) message_type = Message.MessageType(pdu[0] & 3)
@@ -324,6 +331,23 @@ class MessageAssembler:
Protocol.PacketType.SINGLE_PACKET, Protocol.PacketType.SINGLE_PACKET,
Protocol.PacketType.START_PACKET, Protocol.PacketType.START_PACKET,
): ):
# Both single and start packets carry the signal identifier in
# pdu[1]; start packets additionally carry the packet count in
# pdu[2]. Guard each access so a malformed remote frame can't
# crash the message assembler.
if len(pdu) < 2:
logger.warning(
'AVDTP %s packet too short (%d bytes); dropped',
packet_type.name,
len(pdu),
)
return
if packet_type == Protocol.PacketType.START_PACKET and len(pdu) < 3:
logger.warning(
'AVDTP START packet missing signal-packet count; dropped'
)
return
if self.message is not None: if self.message is not None:
# The previous message has not been terminated # The previous message has not been terminated
logger.warning( logger.warning(

View File

@@ -120,6 +120,31 @@ def test_messages(message: avdtp.Message):
assert message.payload == parsed.payload assert message.payload == parsed.payload
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'pdu',
(
b'', # empty PDU — would IndexError on pdu[0]
b'\x00', # 1-byte SINGLE_PACKET — would IndexError on pdu[1]
b'\x04', # 1-byte START_PACKET — would IndexError on pdu[1]
b'\x44\x10', # 2-byte START_PACKET — would IndexError on pdu[2]
),
)
def test_message_assembler_truncated_pdu(pdu: bytes):
"""Truncated AVDTP PDUs from a remote peer must NOT raise IndexError —
same DoS class as #912 (ATT empty PDU). The assembler is required to
log + drop and stay alive so the L2CAP channel survives."""
completed = []
def callback(transaction_label, message):
completed.append((transaction_label, message))
assembler = avdtp.MessageAssembler(callback)
# Must not raise; nothing should be delivered to callback either.
assembler.on_pdu(pdu)
assert completed == []
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_rtp(): def test_rtp():
packet = bytes.fromhex( packet = bytes.fromhex(