Migrate all HCI_Event to dataclasses

This commit is contained in:
Josh Wu
2025-06-24 19:02:36 +08:00
parent 22ff0d5e32
commit 0ab5b6c49a
9 changed files with 437 additions and 555 deletions

View File

@@ -394,7 +394,7 @@ class Controller:
peer_address=peer_address,
link=self.link,
transport=PhysicalTransport.LE,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
)
self.peripheral_connections[peer_address] = connection
logger.debug(f'New PERIPHERAL connection handle: 0x{connection_handle:04X}')
@@ -454,7 +454,7 @@ class Controller:
peer_address=peer_address,
link=self.link,
transport=PhysicalTransport.LE,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
)
self.central_connections[peer_address] = connection
logger.debug(
@@ -695,7 +695,7 @@ class Controller:
peer_address=peer_address,
link=self.link,
transport=PhysicalTransport.BR_EDR,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
)
self.classic_connections[peer_address] = connection
logger.debug(
@@ -709,7 +709,7 @@ class Controller:
connection_handle=connection_handle,
bd_addr=peer_address,
encryption_enabled=False,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
)
)
else:
@@ -720,7 +720,7 @@ class Controller:
connection_handle=0,
bd_addr=peer_address,
encryption_enabled=False,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
)
)
@@ -945,7 +945,7 @@ class Controller:
)
)
self.link.classic_sco_connect(
self, connection.peer_address, HCI_Connection_Complete_Event.ESCO_LINK_TYPE
self, connection.peer_address, HCI_Connection_Complete_Event.LinkType.ESCO
)
def on_hci_enhanced_accept_synchronous_connection_request_command(self, command):
@@ -974,7 +974,7 @@ class Controller:
)
)
self.link.classic_accept_sco_connection(
self, connection.peer_address, HCI_Connection_Complete_Event.ESCO_LINK_TYPE
self, connection.peer_address, HCI_Connection_Complete_Event.LinkType.ESCO
)
def on_hci_switch_role_command(self, command):

View File

@@ -5084,8 +5084,8 @@ class Device(utils.CompositeEventEmitter):
# Store the keys in the key store
if self.keystore:
authenticated = key_type in (
hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
hci.LinkKeyType.AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192,
hci.LinkKeyType.AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256,
)
pairing_keys = PairingKeys(
link_key=PairingKeys.Key(value=link_key, authenticated=authenticated),
@@ -5532,8 +5532,8 @@ class Device(utils.CompositeEventEmitter):
# Handle SCO request.
if link_type in (
hci.HCI_Connection_Complete_Event.SCO_LINK_TYPE,
hci.HCI_Connection_Complete_Event.ESCO_LINK_TYPE,
hci.HCI_Connection_Complete_Event.LinkType.SCO,
hci.HCI_Connection_Complete_Event.LinkType.ESCO,
):
if connection := self.find_connection_by_bd_addr(
bd_addr, transport=PhysicalTransport.BR_EDR
@@ -5641,7 +5641,7 @@ class Device(utils.CompositeEventEmitter):
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_io_capability_request(self, connection):
def on_authentication_io_capability_request(self, connection: Connection):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
@@ -5649,13 +5649,13 @@ class Device(utils.CompositeEventEmitter):
authentication_requirements = (
# No Bonding
(
hci.HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
hci.HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
hci.AuthenticationRequirements.MITM_NOT_REQUIRED_NO_BONDING,
hci.AuthenticationRequirements.MITM_REQUIRED_NO_BONDING,
),
# General Bonding
(
hci.HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
hci.HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
hci.AuthenticationRequirements.MITM_NOT_REQUIRED_GENERAL_BONDING,
hci.AuthenticationRequirements.MITM_REQUIRED_GENERAL_BONDING,
),
)[1 if pairing_config.bonding else 0][1 if pairing_config.mitm else 0]
@@ -5710,30 +5710,30 @@ class Device(utils.CompositeEventEmitter):
raise UnreachableError()
# See Bluetooth spec @ Vol 3, Part C 5.2.2.6
methods = {
hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: {
hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm,
hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm,
hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: na,
hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
methods: dict[int, dict[int, Callable[[], Awaitable[bool]]]] = {
hci.IoCapability.DISPLAY_ONLY: {
hci.IoCapability.DISPLAY_ONLY: display_auto_confirm,
hci.IoCapability.DISPLAY_YES_NO: display_confirm,
hci.IoCapability.KEYBOARD_ONLY: na,
hci.IoCapability.NO_INPUT_NO_OUTPUT: auto_confirm,
},
hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: {
hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm,
hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm,
hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: na,
hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
hci.IoCapability.DISPLAY_YES_NO: {
hci.IoCapability.DISPLAY_ONLY: display_auto_confirm,
hci.IoCapability.DISPLAY_YES_NO: display_confirm,
hci.IoCapability.KEYBOARD_ONLY: na,
hci.IoCapability.NO_INPUT_NO_OUTPUT: auto_confirm,
},
hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: {
hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: na,
hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: na,
hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: na,
hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
hci.IoCapability.KEYBOARD_ONLY: {
hci.IoCapability.DISPLAY_ONLY: na,
hci.IoCapability.DISPLAY_YES_NO: na,
hci.IoCapability.KEYBOARD_ONLY: na,
hci.IoCapability.NO_INPUT_NO_OUTPUT: auto_confirm,
},
hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: {
hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: confirm,
hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: confirm,
hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: auto_confirm,
hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
hci.IoCapability.NO_INPUT_NO_OUTPUT: {
hci.IoCapability.DISPLAY_ONLY: confirm,
hci.IoCapability.DISPLAY_YES_NO: confirm,
hci.IoCapability.KEYBOARD_ONLY: auto_confirm,
hci.IoCapability.NO_INPUT_NO_OUTPUT: auto_confirm,
},
}
@@ -5799,7 +5799,7 @@ class Device(utils.CompositeEventEmitter):
io_capability = pairing_config.delegate.classic_io_capability
# Respond
if io_capability == hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY:
if io_capability == hci.IoCapability.KEYBOARD_ONLY:
# Ask the user to enter a string
async def get_pin_code():
pin_code = await connection.cancel_on_disconnection(
@@ -5979,7 +5979,7 @@ class Device(utils.CompositeEventEmitter):
@host_event_handler
@with_connection_from_handle
def on_connection_encryption_change(
self, connection, encryption, encryption_key_size
self, connection: Connection, encryption: int, encryption_key_size: int
):
logger.debug(
f'*** Connection Encryption Change: [0x{connection.handle:04X}] '
@@ -5992,14 +5992,14 @@ class Device(utils.CompositeEventEmitter):
if (
not connection.authenticated
and connection.transport == PhysicalTransport.BR_EDR
and encryption == hci.HCI_Encryption_Change_Event.AES_CCM
and encryption == hci.HCI_Encryption_Change_Event.Enabled.AES_CCM
):
connection.authenticated = True
connection.sc = True
if (
not connection.authenticated
and connection.transport == PhysicalTransport.LE
and encryption == hci.HCI_Encryption_Change_Event.E0_OR_AES_CCM
and encryption == hci.HCI_Encryption_Change_Event.Enabled.E0_OR_AES_CCM
):
connection.authenticated = True
connection.sc = True

File diff suppressed because it is too large Load Diff

View File

@@ -835,8 +835,8 @@ class Host(utils.EventEmitter):
def on_packet(self, packet: bytes) -> None:
try:
hci_packet = hci.HCI_Packet.from_bytes(packet)
except Exception as error:
logger.warning(f'!!! error parsing packet from bytes: {error}')
except Exception:
logger.exception('!!! error parsing packet from bytes')
return
if self.ready or (
@@ -1392,7 +1392,7 @@ class Host(utils.EventEmitter):
event.status,
)
def on_hci_encryption_change_event(self, event):
def on_hci_encryption_change_event(self, event: hci.HCI_Encryption_Change_Event):
# Notify the client
if event.status == hci.HCI_SUCCESS:
self.emit(
@@ -1406,7 +1406,9 @@ class Host(utils.EventEmitter):
'connection_encryption_failure', event.connection_handle, event.status
)
def on_hci_encryption_change_v2_event(self, event):
def on_hci_encryption_change_v2_event(
self, event: hci.HCI_Encryption_Change_V2_Event
):
# Notify the client
if event.status == hci.HCI_SUCCESS:
self.emit(

View File

@@ -274,7 +274,7 @@ class LocalLink:
responder_controller.on_classic_connection_request(
initiator_controller.public_address,
HCI_Connection_Complete_Event.ACL_LINK_TYPE,
HCI_Connection_Complete_Event.LinkType.ACL,
)
def classic_accept_connection(

View File

@@ -21,13 +21,7 @@ from dataclasses import dataclass
import secrets
from typing import Optional
from bumble.hci import (
Address,
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_KEYBOARD_ONLY_IO_CAPABILITY,
)
from bumble import hci
from bumble.smp import (
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
SMP_KEYBOARD_ONLY_IO_CAPABILITY,
@@ -50,7 +44,7 @@ from bumble.core import AdvertisingData, LeRole
class OobData:
"""OOB data that can be sent from one device to another."""
address: Optional[Address] = None
address: Optional[hci.Address] = None
role: Optional[LeRole] = None
shared_data: Optional[OobSharedData] = None
legacy_context: Optional[OobLegacyContext] = None
@@ -62,7 +56,7 @@ class OobData:
shared_data_r: Optional[bytes] = None
for ad_type, ad_data in ad.ad_structures:
if ad_type == AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS:
instance.address = Address(ad_data)
instance.address = hci.Address(ad_data)
elif ad_type == AdvertisingData.LE_ROLE:
instance.role = LeRole(ad_data[0])
elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE:
@@ -130,11 +124,11 @@ class PairingDelegate:
# Default mapping from abstract to Classic I/O capabilities.
# Subclasses may override this if they prefer a different mapping.
CLASSIC_IO_CAPABILITIES_MAP = {
NO_OUTPUT_NO_INPUT: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
KEYBOARD_INPUT_ONLY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
DISPLAY_OUTPUT_ONLY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
DISPLAY_OUTPUT_AND_YES_NO_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
NO_OUTPUT_NO_INPUT: hci.IoCapability.NO_INPUT_NO_OUTPUT,
KEYBOARD_INPUT_ONLY: hci.IoCapability.KEYBOARD_ONLY,
DISPLAY_OUTPUT_ONLY: hci.IoCapability.DISPLAY_ONLY,
DISPLAY_OUTPUT_AND_YES_NO_INPUT: hci.IoCapability.DISPLAY_YES_NO,
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT: hci.IoCapability.DISPLAY_YES_NO,
}
io_capability: IoCapability
@@ -160,7 +154,7 @@ class PairingDelegate:
# pylint: disable=line-too-long
return self.CLASSIC_IO_CAPABILITIES_MAP.get(
self.io_capability, HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
self.io_capability, hci.IoCapability.NO_INPUT_NO_OUTPUT
)
@property
@@ -237,8 +231,8 @@ class PairingConfig:
"""Configuration for the Pairing protocol."""
class AddressType(enum.IntEnum):
PUBLIC = Address.PUBLIC_DEVICE_ADDRESS
RANDOM = Address.RANDOM_DEVICE_ADDRESS
PUBLIC = hci.Address.PUBLIC_DEVICE_ADDRESS
RANDOM = hci.Address.RANDOM_DEVICE_ADDRESS
@dataclass
class OobConfig:

View File

@@ -244,16 +244,16 @@ class SecurityService(SecurityServicer):
and connection.authenticated
and link_key_type
in (
hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
hci.LinkKeyType.AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192,
hci.LinkKeyType.AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256,
)
)
if level == LEVEL4:
return (
connection.encryption == hci.HCI_Encryption_Change_Event.AES_CCM
connection.encryption == hci.HCI_Encryption_Change_Event.Enabled.AES_CCM
and connection.authenticated
and link_key_type
== hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE
== hci.LinkKeyType.AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256
)
raise InvalidArgumentError(f"Unexpected level {level}")

View File

@@ -46,7 +46,7 @@ def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration):
def on_sco_request(connection: Connection, link_type: int, protocol: HfProtocol):
if connection == protocol.dlc.multiplexer.l2cap_channel.connection:
if link_type == hci.HCI_Connection_Complete_Event.SCO_LINK_TYPE:
if link_type == hci.HCI_Connection_Complete_Event.LinkType.SCO:
esco_parameters = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.SCO_CVSD_D1
]

View File

@@ -125,7 +125,7 @@ async def test_device_connect_parallel():
HCI_Connection_Request_Event(
bd_addr=d0.public_address,
class_of_device=0,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
)
)
@@ -145,7 +145,7 @@ async def test_device_connect_parallel():
HCI_Connection_Request_Event(
bd_addr=d0.public_address,
class_of_device=0,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
)
)
@@ -168,7 +168,7 @@ async def test_device_connect_parallel():
status=HCI_SUCCESS,
connection_handle=0x100,
bd_addr=d0.public_address,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
encryption_enabled=True,
)
)
@@ -178,7 +178,7 @@ async def test_device_connect_parallel():
status=HCI_SUCCESS,
connection_handle=0x100,
bd_addr=d1.public_address,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
encryption_enabled=True,
)
)
@@ -202,7 +202,7 @@ async def test_device_connect_parallel():
status=HCI_SUCCESS,
connection_handle=0x101,
bd_addr=d0.public_address,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
encryption_enabled=True,
)
)
@@ -212,7 +212,7 @@ async def test_device_connect_parallel():
status=HCI_SUCCESS,
connection_handle=0x101,
bd_addr=d2.public_address,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
encryption_enabled=True,
)
)