forked from auracaster/bumble_mirror
improve vendor event support
This commit is contained in:
@@ -4996,6 +4996,7 @@ class HCI_Event(HCI_Packet):
|
||||
hci_packet_type = HCI_EVENT_PACKET
|
||||
event_names: Dict[int, str] = {}
|
||||
event_classes: Dict[int, Type[HCI_Event]] = {}
|
||||
vendor_factory: Optional[Callable[[bytes], Optional[HCI_Event]]] = None
|
||||
|
||||
@staticmethod
|
||||
def event(fields=()):
|
||||
@@ -5053,31 +5054,41 @@ class HCI_Event(HCI_Packet):
|
||||
|
||||
return event_class
|
||||
|
||||
@staticmethod
|
||||
def from_bytes(packet: bytes) -> HCI_Event:
|
||||
@classmethod
|
||||
def from_bytes(cls, packet: bytes) -> HCI_Event:
|
||||
event_code = packet[1]
|
||||
length = packet[2]
|
||||
parameters = packet[3:]
|
||||
if len(parameters) != length:
|
||||
raise InvalidPacketError('invalid packet length')
|
||||
|
||||
cls: Any
|
||||
subclass: Any
|
||||
if event_code == HCI_LE_META_EVENT:
|
||||
# We do this dispatch here and not in the subclass in order to avoid call
|
||||
# loops
|
||||
subevent_code = parameters[0]
|
||||
cls = HCI_LE_Meta_Event.subevent_classes.get(subevent_code)
|
||||
if cls is None:
|
||||
subclass = HCI_LE_Meta_Event.subevent_classes.get(subevent_code)
|
||||
if subclass is None:
|
||||
# No class registered, just use a generic class instance
|
||||
return HCI_LE_Meta_Event(subevent_code, parameters)
|
||||
elif event_code == HCI_VENDOR_EVENT:
|
||||
# Invoke all the registered factories to see if any of them can handle
|
||||
# the event
|
||||
if cls.vendor_factory:
|
||||
if event := cls.vendor_factory(parameters):
|
||||
return event
|
||||
|
||||
# No factory, or the factory could not create an instance,
|
||||
# return a generic vendor event
|
||||
return HCI_Event(event_code, parameters)
|
||||
else:
|
||||
cls = HCI_Event.event_classes.get(event_code)
|
||||
if cls is None:
|
||||
subclass = HCI_Event.event_classes.get(event_code)
|
||||
if subclass is None:
|
||||
# No class registered, just use a generic class instance
|
||||
return HCI_Event(event_code, parameters)
|
||||
|
||||
# Invoke the factory to create a new instance
|
||||
return cls.from_parameters(parameters) # type: ignore
|
||||
return subclass.from_parameters(parameters) # type: ignore
|
||||
|
||||
@classmethod
|
||||
def from_parameters(cls, parameters):
|
||||
@@ -5127,7 +5138,7 @@ class HCI_Extended_Event(HCI_Event):
|
||||
'''
|
||||
|
||||
subevent_names: Dict[int, str] = {}
|
||||
subevent_classes: Dict[int, Type[HCI_Extended_Event]]
|
||||
subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {}
|
||||
|
||||
@classmethod
|
||||
def event(cls, fields=()):
|
||||
@@ -5178,7 +5189,22 @@ class HCI_Extended_Event(HCI_Event):
|
||||
cls.subevent_names.update(cls.subevent_map(symbols))
|
||||
|
||||
@classmethod
|
||||
def from_parameters(cls, parameters):
|
||||
def subclass_from_parameters(
|
||||
cls, parameters: bytes
|
||||
) -> Optional[HCI_Extended_Event]:
|
||||
"""
|
||||
Factory method that parses the subevent code, finds a registered subclass,
|
||||
and creates an instance if found.
|
||||
"""
|
||||
subevent_code = parameters[0]
|
||||
if subclass := cls.subevent_classes.get(subevent_code):
|
||||
return subclass.from_parameters(parameters)
|
||||
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def from_parameters(cls, parameters: bytes) -> HCI_Extended_Event:
|
||||
"""Factory method for subclasses (the subevent code has already been parsed)"""
|
||||
self = cls.__new__(cls)
|
||||
HCI_Extended_Event.__init__(self, self.subevent_code, parameters)
|
||||
if fields := getattr(self, 'fields', None):
|
||||
@@ -6092,8 +6118,9 @@ class HCI_Command_Complete_Event(HCI_Event):
|
||||
See Bluetooth spec @ 7.7.14 Command Complete Event
|
||||
'''
|
||||
|
||||
return_parameters = b''
|
||||
num_hci_command_packets: int
|
||||
command_opcode: int
|
||||
return_parameters = b''
|
||||
|
||||
def map_return_parameters(self, return_parameters):
|
||||
'''Map simple 'status' return parameters to their named constant form'''
|
||||
|
||||
Reference in New Issue
Block a user