mirror of
https://github.com/google/bumble.git
synced 2026-04-16 00:25:31 +00:00
191 lines
6.1 KiB
Python
191 lines
6.1 KiB
Python
# Copyright 2021-2022 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Imports
|
|
# -----------------------------------------------------------------------------
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import struct
|
|
|
|
import click
|
|
|
|
from bumble.colors import color
|
|
from bumble import hci
|
|
from bumble.transport.common import PacketReader
|
|
from bumble.helpers import PacketTracer
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Logging
|
|
# -----------------------------------------------------------------------------
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Classes
|
|
# -----------------------------------------------------------------------------
|
|
class SnoopPacketReader:
|
|
'''
|
|
Reader that reads HCI packets from a "snoop" file (based on RFC 1761, but not
|
|
exactly the same...)
|
|
'''
|
|
|
|
DATALINK_H1 = 1001
|
|
DATALINK_H4 = 1002
|
|
DATALINK_BSCP = 1003
|
|
DATALINK_H5 = 1004
|
|
|
|
IDENTIFICATION_PATTERN = b'btsnoop\0'
|
|
TIMESTAMP_ANCHOR = datetime.datetime(2000, 1, 1)
|
|
TIMESTAMP_DELTA = 0x00E03AB44A676000
|
|
ONE_MICROSECOND = datetime.timedelta(microseconds=1)
|
|
|
|
def __init__(self, source):
|
|
self.source = source
|
|
self.at_end = False
|
|
|
|
# Read the header
|
|
identification_pattern = source.read(8)
|
|
if identification_pattern != self.IDENTIFICATION_PATTERN:
|
|
raise ValueError(
|
|
'not a valid snoop file, unexpected identification pattern'
|
|
)
|
|
(self.version_number, self.data_link_type) = struct.unpack(
|
|
'>II', source.read(8)
|
|
)
|
|
if self.data_link_type not in (self.DATALINK_H4, self.DATALINK_H1):
|
|
raise ValueError(f'datalink type {self.data_link_type} not supported')
|
|
|
|
def next_packet(self):
|
|
# Read the record header
|
|
header = self.source.read(24)
|
|
if len(header) < 24:
|
|
self.at_end = True
|
|
return (None, 0, None)
|
|
|
|
# Parse the header
|
|
(
|
|
original_length,
|
|
included_length,
|
|
packet_flags,
|
|
_cumulative_drops,
|
|
timestamp,
|
|
) = struct.unpack('>IIIIQ', header)
|
|
|
|
# Skip truncated packets
|
|
if original_length != included_length:
|
|
print(
|
|
color(
|
|
f"!!! truncated packet ({included_length}/{original_length})", "red"
|
|
)
|
|
)
|
|
self.source.read(included_length)
|
|
return (None, 0, None)
|
|
|
|
# Convert the timestamp to a datetime object.
|
|
ts_dt = self.TIMESTAMP_ANCHOR + datetime.timedelta(
|
|
microseconds=timestamp - self.TIMESTAMP_DELTA
|
|
)
|
|
|
|
if self.data_link_type == self.DATALINK_H1:
|
|
# The packet is un-encapsulated, look at the flags to figure out its type
|
|
if packet_flags & 1:
|
|
# Controller -> Host
|
|
if packet_flags & 2:
|
|
packet_type = hci.HCI_EVENT_PACKET
|
|
else:
|
|
packet_type = hci.HCI_ACL_DATA_PACKET
|
|
else:
|
|
# Host -> Controller
|
|
if packet_flags & 2:
|
|
packet_type = hci.HCI_COMMAND_PACKET
|
|
else:
|
|
packet_type = hci.HCI_ACL_DATA_PACKET
|
|
|
|
return (
|
|
packet_flags & 1,
|
|
bytes([packet_type]) + self.source.read(included_length),
|
|
)
|
|
|
|
return (ts_dt, packet_flags & 1, self.source.read(included_length))
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
class Printer:
|
|
def __init__(self):
|
|
self.index = 0
|
|
|
|
def print(self, message: str) -> None:
|
|
self.index += 1
|
|
print(f"[{self.index:8}]{message}")
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Main
|
|
# -----------------------------------------------------------------------------
|
|
@click.command()
|
|
@click.option(
|
|
'--format',
|
|
type=click.Choice(['h4', 'snoop']),
|
|
default='h4',
|
|
help='Format of the input file',
|
|
)
|
|
@click.option(
|
|
'--vendor',
|
|
type=click.Choice(['android', 'zephyr']),
|
|
multiple=True,
|
|
help='Support vendor-specific commands (list one or more)',
|
|
)
|
|
@click.argument('filename')
|
|
# pylint: disable=redefined-builtin
|
|
def main(format, vendor, filename):
|
|
for vendor_name in vendor:
|
|
if vendor_name == 'android':
|
|
import bumble.vendor.android.hci
|
|
elif vendor_name == 'zephyr':
|
|
import bumble.vendor.zephyr.hci
|
|
|
|
input = open(filename, 'rb')
|
|
if format == 'h4':
|
|
packet_reader = PacketReader(input)
|
|
|
|
def read_next_packet():
|
|
return (None, 0, packet_reader.next_packet())
|
|
|
|
else:
|
|
packet_reader = SnoopPacketReader(input)
|
|
read_next_packet = packet_reader.next_packet
|
|
|
|
printer = Printer()
|
|
tracer = PacketTracer(emit_message=printer.print)
|
|
|
|
while not packet_reader.at_end:
|
|
try:
|
|
(timestamp, direction, packet) = read_next_packet()
|
|
if packet:
|
|
tracer.trace(hci.HCI_Packet.from_bytes(packet), direction, timestamp)
|
|
else:
|
|
printer.print(color("[TRUNCATED]", "red"))
|
|
except Exception as error:
|
|
logger.exception('')
|
|
print(color(f'!!! {error}', 'red'))
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
if __name__ == '__main__':
|
|
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
|
|
main() # pylint: disable=no-value-for-parameter
|