mirror of
https://github.com/google/bumble.git
synced 2026-05-06 03:38:01 +00:00
Compare commits
7 Commits
dependabot
...
v0.0.228
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
27d02ef18d | ||
|
|
c0725e2a4a | ||
|
|
bf0784dde4 | ||
|
|
444f43f6a3 | ||
|
|
2420c47cf1 | ||
|
|
0a78e7506b | ||
|
|
f7cc6f6657 |
@@ -42,7 +42,7 @@ from typing_extensions import TypeIs
|
||||
|
||||
from bumble import hci, l2cap, utils
|
||||
from bumble.colors import color
|
||||
from bumble.core import UUID, InvalidOperationError, ProtocolError
|
||||
from bumble.core import UUID, InvalidOperationError, InvalidPacketError, ProtocolError
|
||||
from bumble.hci import HCI_Object
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -249,6 +249,8 @@ class ATT_PDU:
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, pdu: bytes) -> ATT_PDU:
|
||||
if not pdu:
|
||||
raise InvalidPacketError("Empty ATT PDU")
|
||||
op_code = pdu[0]
|
||||
|
||||
subclass = ATT_PDU.pdu_classes.get(op_code)
|
||||
|
||||
@@ -68,6 +68,8 @@ class HfpProtocolError(ProtocolError):
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class HfpProtocol:
|
||||
MAX_BUFFER_SIZE: ClassVar[int] = 65536
|
||||
|
||||
dlc: rfcomm.DLC
|
||||
buffer: str
|
||||
lines: collections.deque
|
||||
@@ -84,10 +86,19 @@ class HfpProtocol:
|
||||
def feed(self, data: bytes | str) -> None:
|
||||
# Convert the data to a string if needed
|
||||
if isinstance(data, bytes):
|
||||
data = data.decode('utf-8')
|
||||
data = data.decode('utf-8', errors='replace')
|
||||
|
||||
logger.debug(f'<<< Data received: {data}')
|
||||
|
||||
# Drop incoming data if it would overflow the buffer; keep existing
|
||||
# partial packet state intact so a future clean packet can still parse.
|
||||
if len(self.buffer) + len(data) > self.MAX_BUFFER_SIZE:
|
||||
logger.warning(
|
||||
'HFP buffer overflow (>%d bytes), dropping incoming data',
|
||||
self.MAX_BUFFER_SIZE,
|
||||
)
|
||||
return
|
||||
|
||||
# Add to the buffer and look for lines
|
||||
self.buffer += data
|
||||
while (separator := self.buffer.find('\r')) >= 0:
|
||||
|
||||
@@ -692,10 +692,8 @@ class Host(utils.EventEmitter):
|
||||
finally:
|
||||
self.pending_command = None
|
||||
self.pending_response = None
|
||||
if (
|
||||
response is not None
|
||||
and response.num_hci_command_packets
|
||||
and self.command_semaphore.locked()
|
||||
if response is None or (
|
||||
response.num_hci_command_packets and self.command_semaphore.locked()
|
||||
):
|
||||
self.command_semaphore.release()
|
||||
|
||||
|
||||
@@ -594,7 +594,10 @@ class SDP_PDU:
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, pdu: bytes) -> SDP_PDU:
|
||||
pdu_id, transaction_id, _parameters_length = struct.unpack_from('>BHH', pdu, 0)
|
||||
pdu_id, transaction_id, parameters_length = struct.unpack_from('>BHH', pdu, 0)
|
||||
|
||||
if len(pdu) != 5 + parameters_length:
|
||||
logger.warning("Expect %d bytes, got %d", 5 + parameters_length, len(pdu))
|
||||
|
||||
subclass = cls.subclasses.get(pdu_id)
|
||||
if not (subclass := cls.subclasses.get(pdu_id)):
|
||||
@@ -616,9 +619,11 @@ class SDP_PDU:
|
||||
|
||||
def __bytes__(self):
|
||||
if self._payload is None:
|
||||
self._payload = struct.pack(
|
||||
'>BHH', self.pdu_id, self.transaction_id, 0
|
||||
) + hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields)
|
||||
parameters = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields)
|
||||
self._payload = (
|
||||
struct.pack('>BHH', self.pdu_id, self.transaction_id, len(parameters))
|
||||
+ parameters
|
||||
)
|
||||
return self._payload
|
||||
|
||||
@property
|
||||
|
||||
@@ -36,6 +36,7 @@ from bumble.colors import color
|
||||
from bumble.core import (
|
||||
AdvertisingData,
|
||||
InvalidArgumentError,
|
||||
InvalidPacketError,
|
||||
PhysicalTransport,
|
||||
ProtocolError,
|
||||
)
|
||||
@@ -215,6 +216,8 @@ class SMP_Command:
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, pdu: bytes) -> SMP_Command:
|
||||
if not pdu:
|
||||
raise InvalidPacketError("Empty SMP PDU")
|
||||
code = CommandCode(pdu[0])
|
||||
|
||||
subclass = SMP_Command.smp_classes.get(code)
|
||||
|
||||
@@ -171,14 +171,15 @@ class Source:
|
||||
|
||||
|
||||
class Sink:
|
||||
response: HCI_Event
|
||||
response: HCI_Event | None
|
||||
|
||||
def __init__(self, source: Source, response: HCI_Event) -> None:
|
||||
def __init__(self, source: Source, response: HCI_Event | None) -> None:
|
||||
self.source = source
|
||||
self.response = response
|
||||
|
||||
def on_packet(self, packet: bytes) -> None:
|
||||
self.source.sink.on_packet(bytes(self.response))
|
||||
if self.response is not None:
|
||||
self.source.sink.on_packet(bytes(self.response))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -228,6 +229,23 @@ async def test_send_sync_command() -> None:
|
||||
assert isinstance(response3.return_parameters, HCI_GenericReturnParameters)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_sync_command_timeout() -> None:
|
||||
source = Source()
|
||||
sink = Sink(source, None)
|
||||
|
||||
host = Host(source, sink)
|
||||
host.ready = True
|
||||
|
||||
with pytest.raises(asyncio.TimeoutError):
|
||||
await host.send_sync_command(HCI_Reset_Command(), response_timeout=0.01)
|
||||
|
||||
# The sending semaphore should have been released, so this should not block
|
||||
# indefinitely
|
||||
with pytest.raises(asyncio.TimeoutError):
|
||||
await host.send_sync_command(hci.HCI_Reset_Command(), response_timeout=0.01)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_async_command() -> None:
|
||||
source = Source()
|
||||
|
||||
@@ -18,9 +18,11 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
|
||||
import pytest
|
||||
|
||||
from bumble import sdp
|
||||
from bumble.core import BT_L2CAP_PROTOCOL_ID, UUID
|
||||
from bumble.sdp import (
|
||||
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
|
||||
@@ -206,6 +208,16 @@ def sdp_records(record_count=1):
|
||||
}
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_pdu_parameter_length(caplog) -> None:
|
||||
caplog.set_level(logging.WARNING)
|
||||
pdu = sdp.SDP_ErrorResponse(
|
||||
transaction_id=0, error_code=sdp.ErrorCode.INVALID_SDP_VERSION
|
||||
)
|
||||
assert sdp.SDP_PDU.from_bytes(bytes(pdu)) == pdu
|
||||
assert not re.search("Expect \d+ bytes, got \d+", caplog.text)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_service_search():
|
||||
|
||||
Reference in New Issue
Block a user