diff --git a/bumble/att.py b/bumble/att.py index 07ebe86..688321b 100644 --- a/bumble/att.py +++ b/bumble/att.py @@ -42,7 +42,7 @@ from typing_extensions import TypeIs from bumble import hci, l2cap, utils from bumble.colors import color -from bumble.core import UUID, InvalidOperationError, ProtocolError +from bumble.core import UUID, InvalidOperationError, InvalidPacketError, ProtocolError from bumble.hci import HCI_Object # ----------------------------------------------------------------------------- @@ -249,6 +249,8 @@ class ATT_PDU: @classmethod def from_bytes(cls, pdu: bytes) -> ATT_PDU: + if not pdu: + raise InvalidPacketError("Empty ATT PDU") op_code = pdu[0] subclass = ATT_PDU.pdu_classes.get(op_code) diff --git a/bumble/hfp.py b/bumble/hfp.py index 7056517..7a782e7 100644 --- a/bumble/hfp.py +++ b/bumble/hfp.py @@ -68,6 +68,8 @@ class HfpProtocolError(ProtocolError): # ----------------------------------------------------------------------------- class HfpProtocol: + MAX_BUFFER_SIZE: ClassVar[int] = 65536 + dlc: rfcomm.DLC buffer: str lines: collections.deque @@ -84,10 +86,19 @@ class HfpProtocol: def feed(self, data: bytes | str) -> None: # Convert the data to a string if needed if isinstance(data, bytes): - data = data.decode('utf-8') + data = data.decode('utf-8', errors='replace') logger.debug(f'<<< Data received: {data}') + # Drop incoming data if it would overflow the buffer; keep existing + # partial packet state intact so a future clean packet can still parse. + if len(self.buffer) + len(data) > self.MAX_BUFFER_SIZE: + logger.warning( + 'HFP buffer overflow (>%d bytes), dropping incoming data', + self.MAX_BUFFER_SIZE, + ) + return + # Add to the buffer and look for lines self.buffer += data while (separator := self.buffer.find('\r')) >= 0: diff --git a/bumble/smp.py b/bumble/smp.py index 0f40094..a6b213e 100644 --- a/bumble/smp.py +++ b/bumble/smp.py @@ -36,6 +36,7 @@ from bumble.colors import color from bumble.core import ( AdvertisingData, InvalidArgumentError, + InvalidPacketError, PhysicalTransport, ProtocolError, ) @@ -215,6 +216,8 @@ class SMP_Command: @classmethod def from_bytes(cls, pdu: bytes) -> SMP_Command: + if not pdu: + raise InvalidPacketError("Empty SMP PDU") code = CommandCode(pdu[0]) subclass = SMP_Command.smp_classes.get(code)