mirror of
https://github.com/google/bumble.git
synced 2026-04-16 00:25:31 +00:00
add classic pairing io delegation
This commit is contained in:
91
apps/pair.py
91
apps/pair.py
@@ -44,7 +44,7 @@ from bumble.att import (
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class Delegate(PairingDelegate):
|
||||
def __init__(self, connection, capability_string, prompt):
|
||||
def __init__(self, mode, connection, capability_string, prompt):
|
||||
super().__init__({
|
||||
'keyboard': PairingDelegate.KEYBOARD_INPUT_ONLY,
|
||||
'display': PairingDelegate.DISPLAY_OUTPUT_ONLY,
|
||||
@@ -53,6 +53,7 @@ class Delegate(PairingDelegate):
|
||||
'none': PairingDelegate.NO_OUTPUT_NO_INPUT
|
||||
}[capability_string.lower()])
|
||||
|
||||
self.mode = mode
|
||||
self.peer = Peer(connection)
|
||||
self.peer_name = None
|
||||
self.prompt = prompt
|
||||
@@ -62,12 +63,16 @@ class Delegate(PairingDelegate):
|
||||
# We already asked the peer
|
||||
return
|
||||
|
||||
# Try to get the peer's name
|
||||
if self.peer:
|
||||
peer_name = await get_peer_name(self.peer)
|
||||
self.peer_name = f'{peer_name or ""} [{self.peer.connection.peer_address}]'
|
||||
# For classic, just use the address
|
||||
if self.mode == 'classic':
|
||||
self.peer_name = str(self.peer.connection.peer_address)
|
||||
else:
|
||||
self.peer_name = '[?]'
|
||||
# Try to get the peer's name
|
||||
if self.peer:
|
||||
peer_name = await get_peer_name(self.peer)
|
||||
self.peer_name = f'{peer_name or ""} [{self.peer.connection.peer_address}]'
|
||||
else:
|
||||
self.peer_name = '[?]'
|
||||
|
||||
async def accept(self):
|
||||
if self.prompt:
|
||||
@@ -91,7 +96,7 @@ class Delegate(PairingDelegate):
|
||||
# Accept silently
|
||||
return True
|
||||
|
||||
async def compare_numbers(self, number):
|
||||
async def compare_numbers(self, number, digits):
|
||||
await self.update_peer_name()
|
||||
|
||||
# Wait a bit to allow some of the log lines to print before we prompt
|
||||
@@ -102,7 +107,7 @@ class Delegate(PairingDelegate):
|
||||
print(color(f'### Pairing with {self.peer_name}', 'yellow'))
|
||||
print(color('###-----------------------------------', 'yellow'))
|
||||
while True:
|
||||
response = await aioconsole.ainput(color(f'>>> Does the other device display {number:06}? ', 'yellow'))
|
||||
response = await aioconsole.ainput(color(f'>>> Does the other device display {number:{digits}}? ', 'yellow'))
|
||||
response = response.lower().strip()
|
||||
if response == 'yes':
|
||||
return True
|
||||
@@ -125,7 +130,7 @@ class Delegate(PairingDelegate):
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
async def display_number(self, number):
|
||||
async def display_number(self, number, digits):
|
||||
await self.update_peer_name()
|
||||
|
||||
# Wait a bit to allow some of the log lines to print before we prompt
|
||||
@@ -134,7 +139,7 @@ class Delegate(PairingDelegate):
|
||||
# Display a PIN code
|
||||
print(color('###-----------------------------------', 'yellow'))
|
||||
print(color(f'### Pairing with {self.peer_name}', 'yellow'))
|
||||
print(color(f'### PIN: {number:06}', 'yellow'))
|
||||
print(color(f'### PIN: {number:0{digits}}', 'yellow'))
|
||||
print(color('###-----------------------------------', 'yellow'))
|
||||
|
||||
|
||||
@@ -224,9 +229,22 @@ def on_pairing_failure(reason):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, transport, address_or_name):
|
||||
async def pair(
|
||||
mode,
|
||||
sc,
|
||||
mitm,
|
||||
bond,
|
||||
io,
|
||||
prompt,
|
||||
request,
|
||||
print_keys,
|
||||
keystore_file,
|
||||
device_config,
|
||||
hci_transport,
|
||||
address_or_name
|
||||
):
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device to manage the host
|
||||
@@ -245,19 +263,25 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
|
||||
|
||||
# Expose a GATT characteristic that can be used to trigger pairing by
|
||||
# responding with an authentication error when read
|
||||
device.add_service(
|
||||
Service(
|
||||
'50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
|
||||
[
|
||||
Characteristic(
|
||||
'552957FB-CF1F-4A31-9535-E78847E1A714',
|
||||
Characteristic.READ | Characteristic.WRITE,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
CharacteristicValue(read=read_with_error, write=write_with_error)
|
||||
)
|
||||
]
|
||||
if mode == 'le':
|
||||
device.add_service(
|
||||
Service(
|
||||
'50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
|
||||
[
|
||||
Characteristic(
|
||||
'552957FB-CF1F-4A31-9535-E78847E1A714',
|
||||
Characteristic.READ | Characteristic.WRITE,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
CharacteristicValue(read=read_with_error, write=write_with_error)
|
||||
)
|
||||
]
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
# Select LE or Classic
|
||||
if mode == 'classic':
|
||||
device.classic_enabled = True
|
||||
device.le_enabled = False
|
||||
|
||||
# Get things going
|
||||
await device.power_on()
|
||||
@@ -267,7 +291,7 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
|
||||
sc,
|
||||
mitm,
|
||||
bond,
|
||||
Delegate(connection, io, prompt)
|
||||
Delegate(mode, connection, io, prompt)
|
||||
)
|
||||
|
||||
# Connect to a peer or wait for a connection
|
||||
@@ -278,10 +302,14 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
|
||||
|
||||
if not request:
|
||||
try:
|
||||
await connection.pair()
|
||||
if mode == 'le':
|
||||
await connection.pair()
|
||||
else:
|
||||
await connection.authenticate()
|
||||
return
|
||||
except ProtocolError as error:
|
||||
print(color(f'Pairing failed: {error}', 'red'))
|
||||
return
|
||||
except ProtocolError:
|
||||
pass
|
||||
else:
|
||||
# Advertise so that peers can find us and connect
|
||||
await device.start_advertising(auto_restart=True)
|
||||
@@ -291,6 +319,7 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@click.command()
|
||||
@click.option('--mode', type=click.Choice(['le', 'classic']), default='le', show_default=True)
|
||||
@click.option('--sc', type=bool, default=True, help='Use the Secure Connections protocol', show_default=True)
|
||||
@click.option('--mitm', type=bool, default=True, help='Request MITM protection', show_default=True)
|
||||
@click.option('--bond', type=bool, default=True, help='Enable bonding', show_default=True)
|
||||
@@ -300,11 +329,11 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
|
||||
@click.option('--print-keys', is_flag=True, help='Print the bond keys before pairing')
|
||||
@click.option('--keystore-file', help='File in which to store the pairing keys')
|
||||
@click.argument('device-config')
|
||||
@click.argument('transport')
|
||||
@click.argument('hci_transport')
|
||||
@click.argument('address-or-name', required=False)
|
||||
def main(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, transport, address_or_name):
|
||||
def main(mode, sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, hci_transport, address_or_name):
|
||||
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
|
||||
asyncio.run(pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, transport, address_or_name))
|
||||
asyncio.run(pair(mode, sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, hci_transport, address_or_name))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user