diff --git a/bumble/snoop.py b/bumble/snoop.py index 686bf9de..5a8262cf 100644 --- a/bumble/snoop.py +++ b/bumble/snoop.py @@ -64,7 +64,7 @@ class BtSnooper(Snooper): Snooper that saves HCI packets using the BTSnoop format, based on RFC 1761. """ - IDENTIFICATION_PATTERN = b"btsnoop\0" + IDENTIFICATION_PATTERN = b'btsnoop\0' TIMESTAMP_ANCHOR = datetime.datetime(2000, 1, 1) TIMESTAMP_DELTA = 0x00E03AB44A676000 ONE_MS = datetime.timedelta(microseconds=1) @@ -74,7 +74,7 @@ class BtSnooper(Snooper): # Write the header self.output.write( - self.IDENTIFICATION_PATTERN + struct.pack(">LL", 1, self.DataLinkType.H4) + self.IDENTIFICATION_PATTERN + struct.pack('>LL', 1, self.DataLinkType.H4) ) def snoop(self, hci_packet: bytes, direction: Snooper.Direction) -> None: @@ -85,20 +85,14 @@ class BtSnooper(Snooper): # Compute the current timestamp timestamp = ( - int( - ( - datetime.datetime.now(tz=datetime.timezone.utc) - - self.TIMESTAMP_ANCHOR - ) - / self.ONE_MS - ) + int((datetime.datetime.utcnow() - self.TIMESTAMP_ANCHOR) / self.ONE_MS) + self.TIMESTAMP_DELTA ) # Emit the record self.output.write( struct.pack( - ">IIIIQ", + '>IIIIQ', len(hci_packet), # Original Length len(hci_packet), # Included Length flags, # Packet Flags @@ -135,7 +129,7 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]: records will be written to that file if it can be opened/created. The keyword args that may be referenced by the string pattern are: now: the value of `datetime.now()` - utcnow: the value of `datetime.datetime(tz=datetime.timezone.utc)` + utcnow: the value of `datetime.utcnow()` pid: the current process ID. instance: the instance ID in the current process. @@ -144,34 +138,34 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]: btsnoop:file:/tmp/bumble_{now:%Y-%m-%d-%H:%M:%S}_{pid}.log """ - if ":" not in spec: - raise core.InvalidArgumentError("snooper type prefix missing") + if ':' not in spec: + raise core.InvalidArgumentError('snooper type prefix missing') - snooper_type, snooper_args = spec.split(":", maxsplit=1) + snooper_type, snooper_args = spec.split(':', maxsplit=1) - if snooper_type == "btsnoop": - if ":" not in snooper_args: - raise core.InvalidArgumentError("I/O type for btsnoop snooper type missing") + if snooper_type == 'btsnoop': + if ':' not in snooper_args: + raise core.InvalidArgumentError('I/O type for btsnoop snooper type missing') - io_type, io_name = snooper_args.split(":", maxsplit=1) - if io_type == "file": + io_type, io_name = snooper_args.split(':', maxsplit=1) + if io_type == 'file': # Process the file name string pattern. global _SNOOPER_INSTANCE_COUNT file_path = io_name.format( now=datetime.datetime.now(), - utcnow=datetime.datetime.now(tz=datetime.timezone.utc), + utcnow=datetime.datetime.utcnow(), pid=os.getpid(), instance=_SNOOPER_INSTANCE_COUNT, ) # Open the file - logger.debug(f"Snoop file: {file_path}") - with open(file_path, "wb") as snoop_file: + logger.debug(f'Snoop file: {file_path}') + with open(file_path, 'wb') as snoop_file: _SNOOPER_INSTANCE_COUNT += 1 yield BtSnooper(snoop_file) _SNOOPER_INSTANCE_COUNT -= 1 return - raise core.InvalidArgumentError(f"I/O type {io_type} not supported") + raise core.InvalidArgumentError(f'I/O type {io_type} not supported') - raise core.InvalidArgumentError(f"snooper type {snooper_type} not found") + raise core.InvalidArgumentError(f'snooper type {snooper_type} not found')