This commit is contained in:
Gilles Boccon-Gibod
2024-01-28 00:07:31 -08:00
parent aba1ac0cea
commit 31ec1c41ce
3 changed files with 37 additions and 39 deletions
-1
View File
@@ -34,7 +34,6 @@ from typing import (
Tuple,
Type,
TypeVar,
Set,
Union,
cast,
overload,
+36 -36
View File
@@ -21,7 +21,13 @@ from collections.abc import Callable, MutableMapping
from typing import cast, Any, Optional
import logging
from bumble import avc
from bumble import avctp
from bumble import avdtp
from bumble import avrcp
from bumble import crypto
from bumble import rfcomm
from bumble import sdp
from bumble.colors import color
from bumble.att import ATT_CID, ATT_PDU
from bumble.smp import SMP_CID, SMP_Command
@@ -47,13 +53,6 @@ from bumble.hci import (
HCI_AclDataPacket,
HCI_Disconnection_Complete_Event,
)
from bumble.rfcomm import RFCOMM_Frame, RFCOMM_PSM
from bumble.sdp import SDP_PDU, SDP_PSM
from bumble import crypto
from bumble.avdtp import MessageAssembler as AVDTP_MessageAssembler, AVDTP_PSM
from bumble.avctp import MessageAssembler as AVCTP_MessageAssembler, AVCTP_PSM
from bumble.avrcp import AVRCP_PID
from bumble.avc import Frame as AVC_Frame
# -----------------------------------------------------------------------------
@@ -64,14 +63,14 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
PSM_NAMES = {
RFCOMM_PSM: 'RFCOMM',
SDP_PSM: 'SDP',
AVDTP_PSM: 'AVDTP',
AVCTP_PSM: 'AVCTP'
rfcomm.RFCOMM_PSM: 'RFCOMM',
sdp.SDP_PSM: 'SDP',
avdtp.AVDTP_PSM: 'AVDTP',
avctp.AVCTP_PSM: 'AVCTP'
# TODO: add more PSM values
}
AVCTP_PID_NAMES = {AVRCP_PID: 'AVRCP'}
AVCTP_PID_NAMES = {avrcp.AVRCP_PID: 'AVRCP'}
# -----------------------------------------------------------------------------
class PacketTracer:
@@ -79,13 +78,13 @@ class PacketTracer:
psms: MutableMapping[int, int]
peer: Optional[PacketTracer.AclStream]
avdtp_assemblers: MutableMapping[int, avdtp.MessageAssembler]
avctp_assemblers: MutableMapping[int, avctp.MessageAssembler]
def __init__(self, analyzer: PacketTracer.Analyzer) -> None:
self.analyzer = analyzer
self.packet_assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.avdtp_assemblers = {} # AVDTP assemblers, by source_cid
self.avctp_assemblers = {} # AVCTP assemblers, by source_cid
self.avrcp_assemblers = {} # AVRCP assemblers, by source_cid
self.psms = {} # PSM, by source_cid
self.peer = None
@@ -115,9 +114,7 @@ class PacketTracer:
== L2CAP_Connection_Response.CONNECTION_SUCCESSFUL
):
if self.peer and (
psm := self.peer.psms.get(
connection_response.source_cid
)
psm := self.peer.psms.get(connection_response.source_cid)
):
# Found a pending connection
self.psms[connection_response.destination_cid] = psm
@@ -130,41 +127,37 @@ class PacketTracer:
] = avdtp.MessageAssembler(self.on_avdtp_message)
self.peer.avdtp_assemblers[
connection_response.destination_cid
] = avdtp.MessageAssembler(
self.peer.on_avdtp_message
)
elif psm == AVCTP_PSM:
] = avdtp.MessageAssembler(self.peer.on_avdtp_message)
elif psm == avctp.AVCTP_PSM:
self.avctp_assemblers[
connection_response.source_cid
] = AVCTP_MessageAssembler(self.on_avctp_message)
] = avctp.MessageAssembler(self.on_avctp_message)
self.peer.avctp_assemblers[
connection_response.destination_cid
] = AVCTP_MessageAssembler(self.peer.on_avctp_message)
] = avctp.MessageAssembler(self.peer.on_avctp_message)
else:
# Try to find the PSM associated with this PDU
if self.peer and (psm := self.peer.psms.get(l2cap_pdu.cid)):
if psm == SDP_PSM:
sdp_pdu = SDP_PDU.from_bytes(l2cap_pdu.payload)
if psm == sdp.SDP_PSM:
sdp_pdu = sdp.SDP_PDU.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(sdp_pdu)
elif psm == RFCOMM_PSM:
rfcomm_frame = RFCOMM_Frame.from_bytes(l2cap_pdu.payload)
elif psm == rfcomm.RFCOMM_PSM:
rfcomm_frame = rfcomm.RFCOMM_Frame.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(rfcomm_frame)
elif psm == avdtp.AVDTP_PSM:
self.analyzer.emit(
f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, '
f'PSM=AVDTP]: {l2cap_pdu.payload.hex()}'
)
assembler = self.avdtp_assemblers.get(l2cap_pdu.cid)
if assembler:
assembler.on_pdu(l2cap_pdu.payload)
elif psm == AVCTP_PSM:
if avdtp_assembler := self.avdtp_assemblers.get(l2cap_pdu.cid):
avdtp_assembler.on_pdu(l2cap_pdu.payload)
elif psm == avctp.AVCTP_PSM:
self.analyzer.emit(
f'{color("L2CAP", "green")} [CID={l2cap_pdu.cid}, '
f'PSM=AVCTP]: {l2cap_pdu.payload.hex()}'
)
assembler = self.avctp_assemblers.get(l2cap_pdu.cid)
if assembler:
assembler.on_pdu(l2cap_pdu.payload)
if avctp_assembler := self.avctp_assemblers.get(l2cap_pdu.cid):
avctp_assembler.on_pdu(l2cap_pdu.payload)
else:
psm_string = name_or_number(PSM_NAMES, psm)
self.analyzer.emit(
@@ -181,9 +174,16 @@ class PacketTracer:
f'{color("AVDTP", "green")} [{transaction_label}] {message}'
)
def on_avctp_message(self, transaction_label: int, is_command: bool, ipid: bool, pid: int, payload: bytes):
if pid == AVRCP_PID:
avc_frame = AVC_Frame.from_bytes(payload)
def on_avctp_message(
self,
transaction_label: int,
is_command: bool,
ipid: bool,
pid: int,
payload: bytes,
):
if pid == avrcp.AVRCP_PID:
avc_frame = avc.Frame.from_bytes(payload)
details = str(avc_frame)
else:
details = payload.hex()
+1 -2
View File
@@ -35,7 +35,6 @@ from typing import (
Union,
overload,
)
import traceback
from pyee import EventEmitter
@@ -459,7 +458,7 @@ def experimental(msg: str):
"""
def wrapper(function):
@wraps(function)
@functools.wraps(function)
def inner(*args, **kwargs):
warnings.warn(msg, FutureWarning)
return function(*args, **kwargs)