forked from auracaster/bumble_mirror
ssp: simplify pairing and fix just-works
Even through the previous implementation was correct: - Always call `delegate.confirm()` for `just-works` pairing, but with `auto` parameter set to `True`. - Trust the controller and do not double check the devices IO capabilities.
This commit is contained in:
148
bumble/device.py
148
bumble/device.py
@@ -2773,89 +2773,103 @@ class Device(CompositeEventEmitter):
|
|||||||
# [Classic only]
|
# [Classic only]
|
||||||
@host_event_handler
|
@host_event_handler
|
||||||
@with_connection_from_address
|
@with_connection_from_address
|
||||||
def on_authentication_user_confirmation_request(self, connection, code):
|
def on_authentication_user_confirmation_request(self, connection, code) -> None:
|
||||||
# Ask what the pairing config should be for this connection
|
# Ask what the pairing config should be for this connection
|
||||||
pairing_config = self.pairing_config_factory(connection)
|
pairing_config = self.pairing_config_factory(connection)
|
||||||
io_capability = pairing_config.delegate.classic_io_capability
|
io_capability = pairing_config.delegate.classic_io_capability
|
||||||
|
peer_io_capability = connection.peer_pairing_io_capability
|
||||||
|
|
||||||
# Respond
|
async def confirm() -> bool:
|
||||||
if io_capability == HCI_DISPLAY_YES_NO_IO_CAPABILITY:
|
# Ask the user to confirm the pairing, without display
|
||||||
if connection.peer_pairing_io_capability in (
|
return await pairing_config.delegate.confirm()
|
||||||
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
|
|
||||||
HCI_DISPLAY_ONLY_IO_CAPABILITY,
|
async def auto_confirm() -> bool:
|
||||||
):
|
# Ask the user to auto-confirm the pairing, without display
|
||||||
# Display the code and ask the user to compare
|
return await pairing_config.delegate.confirm(auto=True)
|
||||||
async def prompt():
|
|
||||||
return (
|
async def display_confirm() -> bool:
|
||||||
await pairing_config.delegate.compare_numbers(code, digits=6),
|
# Display the code and ask the user to compare
|
||||||
|
return await pairing_config.delegate.compare_numbers(code, digits=6)
|
||||||
|
|
||||||
|
async def display_auto_confirm() -> bool:
|
||||||
|
# Display the code to the user and ask the delegate to auto-confirm
|
||||||
|
await pairing_config.delegate.display_number(code, digits=6)
|
||||||
|
return await pairing_config.delegate.confirm(auto=True)
|
||||||
|
|
||||||
|
async def na() -> bool:
|
||||||
|
assert False, "N/A: unreachable"
|
||||||
|
|
||||||
|
# See Bluetooth spec @ Vol 3, Part C 5.2.2.6
|
||||||
|
methods = {
|
||||||
|
HCI_DISPLAY_ONLY_IO_CAPABILITY: {
|
||||||
|
HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm,
|
||||||
|
HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm,
|
||||||
|
HCI_KEYBOARD_ONLY_IO_CAPABILITY: na,
|
||||||
|
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
|
||||||
|
},
|
||||||
|
HCI_DISPLAY_YES_NO_IO_CAPABILITY: {
|
||||||
|
HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm,
|
||||||
|
HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm,
|
||||||
|
HCI_KEYBOARD_ONLY_IO_CAPABILITY: na,
|
||||||
|
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
|
||||||
|
},
|
||||||
|
HCI_KEYBOARD_ONLY_IO_CAPABILITY: {
|
||||||
|
HCI_DISPLAY_ONLY_IO_CAPABILITY: na,
|
||||||
|
HCI_DISPLAY_YES_NO_IO_CAPABILITY: na,
|
||||||
|
HCI_KEYBOARD_ONLY_IO_CAPABILITY: na,
|
||||||
|
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
|
||||||
|
},
|
||||||
|
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: {
|
||||||
|
HCI_DISPLAY_ONLY_IO_CAPABILITY: confirm,
|
||||||
|
HCI_DISPLAY_YES_NO_IO_CAPABILITY: confirm,
|
||||||
|
HCI_KEYBOARD_ONLY_IO_CAPABILITY: auto_confirm,
|
||||||
|
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
method = methods[peer_io_capability][io_capability]
|
||||||
|
|
||||||
|
async def reply() -> None:
|
||||||
|
if await connection.abort_on('disconnection', method()):
|
||||||
|
await self.host.send_command(
|
||||||
|
HCI_User_Confirmation_Request_Reply_Command( # type: ignore[call-arg]
|
||||||
|
bd_addr=connection.peer_address
|
||||||
)
|
)
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
# Ask the user to confirm the pairing, without showing a code
|
await self.host.send_command(
|
||||||
async def prompt():
|
HCI_User_Confirmation_Request_Negative_Reply_Command( # type: ignore[call-arg]
|
||||||
return await pairing_config.delegate.confirm()
|
bd_addr=connection.peer_address
|
||||||
|
|
||||||
async def confirm():
|
|
||||||
if await prompt():
|
|
||||||
await self.host.send_command(
|
|
||||||
HCI_User_Confirmation_Request_Reply_Command(
|
|
||||||
bd_addr=connection.peer_address
|
|
||||||
)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
await self.host.send_command(
|
|
||||||
HCI_User_Confirmation_Request_Negative_Reply_Command(
|
|
||||||
bd_addr=connection.peer_address
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
)
|
||||||
|
|
||||||
AsyncRunner.spawn(connection.abort_on('disconnection', confirm()))
|
AsyncRunner.spawn(reply())
|
||||||
return
|
|
||||||
|
|
||||||
if io_capability == HCI_DISPLAY_ONLY_IO_CAPABILITY:
|
|
||||||
# Display the code to the user
|
|
||||||
AsyncRunner.spawn(pairing_config.delegate.display_number(code, 6))
|
|
||||||
|
|
||||||
# Automatic confirmation
|
|
||||||
self.host.send_command_sync(
|
|
||||||
HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
|
|
||||||
)
|
|
||||||
|
|
||||||
# [Classic only]
|
# [Classic only]
|
||||||
@host_event_handler
|
@host_event_handler
|
||||||
@with_connection_from_address
|
@with_connection_from_address
|
||||||
def on_authentication_user_passkey_request(self, connection):
|
def on_authentication_user_passkey_request(self, connection) -> None:
|
||||||
# Ask what the pairing config should be for this connection
|
# Ask what the pairing config should be for this connection
|
||||||
pairing_config = self.pairing_config_factory(connection)
|
pairing_config = self.pairing_config_factory(connection)
|
||||||
io_capability = pairing_config.delegate.classic_io_capability
|
|
||||||
|
|
||||||
# Respond
|
async def reply() -> None:
|
||||||
if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY:
|
number = await connection.abort_on(
|
||||||
# Ask the user to input a number
|
'disconnection', pairing_config.delegate.get_number()
|
||||||
async def get_number():
|
|
||||||
number = await connection.abort_on(
|
|
||||||
'disconnection', pairing_config.delegate.get_number()
|
|
||||||
)
|
|
||||||
if number is not None:
|
|
||||||
await self.host.send_command(
|
|
||||||
HCI_User_Passkey_Request_Reply_Command(
|
|
||||||
bd_addr=connection.peer_address, numeric_value=number
|
|
||||||
)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
await self.host.send_command(
|
|
||||||
HCI_User_Passkey_Request_Negative_Reply_Command(
|
|
||||||
bd_addr=connection.peer_address
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
asyncio.create_task(get_number())
|
|
||||||
else:
|
|
||||||
self.host.send_command_sync(
|
|
||||||
HCI_User_Passkey_Request_Negative_Reply_Command(
|
|
||||||
bd_addr=connection.peer_address
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
if number is not None:
|
||||||
|
await self.host.send_command(
|
||||||
|
HCI_User_Passkey_Request_Reply_Command( # type: ignore[call-arg]
|
||||||
|
bd_addr=connection.peer_address, numeric_value=number
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await self.host.send_command(
|
||||||
|
HCI_User_Passkey_Request_Negative_Reply_Command( # type: ignore[call-arg]
|
||||||
|
bd_addr=connection.peer_address
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
AsyncRunner.spawn(reply())
|
||||||
|
|
||||||
# [Classic only]
|
# [Classic only]
|
||||||
@host_event_handler
|
@host_event_handler
|
||||||
|
|||||||
@@ -114,8 +114,11 @@ class PairingDelegate:
|
|||||||
"""Accept or reject a Pairing request."""
|
"""Accept or reject a Pairing request."""
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def confirm(self) -> bool:
|
async def confirm(self, auto: bool = False) -> bool:
|
||||||
"""Respond yes or no to a Pairing confirmation question."""
|
"""
|
||||||
|
Respond yes or no to a Pairing confirmation question.
|
||||||
|
The `auto` parameter stands for automatic confirmation.
|
||||||
|
"""
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# pylint: disable-next=unused-argument
|
# pylint: disable-next=unused-argument
|
||||||
|
|||||||
Reference in New Issue
Block a user