Added the PcapSnooper class.

The class implements a bumble snooper that writes PCAP records.
It can write to either a file or a named pipe.
The latter is useful to bridge with wireshark extcap for live logging.
This commit is contained in:
Frieder Steinmetz
2026-01-14 23:40:59 +01:00
parent 591ed61686
commit 3fdd7ee45e

View File

@@ -110,6 +110,46 @@ class BtSnooper(Snooper):
)
# -----------------------------------------------------------------------------
class PcapSnooper(Snooper):
"""
Snooper that saves or streames HCI packets using the PCAP format.
"""
PCAP_MAGIC = 0xa1b2c3d4
DLT_BLUETOOTH_HCI_H4_WITH_PHDR = 201
def __init__(self, fifo):
self.output = fifo
# Write the header
self.output.write(struct.pack("<IHHIIII",
self.PCAP_MAGIC,
2, 4, # Major and Minor PCAP Version
0, 0, # Reserved 1 and 2
65535, # SnapLen
# FCS and f are set to 0 implicitly by the next line
self.DLT_BLUETOOTH_HCI_H4_WITH_PHDR # The DLT in this PCAP
))
def snoop(self, hci_packet: bytes, direction: Snooper.Direction):
now = datetime.datetime.now(datetime.timezone.utc)
sec = int(now.timestamp())
usec = now.microsecond
# Emit the record
self.output.write(
struct.pack("<IIII",
sec, # Timestamp (Seconds)
usec, # Timestamp (Microseconds)
len(hci_packet)+4,
len(hci_packet)+4 # +4 because of the addtional direction info...
)
+ struct.pack(">I", int(direction)) # ...thats being added here
+ hci_packet
)
self.output.flush() # flush after every packet for live logging
# -----------------------------------------------------------------------------
_SNOOPER_INSTANCE_COUNT = 0
@@ -139,10 +179,39 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
utcnow: the value of `datetime.now(tz=datetime.timezone.utc)`
pid: the current process ID.
instance: the instance ID in the current process.
pcapsnoop
The syntax for the type-specific arguments for this type is:
<io-type>:<io-type-specific-arguments>
Supported I/O types are:
file
The type-specific arguments for this I/O type is a string that is converted
to a file path using the python `str.format()` string formatting. The log
records will be written to that file if it can be opened/created.
The keyword args that may be referenced by the string pattern are:
now: the value of `datetime.now()`
utcnow: the value of `datetime.now(tz=datetime.timezone.utc)`
pid: the current process ID.
instance: the instance ID in the current process.
pipe
The type-specific arguments for this I/O type is a string that is converted
to a path using the python `str.format()` string formatting. The log
records will be written to the named pipe referenced by this path
if it can be opened. The keyword args that may be referenced by the
string pattern are:
now: the value of `datetime.now()`
utcnow: the value of `datetime.now(tz=datetime.timezone.utc)`
pid: the current process ID.
instance: the instance ID in the current process.
Examples:
btsnoop:file:my_btsnoop.log
btsnoop:file:/tmp/bumble_{now:%Y-%m-%d-%H:%M:%S}_{pid}.log
pcapsnoop:pipe:/tmp/bumble-extcap
"""
if ':' not in spec:
@@ -173,6 +242,36 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
_SNOOPER_INSTANCE_COUNT -= 1
return
elif snooper_type == 'pcapsnoop':
if ':' not in snooper_args:
raise core.InvalidArgumentError(
'I/O type for pcapsnoop snooper type missing'
)
io_type, io_name = snooper_args.split(':', maxsplit=1)
if io_type in {'pipe', 'file'}:
# Process the file name string pattern.
file_path = io_name.format(
now=datetime.datetime.now(),
utcnow=datetime.datetime.now(tz=datetime.timezone.utc),
pid=os.getpid(),
instance=_SNOOPER_INSTANCE_COUNT,
)
# Pipes we have to open with unbuffered binary I/O
kwargs = {}
if io_type == 'pipe':
kwargs["buffering"] = 0
# Open a file or pipe
logger.debug(f'PCAP file: {file_path}')
# Pass ``buffering`` for pipes but not for files
with open(file_path, 'wb', **kwargs) as snoop_file:
_SNOOPER_INSTANCE_COUNT += 1
yield PcapSnooper(snoop_file)
_SNOOPER_INSTANCE_COUNT -= 1
return
raise core.InvalidArgumentError(f'I/O type {io_type} not supported')
raise core.InvalidArgumentError(f'snooper type {snooper_type} not found')