forked from auracaster/bumble_mirror
Merge pull request #618 from google/gbg/hci-event-multi-vendor
support multiple event factories
This commit is contained in:
+17
-4
@@ -5070,7 +5070,7 @@ class HCI_Event(HCI_Packet):
|
||||
hci_packet_type = HCI_EVENT_PACKET
|
||||
event_names: Dict[int, str] = {}
|
||||
event_classes: Dict[int, Type[HCI_Event]] = {}
|
||||
vendor_factory: Optional[Callable[[bytes], Optional[HCI_Event]]] = None
|
||||
vendor_factories: list[Callable[[bytes], Optional[HCI_Event]]] = []
|
||||
|
||||
@staticmethod
|
||||
def event(fields=()):
|
||||
@@ -5128,6 +5128,19 @@ class HCI_Event(HCI_Packet):
|
||||
|
||||
return event_class
|
||||
|
||||
@classmethod
|
||||
def add_vendor_factory(
|
||||
cls, factory: Callable[[bytes], Optional[HCI_Event]]
|
||||
) -> None:
|
||||
cls.vendor_factories.append(factory)
|
||||
|
||||
@classmethod
|
||||
def remove_vendor_factory(
|
||||
cls, factory: Callable[[bytes], Optional[HCI_Event]]
|
||||
) -> None:
|
||||
if factory in cls.vendor_factories:
|
||||
cls.vendor_factories.remove(factory)
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, packet: bytes) -> HCI_Event:
|
||||
event_code = packet[1]
|
||||
@@ -5148,13 +5161,13 @@ class HCI_Event(HCI_Packet):
|
||||
elif event_code == HCI_VENDOR_EVENT:
|
||||
# Invoke all the registered factories to see if any of them can handle
|
||||
# the event
|
||||
if cls.vendor_factory:
|
||||
if event := cls.vendor_factory(parameters):
|
||||
for vendor_factory in cls.vendor_factories:
|
||||
if event := vendor_factory(parameters):
|
||||
return event
|
||||
|
||||
# No factory, or the factory could not create an instance,
|
||||
# return a generic vendor event
|
||||
return HCI_Event(event_code, parameters)
|
||||
return HCI_Vendor_Event(data=parameters)
|
||||
else:
|
||||
subclass = HCI_Event.event_classes.get(event_code)
|
||||
if subclass is None:
|
||||
|
||||
Vendored
+1
-1
@@ -299,7 +299,7 @@ class HCI_Android_Vendor_Event(HCI_Extended_Event):
|
||||
|
||||
|
||||
HCI_Android_Vendor_Event.register_subevents(globals())
|
||||
HCI_Event.vendor_factory = HCI_Android_Vendor_Event.subclass_from_parameters
|
||||
HCI_Event.add_vendor_factory(HCI_Android_Vendor_Event.subclass_from_parameters)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import struct
|
||||
|
||||
from bumble.hci import (
|
||||
HCI_DISCONNECT_COMMAND,
|
||||
@@ -22,6 +23,7 @@ from bumble.hci import (
|
||||
HCI_LE_CODED_PHY_BIT,
|
||||
HCI_LE_READ_BUFFER_SIZE_COMMAND,
|
||||
HCI_RESET_COMMAND,
|
||||
HCI_VENDOR_EVENT,
|
||||
HCI_SUCCESS,
|
||||
HCI_LE_CONNECTION_COMPLETE_EVENT,
|
||||
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
|
||||
@@ -67,6 +69,7 @@ from bumble.hci import (
|
||||
HCI_Read_Local_Version_Information_Command,
|
||||
HCI_Reset_Command,
|
||||
HCI_Set_Event_Mask_Command,
|
||||
HCI_Vendor_Event,
|
||||
)
|
||||
|
||||
|
||||
@@ -213,6 +216,41 @@ def test_HCI_Number_Of_Completed_Packets_Event():
|
||||
basic_check(event)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_HCI_Vendor_Event():
|
||||
data = bytes.fromhex('01020304')
|
||||
event = HCI_Vendor_Event(data=data)
|
||||
event_bytes = bytes(event)
|
||||
parsed = HCI_Packet.from_bytes(event_bytes)
|
||||
assert isinstance(parsed, HCI_Vendor_Event)
|
||||
assert parsed.data == data
|
||||
|
||||
class HCI_Custom_Event(HCI_Event):
|
||||
def __init__(self, blabla):
|
||||
super().__init__(HCI_VENDOR_EVENT, parameters=struct.pack("<I", blabla))
|
||||
self.name = 'HCI_CUSTOM_EVENT'
|
||||
self.blabla = blabla
|
||||
|
||||
def create_event(payload):
|
||||
if payload[0] == 1:
|
||||
return HCI_Custom_Event(blabla=struct.unpack('<I', payload)[0])
|
||||
return None
|
||||
|
||||
HCI_Event.add_vendor_factory(create_event)
|
||||
parsed = HCI_Packet.from_bytes(event_bytes)
|
||||
assert isinstance(parsed, HCI_Custom_Event)
|
||||
assert parsed.blabla == 0x04030201
|
||||
event_bytes2 = event_bytes[:3] + bytes([7]) + event_bytes[4:]
|
||||
parsed = HCI_Packet.from_bytes(event_bytes2)
|
||||
assert not isinstance(parsed, HCI_Custom_Event)
|
||||
assert isinstance(parsed, HCI_Vendor_Event)
|
||||
HCI_Event.remove_vendor_factory(create_event)
|
||||
|
||||
parsed = HCI_Packet.from_bytes(event_bytes)
|
||||
assert not isinstance(parsed, HCI_Custom_Event)
|
||||
assert isinstance(parsed, HCI_Vendor_Event)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_HCI_Command():
|
||||
command = HCI_Command(0x5566)
|
||||
@@ -576,6 +614,7 @@ def run_test_events():
|
||||
test_HCI_Command_Complete_Event()
|
||||
test_HCI_Command_Status_Event()
|
||||
test_HCI_Number_Of_Completed_Packets_Event()
|
||||
test_HCI_Vendor_Event()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user