From 30934969b8fac7cc7ebef658bd3c4a540ce064c6 Mon Sep 17 00:00:00 2001 From: uael Date: Wed, 19 Apr 2023 22:21:41 +0000 Subject: [PATCH] ssp: simplify pairing and fix `just-works` Even through the previous implementation was correct: - Always call `delegate.confirm()` for `just-works` pairing, but with `auto` parameter set to `True`. - Trust the controller and do not double check the devices IO capabilities. --- bumble/device.py | 148 +++++++++++++++++++++++++--------------------- bumble/pairing.py | 7 ++- 2 files changed, 86 insertions(+), 69 deletions(-) diff --git a/bumble/device.py b/bumble/device.py index de3912b..c09e061 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -2773,89 +2773,103 @@ class Device(CompositeEventEmitter): # [Classic only] @host_event_handler @with_connection_from_address - def on_authentication_user_confirmation_request(self, connection, code): + def on_authentication_user_confirmation_request(self, connection, code) -> None: # Ask what the pairing config should be for this connection pairing_config = self.pairing_config_factory(connection) io_capability = pairing_config.delegate.classic_io_capability + peer_io_capability = connection.peer_pairing_io_capability - # Respond - if io_capability == HCI_DISPLAY_YES_NO_IO_CAPABILITY: - if connection.peer_pairing_io_capability in ( - HCI_DISPLAY_YES_NO_IO_CAPABILITY, - HCI_DISPLAY_ONLY_IO_CAPABILITY, - ): - # Display the code and ask the user to compare - async def prompt(): - return ( - await pairing_config.delegate.compare_numbers(code, digits=6), + async def confirm() -> bool: + # Ask the user to confirm the pairing, without display + return await pairing_config.delegate.confirm() + + async def auto_confirm() -> bool: + # Ask the user to auto-confirm the pairing, without display + return await pairing_config.delegate.confirm(auto=True) + + async def display_confirm() -> bool: + # Display the code and ask the user to compare + return await pairing_config.delegate.compare_numbers(code, digits=6) + + async def display_auto_confirm() -> bool: + # Display the code to the user and ask the delegate to auto-confirm + await pairing_config.delegate.display_number(code, digits=6) + return await pairing_config.delegate.confirm(auto=True) + + async def na() -> bool: + assert False, "N/A: unreachable" + + # See Bluetooth spec @ Vol 3, Part C 5.2.2.6 + methods = { + HCI_DISPLAY_ONLY_IO_CAPABILITY: { + HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm, + HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm, + HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, + HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, + }, + HCI_DISPLAY_YES_NO_IO_CAPABILITY: { + HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm, + HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm, + HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, + HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, + }, + HCI_KEYBOARD_ONLY_IO_CAPABILITY: { + HCI_DISPLAY_ONLY_IO_CAPABILITY: na, + HCI_DISPLAY_YES_NO_IO_CAPABILITY: na, + HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, + HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, + }, + HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: { + HCI_DISPLAY_ONLY_IO_CAPABILITY: confirm, + HCI_DISPLAY_YES_NO_IO_CAPABILITY: confirm, + HCI_KEYBOARD_ONLY_IO_CAPABILITY: auto_confirm, + HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, + }, + } + + method = methods[peer_io_capability][io_capability] + + async def reply() -> None: + if await connection.abort_on('disconnection', method()): + await self.host.send_command( + HCI_User_Confirmation_Request_Reply_Command( # type: ignore[call-arg] + bd_addr=connection.peer_address ) - + ) else: - # Ask the user to confirm the pairing, without showing a code - async def prompt(): - return await pairing_config.delegate.confirm() - - async def confirm(): - if await prompt(): - await self.host.send_command( - HCI_User_Confirmation_Request_Reply_Command( - bd_addr=connection.peer_address - ) - ) - else: - await self.host.send_command( - HCI_User_Confirmation_Request_Negative_Reply_Command( - bd_addr=connection.peer_address - ) + await self.host.send_command( + HCI_User_Confirmation_Request_Negative_Reply_Command( # type: ignore[call-arg] + bd_addr=connection.peer_address ) + ) - AsyncRunner.spawn(connection.abort_on('disconnection', confirm())) - return - - if io_capability == HCI_DISPLAY_ONLY_IO_CAPABILITY: - # Display the code to the user - AsyncRunner.spawn(pairing_config.delegate.display_number(code, 6)) - - # Automatic confirmation - self.host.send_command_sync( - HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address) - ) + AsyncRunner.spawn(reply()) # [Classic only] @host_event_handler @with_connection_from_address - def on_authentication_user_passkey_request(self, connection): + def on_authentication_user_passkey_request(self, connection) -> None: # Ask what the pairing config should be for this connection pairing_config = self.pairing_config_factory(connection) - io_capability = pairing_config.delegate.classic_io_capability - # Respond - if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY: - # Ask the user to input a number - async def get_number(): - number = await connection.abort_on( - 'disconnection', pairing_config.delegate.get_number() - ) - if number is not None: - await self.host.send_command( - HCI_User_Passkey_Request_Reply_Command( - bd_addr=connection.peer_address, numeric_value=number - ) - ) - else: - await self.host.send_command( - HCI_User_Passkey_Request_Negative_Reply_Command( - bd_addr=connection.peer_address - ) - ) - - asyncio.create_task(get_number()) - else: - self.host.send_command_sync( - HCI_User_Passkey_Request_Negative_Reply_Command( - bd_addr=connection.peer_address - ) + async def reply() -> None: + number = await connection.abort_on( + 'disconnection', pairing_config.delegate.get_number() ) + if number is not None: + await self.host.send_command( + HCI_User_Passkey_Request_Reply_Command( # type: ignore[call-arg] + bd_addr=connection.peer_address, numeric_value=number + ) + ) + else: + await self.host.send_command( + HCI_User_Passkey_Request_Negative_Reply_Command( # type: ignore[call-arg] + bd_addr=connection.peer_address + ) + ) + + AsyncRunner.spawn(reply()) # [Classic only] @host_event_handler diff --git a/bumble/pairing.py b/bumble/pairing.py index dc5c014..ab356ee 100644 --- a/bumble/pairing.py +++ b/bumble/pairing.py @@ -114,8 +114,11 @@ class PairingDelegate: """Accept or reject a Pairing request.""" return True - async def confirm(self) -> bool: - """Respond yes or no to a Pairing confirmation question.""" + async def confirm(self, auto: bool = False) -> bool: + """ + Respond yes or no to a Pairing confirmation question. + The `auto` parameter stands for automatic confirmation. + """ return True # pylint: disable-next=unused-argument