diff --git a/bumble/snoop.py b/bumble/snoop.py index 5a8262c..686bf9d 100644 --- a/bumble/snoop.py +++ b/bumble/snoop.py @@ -64,7 +64,7 @@ class BtSnooper(Snooper): Snooper that saves HCI packets using the BTSnoop format, based on RFC 1761. """ - IDENTIFICATION_PATTERN = b'btsnoop\0' + IDENTIFICATION_PATTERN = b"btsnoop\0" TIMESTAMP_ANCHOR = datetime.datetime(2000, 1, 1) TIMESTAMP_DELTA = 0x00E03AB44A676000 ONE_MS = datetime.timedelta(microseconds=1) @@ -74,7 +74,7 @@ class BtSnooper(Snooper): # Write the header self.output.write( - self.IDENTIFICATION_PATTERN + struct.pack('>LL', 1, self.DataLinkType.H4) + self.IDENTIFICATION_PATTERN + struct.pack(">LL", 1, self.DataLinkType.H4) ) def snoop(self, hci_packet: bytes, direction: Snooper.Direction) -> None: @@ -85,14 +85,20 @@ class BtSnooper(Snooper): # Compute the current timestamp timestamp = ( - int((datetime.datetime.utcnow() - self.TIMESTAMP_ANCHOR) / self.ONE_MS) + int( + ( + datetime.datetime.now(tz=datetime.timezone.utc) + - self.TIMESTAMP_ANCHOR + ) + / self.ONE_MS + ) + self.TIMESTAMP_DELTA ) # Emit the record self.output.write( struct.pack( - '>IIIIQ', + ">IIIIQ", len(hci_packet), # Original Length len(hci_packet), # Included Length flags, # Packet Flags @@ -129,7 +135,7 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]: records will be written to that file if it can be opened/created. The keyword args that may be referenced by the string pattern are: now: the value of `datetime.now()` - utcnow: the value of `datetime.utcnow()` + utcnow: the value of `datetime.datetime(tz=datetime.timezone.utc)` pid: the current process ID. instance: the instance ID in the current process. @@ -138,34 +144,34 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]: btsnoop:file:/tmp/bumble_{now:%Y-%m-%d-%H:%M:%S}_{pid}.log """ - if ':' not in spec: - raise core.InvalidArgumentError('snooper type prefix missing') + if ":" not in spec: + raise core.InvalidArgumentError("snooper type prefix missing") - snooper_type, snooper_args = spec.split(':', maxsplit=1) + snooper_type, snooper_args = spec.split(":", maxsplit=1) - if snooper_type == 'btsnoop': - if ':' not in snooper_args: - raise core.InvalidArgumentError('I/O type for btsnoop snooper type missing') + if snooper_type == "btsnoop": + if ":" not in snooper_args: + raise core.InvalidArgumentError("I/O type for btsnoop snooper type missing") - io_type, io_name = snooper_args.split(':', maxsplit=1) - if io_type == 'file': + io_type, io_name = snooper_args.split(":", maxsplit=1) + if io_type == "file": # Process the file name string pattern. global _SNOOPER_INSTANCE_COUNT file_path = io_name.format( now=datetime.datetime.now(), - utcnow=datetime.datetime.utcnow(), + utcnow=datetime.datetime.now(tz=datetime.timezone.utc), pid=os.getpid(), instance=_SNOOPER_INSTANCE_COUNT, ) # Open the file - logger.debug(f'Snoop file: {file_path}') - with open(file_path, 'wb') as snoop_file: + logger.debug(f"Snoop file: {file_path}") + with open(file_path, "wb") as snoop_file: _SNOOPER_INSTANCE_COUNT += 1 yield BtSnooper(snoop_file) _SNOOPER_INSTANCE_COUNT -= 1 return - raise core.InvalidArgumentError(f'I/O type {io_type} not supported') + raise core.InvalidArgumentError(f"I/O type {io_type} not supported") - raise core.InvalidArgumentError(f'snooper type {snooper_type} not found') + raise core.InvalidArgumentError(f"snooper type {snooper_type} not found")