forked from auracaster/bumble_mirror
Move connection.link_key_type to keystore
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
from __future__ import annotations
|
||||
import asyncio
|
||||
import contextlib
|
||||
from collections.abc import Awaitable
|
||||
import grpc
|
||||
import logging
|
||||
|
||||
@@ -24,6 +25,7 @@ from bumble import hci
|
||||
from bumble.core import (
|
||||
PhysicalTransport,
|
||||
ProtocolError,
|
||||
InvalidArgumentError,
|
||||
)
|
||||
import bumble.utils
|
||||
from bumble.device import Connection as BumbleConnection, Device
|
||||
@@ -188,35 +190,6 @@ class PairingDelegate(BasePairingDelegate):
|
||||
self.service.event_queue.put_nowait(event)
|
||||
|
||||
|
||||
BR_LEVEL_REACHED: Dict[SecurityLevel, Callable[[BumbleConnection], bool]] = {
|
||||
LEVEL0: lambda connection: True,
|
||||
LEVEL1: lambda connection: connection.encryption == 0 or connection.authenticated,
|
||||
LEVEL2: lambda connection: connection.encryption != 0 and connection.authenticated,
|
||||
LEVEL3: lambda connection: connection.encryption != 0
|
||||
and connection.authenticated
|
||||
and connection.link_key_type
|
||||
in (
|
||||
hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
|
||||
hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
|
||||
),
|
||||
LEVEL4: lambda connection: connection.encryption
|
||||
== hci.HCI_Encryption_Change_Event.AES_CCM
|
||||
and connection.authenticated
|
||||
and connection.link_key_type
|
||||
== hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
|
||||
}
|
||||
|
||||
LE_LEVEL_REACHED: Dict[LESecurityLevel, Callable[[BumbleConnection], bool]] = {
|
||||
LE_LEVEL1: lambda connection: True,
|
||||
LE_LEVEL2: lambda connection: connection.encryption != 0,
|
||||
LE_LEVEL3: lambda connection: connection.encryption != 0
|
||||
and connection.authenticated,
|
||||
LE_LEVEL4: lambda connection: connection.encryption != 0
|
||||
and connection.authenticated
|
||||
and connection.sc,
|
||||
}
|
||||
|
||||
|
||||
class SecurityService(SecurityServicer):
|
||||
def __init__(self, device: Device, config: Config) -> None:
|
||||
self.log = utils.BumbleServerLoggerAdapter(
|
||||
@@ -248,6 +221,59 @@ class SecurityService(SecurityServicer):
|
||||
|
||||
self.device.pairing_config_factory = pairing_config_factory
|
||||
|
||||
async def _classic_level_reached(
|
||||
self, level: SecurityLevel, connection: BumbleConnection
|
||||
) -> bool:
|
||||
if level == LEVEL0:
|
||||
return True
|
||||
if level == LEVEL1:
|
||||
return connection.encryption == 0 or connection.authenticated
|
||||
if level == LEVEL2:
|
||||
return connection.encryption != 0 and connection.authenticated
|
||||
|
||||
link_key_type: Optional[int] = None
|
||||
if (keystore := connection.device.keystore) and (
|
||||
keys := await keystore.get(str(connection.peer_address))
|
||||
):
|
||||
link_key_type = keys.link_key_type
|
||||
self.log.debug("link_key_type: %d", link_key_type)
|
||||
|
||||
if level == LEVEL3:
|
||||
return (
|
||||
connection.encryption != 0
|
||||
and connection.authenticated
|
||||
and link_key_type
|
||||
in (
|
||||
hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
|
||||
hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
|
||||
)
|
||||
)
|
||||
if level == LEVEL4:
|
||||
return (
|
||||
connection.encryption == hci.HCI_Encryption_Change_Event.AES_CCM
|
||||
and connection.authenticated
|
||||
and link_key_type
|
||||
== hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE
|
||||
)
|
||||
raise InvalidArgumentError(f"Unexpected level {level}")
|
||||
|
||||
def _le_level_reached(
|
||||
self, level: LESecurityLevel, connection: BumbleConnection
|
||||
) -> bool:
|
||||
if level == LE_LEVEL1:
|
||||
return True
|
||||
if level == LE_LEVEL2:
|
||||
return connection.encryption != 0
|
||||
if level == LE_LEVEL3:
|
||||
return connection.encryption != 0 and connection.authenticated
|
||||
if level == LE_LEVEL4:
|
||||
return (
|
||||
connection.encryption != 0
|
||||
and connection.authenticated
|
||||
and connection.sc
|
||||
)
|
||||
raise InvalidArgumentError(f"Unexpected level {level}")
|
||||
|
||||
@utils.rpc
|
||||
async def OnPairing(
|
||||
self, request: AsyncIterator[PairingEventAnswer], context: grpc.ServicerContext
|
||||
@@ -290,7 +316,7 @@ class SecurityService(SecurityServicer):
|
||||
] == oneof
|
||||
|
||||
# security level already reached
|
||||
if self.reached_security_level(connection, level):
|
||||
if await self.reached_security_level(connection, level):
|
||||
return SecureResponse(success=empty_pb2.Empty())
|
||||
|
||||
# trigger pairing if needed
|
||||
@@ -361,7 +387,7 @@ class SecurityService(SecurityServicer):
|
||||
return SecureResponse(encryption_failure=empty_pb2.Empty())
|
||||
|
||||
# security level has been reached ?
|
||||
if self.reached_security_level(connection, level):
|
||||
if await self.reached_security_level(connection, level):
|
||||
return SecureResponse(success=empty_pb2.Empty())
|
||||
return SecureResponse(not_reached=empty_pb2.Empty())
|
||||
|
||||
@@ -388,13 +414,10 @@ class SecurityService(SecurityServicer):
|
||||
pair_task: Optional[asyncio.Future[None]] = None
|
||||
|
||||
async def authenticate() -> None:
|
||||
assert connection
|
||||
if (encryption := connection.encryption) != 0:
|
||||
self.log.debug('Disable encryption...')
|
||||
try:
|
||||
with contextlib.suppress(Exception):
|
||||
await connection.encrypt(enable=False)
|
||||
except:
|
||||
pass
|
||||
self.log.debug('Disable encryption: done')
|
||||
|
||||
self.log.debug('Authenticate...')
|
||||
@@ -413,15 +436,13 @@ class SecurityService(SecurityServicer):
|
||||
|
||||
return wrapper
|
||||
|
||||
def try_set_success(*_: Any) -> None:
|
||||
assert connection
|
||||
if self.reached_security_level(connection, level):
|
||||
async def try_set_success(*_: Any) -> None:
|
||||
if await self.reached_security_level(connection, level):
|
||||
self.log.debug('Wait for security: done')
|
||||
wait_for_security.set_result('success')
|
||||
|
||||
def on_encryption_change(*_: Any) -> None:
|
||||
assert connection
|
||||
if self.reached_security_level(connection, level):
|
||||
async def on_encryption_change(*_: Any) -> None:
|
||||
if await self.reached_security_level(connection, level):
|
||||
self.log.debug('Wait for security: done')
|
||||
wait_for_security.set_result('success')
|
||||
elif (
|
||||
@@ -436,7 +457,7 @@ class SecurityService(SecurityServicer):
|
||||
if self.need_pairing(connection, level):
|
||||
pair_task = asyncio.create_task(connection.pair())
|
||||
|
||||
listeners: Dict[str, Callable[..., None]] = {
|
||||
listeners: Dict[str, Callable[..., Union[None, Awaitable[None]]]] = {
|
||||
'disconnection': set_failure('connection_died'),
|
||||
'pairing_failure': set_failure('pairing_failure'),
|
||||
'connection_authentication_failure': set_failure('authentication_failure'),
|
||||
@@ -455,7 +476,7 @@ class SecurityService(SecurityServicer):
|
||||
watcher.on(connection, event, listener)
|
||||
|
||||
# security level already reached
|
||||
if self.reached_security_level(connection, level):
|
||||
if await self.reached_security_level(connection, level):
|
||||
return WaitSecurityResponse(success=empty_pb2.Empty())
|
||||
|
||||
self.log.debug('Wait for security...')
|
||||
@@ -465,24 +486,20 @@ class SecurityService(SecurityServicer):
|
||||
# wait for `authenticate` to finish if any
|
||||
if authenticate_task is not None:
|
||||
self.log.debug('Wait for authentication...')
|
||||
try:
|
||||
with contextlib.suppress(Exception):
|
||||
await authenticate_task # type: ignore
|
||||
except:
|
||||
pass
|
||||
self.log.debug('Authenticated')
|
||||
|
||||
# wait for `pair` to finish if any
|
||||
if pair_task is not None:
|
||||
self.log.debug('Wait for authentication...')
|
||||
try:
|
||||
with contextlib.suppress(Exception):
|
||||
await pair_task # type: ignore
|
||||
except:
|
||||
pass
|
||||
self.log.debug('paired')
|
||||
|
||||
return WaitSecurityResponse(**kwargs)
|
||||
|
||||
def reached_security_level(
|
||||
async def reached_security_level(
|
||||
self, connection: BumbleConnection, level: Union[SecurityLevel, LESecurityLevel]
|
||||
) -> bool:
|
||||
self.log.debug(
|
||||
@@ -492,15 +509,14 @@ class SecurityService(SecurityServicer):
|
||||
'encryption': connection.encryption,
|
||||
'authenticated': connection.authenticated,
|
||||
'sc': connection.sc,
|
||||
'link_key_type': connection.link_key_type,
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
if isinstance(level, LESecurityLevel):
|
||||
return LE_LEVEL_REACHED[level](connection)
|
||||
return self._le_level_reached(level, connection)
|
||||
|
||||
return BR_LEVEL_REACHED[level](connection)
|
||||
return await self._classic_level_reached(level, connection)
|
||||
|
||||
def need_pairing(self, connection: BumbleConnection, level: int) -> bool:
|
||||
if connection.transport == PhysicalTransport.LE:
|
||||
|
||||
Reference in New Issue
Block a user