From a5275ade29d9b05f7f0b2a384387324300c3c20e Mon Sep 17 00:00:00 2001 From: Gilles Boccon-Gibod Date: Thu, 2 Mar 2023 14:34:49 -0800 Subject: [PATCH] add snoop support --- bumble/hci.py | 8 +++ bumble/host.py | 8 +++ bumble/snoop.py | 93 +++++++++++++++++++++++++++++ examples/run_device_with_snooper.py | 51 ++++++++++++++++ 4 files changed, 160 insertions(+) create mode 100644 bumble/snoop.py create mode 100644 examples/run_device_with_snooper.py diff --git a/bumble/hci.py b/bumble/hci.py index d8517c21..70d96528 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -1846,6 +1846,8 @@ class HCI_Packet: Abstract Base class for HCI packets ''' + hci_packet_type: int + @staticmethod def from_bytes(packet): packet_type = packet[0] @@ -1864,6 +1866,9 @@ class HCI_Packet: def __init__(self, name): self.name = name + def __bytes__(self) -> bytes: + raise NotImplementedError + def __repr__(self) -> str: return self.name @@ -1875,6 +1880,9 @@ class HCI_CustomPacket(HCI_Packet): self.hci_packet_type = payload[0] self.payload = payload + def __bytes__(self) -> bytes: + return self.payload + # ----------------------------------------------------------------------------- class HCI_Command(HCI_Packet): diff --git a/bumble/host.py b/bumble/host.py index 65c7741a..87ec6104 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -22,6 +22,7 @@ import struct from bumble.colors import color from bumble.l2cap import L2CAP_PDU +from bumble.snoop import Snooper from .hci import ( HCI_ACL_DATA_PACKET, @@ -133,6 +134,7 @@ class Host(AbortableEventEmitter): self.long_term_key_provider = None self.link_key_provider = None self.pairing_io_capability_provider = None # Classic only + self.snooper = None # Connect to the source and sink if specified if controller_source: @@ -273,6 +275,9 @@ class Host(AbortableEventEmitter): self.hci_sink = sink def send_hci_packet(self, packet): + if self.snooper: + self.snooper.snoop(packet, Snooper.Direction.HOST_TO_CONTROLLER) + self.hci_sink.on_packet(packet.to_bytes()) async def send_command(self, command, check_result=False): @@ -419,6 +424,9 @@ class Host(AbortableEventEmitter): def on_hci_packet(self, packet): logger.debug(f'{color("### CONTROLLER -> HOST", "green")}: {packet}') + if self.snooper: + self.snooper.snoop(packet, Snooper.Direction.CONTROLLER_TO_HOST) + # If the packet is a command, invoke the handler for this packet if packet.hci_packet_type == HCI_COMMAND_PACKET: self.on_hci_command_packet(packet) diff --git a/bumble/snoop.py b/bumble/snoop.py new file mode 100644 index 00000000..359fa380 --- /dev/null +++ b/bumble/snoop.py @@ -0,0 +1,93 @@ +# Copyright 2021-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from enum import IntEnum +import struct +import datetime +from typing import BinaryIO + +from bumble.hci import HCI_Packet, HCI_COMMAND_PACKET, HCI_EVENT_PACKET + + +# ----------------------------------------------------------------------------- +# Classes +# ----------------------------------------------------------------------------- +class Snooper: + """ + Base class for snooper implementations. + + A snooper is an object that will be provided with HCI packets as they are + exchanged between a host and a controller. + """ + + class Direction(IntEnum): + HOST_TO_CONTROLLER = 0 + CONTROLLER_TO_HOST = 1 + + class DataLinkType(IntEnum): + H1 = 1001 + H4 = 1002 + HCI_BSCP = 1003 + H5 = 1004 + + def snoop(self, hci_packet: HCI_Packet, direction: Direction) -> None: + """Snoop on an HCI packet.""" + + +# ----------------------------------------------------------------------------- +class BtSnooper(Snooper): + """ + Snooper that saves HCI packets using the BTSnoop format, based on RFC 1761. + """ + + IDENTIFICATION_PATTERN = b'btsnoop\0' + TIMESTAMP_ANCHOR = datetime.datetime(2000, 1, 1) + TIMESTAMP_DELTA = 0x00E03AB44A676000 + ONE_MS = datetime.timedelta(microseconds=1) + + def __init__(self, output: BinaryIO): + self.output = output + + # Write the header + self.output.write( + self.IDENTIFICATION_PATTERN + struct.pack('>LL', 1, self.DataLinkType.H4) + ) + + def snoop(self, hci_packet: HCI_Packet, direction: Snooper.Direction) -> None: + flags = int(direction) + if hci_packet.hci_packet_type in (HCI_EVENT_PACKET, HCI_COMMAND_PACKET): + flags |= 0x10 + + # Compute the current timestamp + timestamp = ( + int((datetime.datetime.utcnow() - self.TIMESTAMP_ANCHOR) / self.ONE_MS) + + self.TIMESTAMP_DELTA + ) + + # Emit the record + packet_data = bytes(hci_packet) + self.output.write( + struct.pack( + '>IIIIQ', + len(packet_data), # Original Length + len(packet_data), # Included Length + flags, # Packet Flags + 0, # Cumulative Drops + timestamp, # Timestamp + ) + + packet_data + ) diff --git a/examples/run_device_with_snooper.py b/examples/run_device_with_snooper.py new file mode 100644 index 00000000..69a187f5 --- /dev/null +++ b/examples/run_device_with_snooper.py @@ -0,0 +1,51 @@ +# Copyright 2021-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import sys +import os +import logging +from bumble.colors import color + +from bumble.device import Device +from bumble.transport import open_transport_or_link +from bumble.snoop import BtSnooper + +# ----------------------------------------------------------------------------- +async def main(): + if len(sys.argv) != 3: + print('Usage: run_device_with_snooper.py ') + print('example: run_device_with_snooper.py usb:0 btsnoop.log') + return + + print('<<< connecting to HCI...') + async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink): + print('<<< connected') + + device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) + + with open(sys.argv[2], "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device.power_on() + await device.start_scanning() + + await hci_source.wait_for_termination() + + +# ----------------------------------------------------------------------------- +logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) +asyncio.run(main())