show timestamps from snoop logs

This commit is contained in:
Gilles Boccon-Gibod
2024-02-27 16:40:37 -08:00
parent de8f3d9c1e
commit f2d601f411
3 changed files with 83 additions and 21 deletions

View File

@@ -15,7 +15,11 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import datetime
import logging
import os
import struct
import click
from bumble.colors import color
@@ -24,6 +28,14 @@ from bumble.transport.common import PacketReader
from bumble.helpers import PacketTracer
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class SnoopPacketReader:
'''
@@ -36,12 +48,18 @@ class SnoopPacketReader:
DATALINK_BSCP = 1003
DATALINK_H5 = 1004
IDENTIFICATION_PATTERN = b'btsnoop\0'
TIMESTAMP_ANCHOR = datetime.datetime(2000, 1, 1)
TIMESTAMP_DELTA = 0x00E03AB44A676000
ONE_MICROSECOND = datetime.timedelta(microseconds=1)
def __init__(self, source):
self.source = source
self.at_end = False
# Read the header
identification_pattern = source.read(8)
if identification_pattern.hex().lower() != '6274736e6f6f7000':
if identification_pattern != self.IDENTIFICATION_PATTERN:
raise ValueError(
'not a valid snoop file, unexpected identification pattern'
)
@@ -55,19 +73,32 @@ class SnoopPacketReader:
# Read the record header
header = self.source.read(24)
if len(header) < 24:
return (0, None)
self.at_end = True
return (None, 0, None)
# Parse the header
(
original_length,
included_length,
packet_flags,
_cumulative_drops,
_timestamp_seconds,
_timestamp_microsecond,
) = struct.unpack('>IIIIII', header)
timestamp,
) = struct.unpack('>IIIIQ', header)
# Abort on truncated packets
# Skip truncated packets
if original_length != included_length:
return (0, None)
print(
color(
f"!!! truncated packet ({included_length}/{original_length})", "red"
)
)
self.source.read(included_length)
return (None, 0, None)
# Convert the timestamp to a datetime object.
ts_dt = self.TIMESTAMP_ANCHOR + datetime.timedelta(
microseconds=timestamp - self.TIMESTAMP_DELTA
)
if self.data_link_type == self.DATALINK_H1:
# The packet is un-encapsulated, look at the flags to figure out its type
@@ -89,7 +120,17 @@ class SnoopPacketReader:
bytes([packet_type]) + self.source.read(included_length),
)
return (packet_flags & 1, self.source.read(included_length))
return (ts_dt, packet_flags & 1, self.source.read(included_length))
# -----------------------------------------------------------------------------
class Printer:
def __init__(self):
self.index = 0
def print(self, message: str) -> None:
self.index += 1
print(f"[{self.index}]{message}")
# -----------------------------------------------------------------------------
@@ -122,24 +163,28 @@ def main(format, vendors, filename):
packet_reader = PacketReader(input)
def read_next_packet():
return (0, packet_reader.next_packet())
return (None, 0, packet_reader.next_packet())
else:
packet_reader = SnoopPacketReader(input)
read_next_packet = packet_reader.next_packet
tracer = PacketTracer(emit_message=print)
printer = Printer()
tracer = PacketTracer(emit_message=printer.print)
while True:
while not packet_reader.at_end:
try:
(direction, packet) = read_next_packet()
if packet is None:
break
tracer.trace(hci.HCI_Packet.from_bytes(packet), direction)
(timestamp, direction, packet) = read_next_packet()
if packet:
tracer.trace(hci.HCI_Packet.from_bytes(packet), direction, timestamp)
else:
printer.print(color("[TRUNCATED]", "red"))
except Exception as error:
logger.exception()
print(color(f'!!! {error}', 'red'))
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
main() # pylint: disable=no-value-for-parameter

View File

@@ -18,6 +18,7 @@
from __future__ import annotations
from collections.abc import Callable, MutableMapping
import datetime
from typing import cast, Any, Optional
import logging
@@ -66,12 +67,13 @@ PSM_NAMES = {
rfcomm.RFCOMM_PSM: 'RFCOMM',
sdp.SDP_PSM: 'SDP',
avdtp.AVDTP_PSM: 'AVDTP',
avctp.AVCTP_PSM: 'AVCTP'
avctp.AVCTP_PSM: 'AVCTP',
# TODO: add more PSM values
}
AVCTP_PID_NAMES = {avrcp.AVRCP_PID: 'AVRCP'}
# -----------------------------------------------------------------------------
class PacketTracer:
class AclStream:
@@ -207,6 +209,7 @@ class PacketTracer:
self.label = label
self.emit_message = emit_message
self.acl_streams = {} # ACL streams, by connection handle
self.packet_timestamp: Optional[datetime.datetime] = None
def start_acl_stream(self, connection_handle: int) -> PacketTracer.AclStream:
logger.info(
@@ -234,7 +237,10 @@ class PacketTracer:
# Let the other forwarder know so it can cleanup its stream as well
self.peer.end_acl_stream(connection_handle)
def on_packet(self, packet: HCI_Packet) -> None:
def on_packet(
self, timestamp: Optional[datetime.datetime], packet: HCI_Packet
) -> None:
self.packet_timestamp = timestamp
self.emit(packet)
if packet.hci_packet_type == HCI_ACL_DATA_PACKET:
@@ -254,13 +260,22 @@ class PacketTracer:
)
def emit(self, message: Any) -> None:
self.emit_message(f'[{self.label}] {message}')
if self.packet_timestamp:
prefix = f"[{self.packet_timestamp.strftime('%Y-%m-%d %H:%M:%S.%f')}]"
else:
prefix = ""
self.emit_message(f'{prefix}[{self.label}] {message}')
def trace(self, packet: HCI_Packet, direction: int = 0) -> None:
def trace(
self,
packet: HCI_Packet,
direction: int = 0,
timestamp: Optional[datetime.datetime] = None,
) -> None:
if direction == 0:
self.host_to_controller_analyzer.on_packet(packet)
self.host_to_controller_analyzer.on_packet(timestamp, packet)
else:
self.controller_to_host_analyzer.on_packet(packet)
self.controller_to_host_analyzer.on_packet(timestamp, packet)
def __init__(
self,

View File

@@ -168,11 +168,13 @@ class PacketReader:
def __init__(self, source: io.BufferedReader) -> None:
self.source = source
self.at_end = False
def next_packet(self) -> Optional[bytes]:
# Get the packet type
packet_type = self.source.read(1)
if len(packet_type) != 1:
self.at_end = True
return None
# Get the packet info based on its type