forked from auracaster/bumble_mirror
improve smp compatibility with other OS flows
This commit is contained in:
@@ -518,13 +518,14 @@ class PairingDelegate:
|
||||
async def confirm(self) -> bool:
|
||||
return True
|
||||
|
||||
async def compare_numbers(self, _number: int, _digits: int = 6) -> bool:
|
||||
# pylint: disable-next=unused-argument
|
||||
async def compare_numbers(self, number: int, digits: int) -> bool:
|
||||
return True
|
||||
|
||||
async def get_number(self) -> int:
|
||||
return 0
|
||||
|
||||
async def display_number(self, _number: int, _digits: int = 6) -> None:
|
||||
async def display_number(self, number: int, digits: int) -> None:
|
||||
pass
|
||||
|
||||
async def key_distribution_response(
|
||||
@@ -661,7 +662,8 @@ class Session:
|
||||
self.peer_expected_distributions = []
|
||||
self.dh_key = None
|
||||
self.confirm_value = None
|
||||
self.passkey = 0
|
||||
self.passkey = None
|
||||
self.passkey_ready = asyncio.Event()
|
||||
self.passkey_step = 0
|
||||
self.passkey_display = False
|
||||
self.pairing_method = 0
|
||||
@@ -839,6 +841,7 @@ class Session:
|
||||
# Generate random Passkey/PIN code
|
||||
self.passkey = secrets.randbelow(1000000)
|
||||
logger.debug(f'Pairing PIN CODE: {self.passkey:06}')
|
||||
self.passkey_ready.set()
|
||||
|
||||
# The value of TK is computed from the PIN code
|
||||
if not self.sc:
|
||||
@@ -859,6 +862,8 @@ class Session:
|
||||
self.tk = passkey.to_bytes(16, byteorder='little')
|
||||
logger.debug(f'TK from passkey = {self.tk.hex()}')
|
||||
|
||||
self.passkey_ready.set()
|
||||
|
||||
if next_steps is not None:
|
||||
next_steps()
|
||||
|
||||
@@ -910,17 +915,29 @@ class Session:
|
||||
logger.debug(f'generated random: {self.r.hex()}')
|
||||
|
||||
if self.sc:
|
||||
if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
|
||||
z = 0
|
||||
elif self.pairing_method == self.PASSKEY:
|
||||
z = 0x80 + ((self.passkey >> self.passkey_step) & 1)
|
||||
else:
|
||||
return
|
||||
|
||||
if self.is_initiator:
|
||||
confirm_value = crypto.f4(self.pka, self.pkb, self.r, bytes([z]))
|
||||
else:
|
||||
confirm_value = crypto.f4(self.pkb, self.pka, self.r, bytes([z]))
|
||||
async def next_steps():
|
||||
if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
|
||||
z = 0
|
||||
elif self.pairing_method == self.PASSKEY:
|
||||
# We need a passkey
|
||||
await self.passkey_ready.wait()
|
||||
|
||||
z = 0x80 + ((self.passkey >> self.passkey_step) & 1)
|
||||
else:
|
||||
return
|
||||
|
||||
if self.is_initiator:
|
||||
confirm_value = crypto.f4(self.pka, self.pkb, self.r, bytes([z]))
|
||||
else:
|
||||
confirm_value = crypto.f4(self.pkb, self.pka, self.r, bytes([z]))
|
||||
|
||||
self.send_command(
|
||||
SMP_Pairing_Confirm_Command(confirm_value=confirm_value)
|
||||
)
|
||||
|
||||
# Perform the next steps asynchronously in case we need to wait for input
|
||||
self.connection.abort_on('disconnection', next_steps())
|
||||
else:
|
||||
confirm_value = crypto.c1(
|
||||
self.tk,
|
||||
@@ -933,7 +950,7 @@ class Session:
|
||||
self.ra,
|
||||
)
|
||||
|
||||
self.send_command(SMP_Pairing_Confirm_Command(confirm_value=confirm_value))
|
||||
self.send_command(SMP_Pairing_Confirm_Command(confirm_value=confirm_value))
|
||||
|
||||
def send_pairing_random_command(self):
|
||||
self.send_command(SMP_Pairing_Random_Command(random_value=self.r))
|
||||
@@ -1364,8 +1381,8 @@ class Session:
|
||||
|
||||
# Start phase 2
|
||||
if self.sc:
|
||||
if self.pairing_method == self.PASSKEY and self.passkey_display:
|
||||
self.display_passkey()
|
||||
if self.pairing_method == self.PASSKEY:
|
||||
self.display_or_input_passkey()
|
||||
|
||||
self.send_public_key_command()
|
||||
else:
|
||||
@@ -1426,18 +1443,22 @@ class Session:
|
||||
else:
|
||||
srand = self.r
|
||||
mrand = command.random_value
|
||||
stk = crypto.s1(self.tk, srand, mrand)
|
||||
logger.debug(f'STK = {stk.hex()}')
|
||||
self.stk = crypto.s1(self.tk, srand, mrand)
|
||||
logger.debug(f'STK = {self.stk.hex()}')
|
||||
|
||||
# Generate LTK
|
||||
self.ltk = crypto.r()
|
||||
|
||||
if self.is_initiator:
|
||||
self.start_encryption(stk)
|
||||
self.start_encryption(self.stk)
|
||||
else:
|
||||
self.send_pairing_random_command()
|
||||
|
||||
def on_smp_pairing_random_command_secure_connections(self, command):
|
||||
if self.pairing_method == self.PASSKEY and self.passkey is None:
|
||||
logger.warning('no passkey entered, ignoring command')
|
||||
return
|
||||
|
||||
# pylint: disable=too-many-return-statements
|
||||
if self.is_initiator:
|
||||
if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
|
||||
@@ -1565,17 +1586,13 @@ class Session:
|
||||
logger.debug(f'DH key: {self.dh_key.hex()}')
|
||||
|
||||
if self.is_initiator:
|
||||
if self.pairing_method == self.PASSKEY:
|
||||
if self.passkey_display:
|
||||
self.send_pairing_confirm_command()
|
||||
else:
|
||||
self.input_passkey(self.send_pairing_confirm_command)
|
||||
self.send_pairing_confirm_command()
|
||||
else:
|
||||
# Send our public key back to the initiator
|
||||
if self.pairing_method == self.PASSKEY:
|
||||
self.display_or_input_passkey(self.send_public_key_command)
|
||||
else:
|
||||
self.send_public_key_command()
|
||||
self.display_or_input_passkey()
|
||||
|
||||
# Send our public key back to the initiator
|
||||
self.send_public_key_command()
|
||||
|
||||
if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
|
||||
# We can now send the confirmation value
|
||||
|
||||
Reference in New Issue
Block a user