forked from auracaster/bumble_mirror
Revert "fix python 3.13 linter deprecated warnings for utcnow()"
This reverts commit 589bbfcf19.
This commit is contained in:
@@ -64,7 +64,7 @@ class BtSnooper(Snooper):
|
|||||||
Snooper that saves HCI packets using the BTSnoop format, based on RFC 1761.
|
Snooper that saves HCI packets using the BTSnoop format, based on RFC 1761.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
IDENTIFICATION_PATTERN = b"btsnoop\0"
|
IDENTIFICATION_PATTERN = b'btsnoop\0'
|
||||||
TIMESTAMP_ANCHOR = datetime.datetime(2000, 1, 1)
|
TIMESTAMP_ANCHOR = datetime.datetime(2000, 1, 1)
|
||||||
TIMESTAMP_DELTA = 0x00E03AB44A676000
|
TIMESTAMP_DELTA = 0x00E03AB44A676000
|
||||||
ONE_MS = datetime.timedelta(microseconds=1)
|
ONE_MS = datetime.timedelta(microseconds=1)
|
||||||
@@ -74,7 +74,7 @@ class BtSnooper(Snooper):
|
|||||||
|
|
||||||
# Write the header
|
# Write the header
|
||||||
self.output.write(
|
self.output.write(
|
||||||
self.IDENTIFICATION_PATTERN + struct.pack(">LL", 1, self.DataLinkType.H4)
|
self.IDENTIFICATION_PATTERN + struct.pack('>LL', 1, self.DataLinkType.H4)
|
||||||
)
|
)
|
||||||
|
|
||||||
def snoop(self, hci_packet: bytes, direction: Snooper.Direction) -> None:
|
def snoop(self, hci_packet: bytes, direction: Snooper.Direction) -> None:
|
||||||
@@ -85,20 +85,14 @@ class BtSnooper(Snooper):
|
|||||||
|
|
||||||
# Compute the current timestamp
|
# Compute the current timestamp
|
||||||
timestamp = (
|
timestamp = (
|
||||||
int(
|
int((datetime.datetime.utcnow() - self.TIMESTAMP_ANCHOR) / self.ONE_MS)
|
||||||
(
|
|
||||||
datetime.datetime.now(tz=datetime.timezone.utc)
|
|
||||||
- self.TIMESTAMP_ANCHOR
|
|
||||||
)
|
|
||||||
/ self.ONE_MS
|
|
||||||
)
|
|
||||||
+ self.TIMESTAMP_DELTA
|
+ self.TIMESTAMP_DELTA
|
||||||
)
|
)
|
||||||
|
|
||||||
# Emit the record
|
# Emit the record
|
||||||
self.output.write(
|
self.output.write(
|
||||||
struct.pack(
|
struct.pack(
|
||||||
">IIIIQ",
|
'>IIIIQ',
|
||||||
len(hci_packet), # Original Length
|
len(hci_packet), # Original Length
|
||||||
len(hci_packet), # Included Length
|
len(hci_packet), # Included Length
|
||||||
flags, # Packet Flags
|
flags, # Packet Flags
|
||||||
@@ -135,7 +129,7 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
|
|||||||
records will be written to that file if it can be opened/created.
|
records will be written to that file if it can be opened/created.
|
||||||
The keyword args that may be referenced by the string pattern are:
|
The keyword args that may be referenced by the string pattern are:
|
||||||
now: the value of `datetime.now()`
|
now: the value of `datetime.now()`
|
||||||
utcnow: the value of `datetime.datetime(tz=datetime.timezone.utc)`
|
utcnow: the value of `datetime.utcnow()`
|
||||||
pid: the current process ID.
|
pid: the current process ID.
|
||||||
instance: the instance ID in the current process.
|
instance: the instance ID in the current process.
|
||||||
|
|
||||||
@@ -144,34 +138,34 @@ def create_snooper(spec: str) -> Generator[Snooper, None, None]:
|
|||||||
btsnoop:file:/tmp/bumble_{now:%Y-%m-%d-%H:%M:%S}_{pid}.log
|
btsnoop:file:/tmp/bumble_{now:%Y-%m-%d-%H:%M:%S}_{pid}.log
|
||||||
|
|
||||||
"""
|
"""
|
||||||
if ":" not in spec:
|
if ':' not in spec:
|
||||||
raise core.InvalidArgumentError("snooper type prefix missing")
|
raise core.InvalidArgumentError('snooper type prefix missing')
|
||||||
|
|
||||||
snooper_type, snooper_args = spec.split(":", maxsplit=1)
|
snooper_type, snooper_args = spec.split(':', maxsplit=1)
|
||||||
|
|
||||||
if snooper_type == "btsnoop":
|
if snooper_type == 'btsnoop':
|
||||||
if ":" not in snooper_args:
|
if ':' not in snooper_args:
|
||||||
raise core.InvalidArgumentError("I/O type for btsnoop snooper type missing")
|
raise core.InvalidArgumentError('I/O type for btsnoop snooper type missing')
|
||||||
|
|
||||||
io_type, io_name = snooper_args.split(":", maxsplit=1)
|
io_type, io_name = snooper_args.split(':', maxsplit=1)
|
||||||
if io_type == "file":
|
if io_type == 'file':
|
||||||
# Process the file name string pattern.
|
# Process the file name string pattern.
|
||||||
global _SNOOPER_INSTANCE_COUNT
|
global _SNOOPER_INSTANCE_COUNT
|
||||||
file_path = io_name.format(
|
file_path = io_name.format(
|
||||||
now=datetime.datetime.now(),
|
now=datetime.datetime.now(),
|
||||||
utcnow=datetime.datetime.now(tz=datetime.timezone.utc),
|
utcnow=datetime.datetime.utcnow(),
|
||||||
pid=os.getpid(),
|
pid=os.getpid(),
|
||||||
instance=_SNOOPER_INSTANCE_COUNT,
|
instance=_SNOOPER_INSTANCE_COUNT,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Open the file
|
# Open the file
|
||||||
logger.debug(f"Snoop file: {file_path}")
|
logger.debug(f'Snoop file: {file_path}')
|
||||||
with open(file_path, "wb") as snoop_file:
|
with open(file_path, 'wb') as snoop_file:
|
||||||
_SNOOPER_INSTANCE_COUNT += 1
|
_SNOOPER_INSTANCE_COUNT += 1
|
||||||
yield BtSnooper(snoop_file)
|
yield BtSnooper(snoop_file)
|
||||||
_SNOOPER_INSTANCE_COUNT -= 1
|
_SNOOPER_INSTANCE_COUNT -= 1
|
||||||
return
|
return
|
||||||
|
|
||||||
raise core.InvalidArgumentError(f"I/O type {io_type} not supported")
|
raise core.InvalidArgumentError(f'I/O type {io_type} not supported')
|
||||||
|
|
||||||
raise core.InvalidArgumentError(f"snooper type {snooper_type} not found")
|
raise core.InvalidArgumentError(f'snooper type {snooper_type} not found')
|
||||||
|
|||||||
Reference in New Issue
Block a user