forked from auracaster/bumble_mirror
Typing packet transmission flow
This commit is contained in:
@@ -20,7 +20,7 @@ import struct
|
||||
import collections
|
||||
import logging
|
||||
import functools
|
||||
from typing import Dict, Type, Union
|
||||
from typing import Dict, Type, Union, Callable, Any, Optional
|
||||
|
||||
from .colors import color
|
||||
from .core import (
|
||||
@@ -1918,7 +1918,7 @@ class HCI_Packet:
|
||||
hci_packet_type: int
|
||||
|
||||
@staticmethod
|
||||
def from_bytes(packet):
|
||||
def from_bytes(packet: bytes) -> HCI_Packet:
|
||||
packet_type = packet[0]
|
||||
|
||||
if packet_type == HCI_COMMAND_PACKET:
|
||||
@@ -1992,7 +1992,7 @@ class HCI_Command(HCI_Packet):
|
||||
return inner
|
||||
|
||||
@staticmethod
|
||||
def from_bytes(packet):
|
||||
def from_bytes(packet: bytes) -> HCI_Command:
|
||||
op_code, length = struct.unpack_from('<HB', packet, 1)
|
||||
parameters = packet[4:]
|
||||
if len(parameters) != length:
|
||||
@@ -2011,7 +2011,7 @@ class HCI_Command(HCI_Packet):
|
||||
HCI_Object.init_from_bytes(self, parameters, 0, fields)
|
||||
return self
|
||||
|
||||
return cls.from_parameters(parameters)
|
||||
return cls.from_parameters(parameters) # type: ignore
|
||||
|
||||
@staticmethod
|
||||
def command_name(op_code):
|
||||
@@ -4350,13 +4350,14 @@ class HCI_Event(HCI_Packet):
|
||||
return event_class
|
||||
|
||||
@staticmethod
|
||||
def from_bytes(packet):
|
||||
def from_bytes(packet: bytes) -> HCI_Event:
|
||||
event_code = packet[1]
|
||||
length = packet[2]
|
||||
parameters = packet[3:]
|
||||
if len(parameters) != length:
|
||||
raise ValueError('invalid packet length')
|
||||
|
||||
cls: Type[HCI_Event | HCI_LE_Meta_Event] | None
|
||||
if event_code == HCI_LE_META_EVENT:
|
||||
# We do this dispatch here and not in the subclass in order to avoid call
|
||||
# loops
|
||||
@@ -4373,7 +4374,7 @@ class HCI_Event(HCI_Packet):
|
||||
return HCI_Event(event_code, parameters)
|
||||
|
||||
# Invoke the factory to create a new instance
|
||||
return cls.from_parameters(parameters)
|
||||
return cls.from_parameters(parameters) # type: ignore
|
||||
|
||||
@classmethod
|
||||
def from_parameters(cls, parameters):
|
||||
@@ -5086,6 +5087,7 @@ class HCI_Command_Complete_Event(HCI_Event):
|
||||
'''
|
||||
|
||||
return_parameters = b''
|
||||
command_opcode: int
|
||||
|
||||
def map_return_parameters(self, return_parameters):
|
||||
'''Map simple 'status' return parameters to their named constant form'''
|
||||
@@ -5605,7 +5607,7 @@ class HCI_Remote_Host_Supported_Features_Notification_Event(HCI_Event):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class HCI_AclDataPacket:
|
||||
class HCI_AclDataPacket(HCI_Packet):
|
||||
'''
|
||||
See Bluetooth spec @ 5.4.2 HCI ACL Data Packets
|
||||
'''
|
||||
@@ -5613,7 +5615,7 @@ class HCI_AclDataPacket:
|
||||
hci_packet_type = HCI_ACL_DATA_PACKET
|
||||
|
||||
@staticmethod
|
||||
def from_bytes(packet):
|
||||
def from_bytes(packet: bytes) -> HCI_AclDataPacket:
|
||||
# Read the header
|
||||
h, data_total_length = struct.unpack_from('<HH', packet, 1)
|
||||
connection_handle = h & 0xFFF
|
||||
@@ -5655,12 +5657,14 @@ class HCI_AclDataPacket:
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class HCI_AclDataPacketAssembler:
|
||||
def __init__(self, callback):
|
||||
current_data: Optional[bytes]
|
||||
|
||||
def __init__(self, callback: Callable[[bytes], Any]) -> None:
|
||||
self.callback = callback
|
||||
self.current_data = None
|
||||
self.l2cap_pdu_length = 0
|
||||
|
||||
def feed_packet(self, packet):
|
||||
def feed_packet(self, packet: HCI_AclDataPacket) -> None:
|
||||
if packet.pb_flag in (
|
||||
HCI_ACL_PB_FIRST_NON_FLUSHABLE,
|
||||
HCI_ACL_PB_FIRST_FLUSHABLE,
|
||||
@@ -5674,6 +5678,7 @@ class HCI_AclDataPacketAssembler:
|
||||
return
|
||||
self.current_data += packet.data
|
||||
|
||||
assert self.current_data is not None
|
||||
if len(self.current_data) == self.l2cap_pdu_length + 4:
|
||||
# The packet is complete, invoke the callback
|
||||
logger.debug(f'<<< ACL PDU: {self.current_data.hex()}')
|
||||
|
||||
Reference in New Issue
Block a user