SMP: Determine initiator by direction instead of role

Though in the spec this is not allowed, but in some ambiguous cases such
as SMP over BR/EDR or we want to test how remote devices handle invalid
pairing requests, we still need to allow this behavior, with some logs
to let users know it's invalid.
This commit is contained in:
Josh Wu
2023-04-07 19:25:01 +08:00
parent a95e601a5c
commit 85496aaff5

View File

@@ -645,7 +645,7 @@ class Session:
},
}
def __init__(self, manager, connection, pairing_config):
def __init__(self, manager, connection, pairing_config, is_initiator):
self.manager = manager
self.connection = connection
self.preq = None
@@ -684,7 +684,7 @@ class Session:
self.ctkd_task = None
# Decide if we're the initiator or the responder
self.is_initiator = connection.role == BT_CENTRAL_ROLE
self.is_initiator = is_initiator
self.is_responder = not self.is_initiator
# Listen for connection events
@@ -1680,6 +1680,8 @@ class Manager(EventEmitter):
def on_smp_pdu(self, connection, pdu):
# Look for a session with this connection, and create one if none exists
if not (session := self.sessions.get(connection.handle)):
if connection.role == BT_CENTRAL_ROLE:
logger.warning('Remote starts pairing as Peripheral!')
pairing_config = self.pairing_config_factory(connection)
if pairing_config is None:
# Pairing disabled
@@ -1688,7 +1690,7 @@ class Manager(EventEmitter):
SMP_Pairing_Failed_Command(reason=SMP_PAIRING_NOT_SUPPORTED_ERROR),
)
return
session = Session(self, connection, pairing_config)
session = Session(self, connection, pairing_config, is_initiator=False)
self.sessions[connection.handle] = session
# Parse the L2CAP payload into an SMP Command object
@@ -1709,10 +1711,12 @@ class Manager(EventEmitter):
async def pair(self, connection):
# TODO: check if there's already a session for this connection
if connection.role != BT_CENTRAL_ROLE:
logger.warning('Start pairing as Peripheral!')
pairing_config = self.pairing_config_factory(connection)
if pairing_config is None:
raise ValueError('pairing config must not be None when initiating')
session = Session(self, connection, pairing_config)
session = Session(self, connection, pairing_config, is_initiator=True)
self.sessions[connection.handle] = session
return await session.pair()