add passkey delegate

This commit is contained in:
Gilles Boccon-Gibod
2025-06-09 12:20:06 -04:00
parent 8614e075b6
commit d631156f6c
3 changed files with 157 additions and 31 deletions

View File

@@ -222,6 +222,14 @@ class PairingDelegate:
),
)
async def generate_passkey(self) -> int:
"""
Return a passkey value between 0 and 999999 (inclusive).
"""
# By default, generate a random passkey.
return secrets.randbelow(1000000)
# -----------------------------------------------------------------------------
class PairingConfig:

View File

@@ -934,10 +934,9 @@ class Session:
utils.cancel_on_event(self.connection, 'disconnection', prompt())
def display_passkey(self) -> None:
# Generate random Passkey/PIN code
self.passkey = secrets.randbelow(1000000)
assert self.passkey is not None
async def display_passkey(self) -> None:
# Get the passkey value from the delegate
self.passkey = await self.pairing_config.delegate.generate_passkey()
logger.debug(f'Pairing PIN CODE: {self.passkey:06}')
self.passkey_ready.set()
@@ -946,14 +945,7 @@ class Session:
self.tk = self.passkey.to_bytes(16, byteorder='little')
logger.debug(f'TK from passkey = {self.tk.hex()}')
try:
utils.cancel_on_event(
self.connection,
'disconnection',
self.pairing_config.delegate.display_number(self.passkey, digits=6),
)
except Exception as error:
logger.warning(f'exception while displaying number: {error}')
await self.pairing_config.delegate.display_number(self.passkey, digits=6)
def input_passkey(self, next_steps: Optional[Callable[[], None]] = None) -> None:
# Prompt the user for the passkey displayed on the peer
@@ -975,9 +967,20 @@ class Session:
self, next_steps: Optional[Callable[[], None]] = None
) -> None:
if self.passkey_display:
self.display_passkey()
if next_steps is not None:
next_steps()
async def display_passkey():
await self.display_passkey()
if next_steps is not None:
next_steps()
try:
utils.cancel_on_event(
self.connection,
'disconnection',
display_passkey(),
)
except Exception as error:
logger.warning(f'exception while displaying passkey: {error}')
else:
self.input_passkey(next_steps)
@@ -1503,7 +1506,7 @@ class Session:
# Display a passkey if we need to
if not self.sc:
if self.pairing_method == PairingMethod.PASSKEY and self.passkey_display:
self.display_passkey()
await self.display_passkey()
# Respond
self.send_pairing_response_command()
@@ -1685,7 +1688,7 @@ class Session:
):
return
elif self.pairing_method == PairingMethod.PASSKEY:
assert self.passkey and self.confirm_value
assert self.passkey is not None and self.confirm_value is not None
# Check that the random value matches what was committed to earlier
confirm_verifier = crypto.f4(
self.pkb,
@@ -1714,7 +1717,7 @@ class Session:
):
self.send_pairing_random_command()
elif self.pairing_method == PairingMethod.PASSKEY:
assert self.passkey and self.confirm_value
assert self.passkey is not None and self.confirm_value is not None
# Check that the random value matches what was committed to earlier
confirm_verifier = crypto.f4(
self.pka,
@@ -1751,7 +1754,7 @@ class Session:
ra = bytes(16)
rb = ra
elif self.pairing_method == PairingMethod.PASSKEY:
assert self.passkey
assert self.passkey is not None
ra = self.passkey.to_bytes(16, byteorder='little')
rb = ra
elif self.pairing_method == PairingMethod.OOB:
@@ -1850,19 +1853,23 @@ class Session:
elif self.pairing_method == PairingMethod.PASSKEY:
self.send_pairing_confirm_command()
else:
def next_steps():
# Send our public key back to the initiator
self.send_public_key_command()
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
PairingMethod.OOB,
):
# We can now send the confirmation value
self.send_pairing_confirm_command()
if self.pairing_method == PairingMethod.PASSKEY:
self.display_or_input_passkey()
# Send our public key back to the initiator
self.send_public_key_command()
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
PairingMethod.OOB,
):
# We can now send the confirmation value
self.send_pairing_confirm_command()
self.display_or_input_passkey(next_steps)
else:
next_steps()
def on_smp_pairing_dhkey_check_command(
self, command: SMP_Pairing_DHKey_Check_Command

View File

@@ -0,0 +1,111 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.gatt import (
Service,
Characteristic,
)
from bumble.pairing import PairingConfig, PairingDelegate
# -----------------------------------------------------------------------------
class FixedPinPairingDelegate(PairingDelegate):
"""
A PairingDelegate that declares that the device only has the ability to display
a passkey but not to enter or confirm one. When asked for the passkey to use for
pairing, this delegate returns a fixed value (instead of the default, which is
to generate a random value each time). This is obviously not a secure way to do
pairing, but it used here as an illustration of how a delegate can override the
default passkey generation.
"""
def __init__(self, passkey: int) -> None:
super().__init__(io_capability=PairingDelegate.IoCapability.DISPLAY_OUTPUT_ONLY)
self.passkey = passkey
async def generate_passkey(self) -> int:
return self.passkey
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_gatt_server_with_pairing_delegate.py <device-config> <transport-spec> '
)
print('example: run_gatt_server_with_pairing_delegate.py device1.json usb:0')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device to manage the host
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Add a service with a single characteristic.
# The characteristic requires authentication, so reading it on a non-paired
# connection will return an error.
custom_service1 = Service(
'50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
[
Characteristic(
'486F64C6-4B5F-4B3B-8AFF-EDE134A8446A',
Characteristic.Properties.READ,
Characteristic.READABLE
| Characteristic.READ_REQUIRES_AUTHENTICATION,
bytes('hello', 'utf-8'),
),
],
)
device.add_services([custom_service1])
# Debug print
for attribute in device.gatt_server.attributes:
print(attribute)
# Setup pairing
device.pairing_config_factory = lambda connection: PairingConfig(
delegate=FixedPinPairingDelegate(123456)
)
# Get things going
await device.power_on()
# Connect to a peer
if len(sys.argv) > 3:
target_address = sys.argv[3]
print(f'=== Connecting to {target_address}...')
await device.connect(target_address)
else:
await device.start_advertising(auto_restart=True)
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())