Merge pull request #153 from benquike/main

Add 1 bug fix and a few features in bumble
This commit is contained in:
Lucas Abel
2023-03-23 10:31:02 -07:00
committed by GitHub
4 changed files with 83 additions and 7 deletions

View File

@@ -95,6 +95,8 @@ from .hci import (
HCI_LE_Set_Scan_Enable_Command,
HCI_LE_Set_Scan_Parameters_Command,
HCI_LE_Set_Scan_Response_Data_Command,
HCI_PIN_Code_Request_Reply_Command,
HCI_PIN_Code_Request_Negative_Reply_Command,
HCI_Read_BD_ADDR_Command,
HCI_Read_RSSI_Command,
HCI_Reject_Connection_Request_Command,
@@ -743,6 +745,7 @@ class DeviceConfiguration:
self.le_enabled = True
# LE host enable 2nd parameter
self.le_simultaneous_enabled = True
self.classic_enabled = False
self.classic_sc_enabled = True
self.classic_ssp_enabled = True
self.classic_accept_any = True
@@ -772,6 +775,7 @@ class DeviceConfiguration:
self.le_simultaneous_enabled = config.get(
'le_simultaneous_enabled', self.le_simultaneous_enabled
)
self.classic_enabled = config.get('classic_enabled', self.classic_enabled)
self.classic_sc_enabled = config.get(
'classic_sc_enabled', self.classic_sc_enabled
)
@@ -983,6 +987,7 @@ class Device(CompositeEventEmitter):
self.keystore = KeyStore.create_for_device(config)
self.irk = config.irk
self.le_enabled = config.le_enabled
self.classic_enabled = config.classic_enabled
self.le_simultaneous_enabled = config.le_simultaneous_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_sc_enabled = config.classic_sc_enabled
@@ -2781,6 +2786,51 @@ class Device(CompositeEventEmitter):
)
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_pin_code_request(self, connection):
# classic legacy pairing
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
can_input = pairing_config.delegate.io_capability in (
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
)
# respond the pin code
if can_input:
async def get_pin_code():
pin_code = await connection.abort_on(
'disconnection', pairing_config.delegate.get_number()
)
if pin_code is not None:
pin_code = bytes(str(pin_code).zfill(6))
await self.host.send_command(
HCI_PIN_Code_Request_Reply_Command(
bd_addr=connection.peer_address,
pin_code_length=len(pin_code),
pin_code=pin_code,
)
)
else:
await self.host.send_command(
HCI_PIN_Code_Request_Negative_Reply_Command(
bd_addr=connection.peer_address
)
)
asyncio.create_task(get_pin_code())
else:
self.host.send_command_sync(
HCI_PIN_Code_Request_Negative_Reply_Command(
bd_addr=connection.peer_address
)
)
# [Classic only]
@host_event_handler
@with_connection_from_address

View File

@@ -1491,7 +1491,7 @@ class HCI_Object:
elif field_type == -2:
# 16-bit signed
field_value = struct.unpack_from('<h', data, offset)[0]
offset += 1
offset += 2
elif field_type == 3:
# 24-bit unsigned
padded = data[offset : offset + 3] + bytes([0])
@@ -2097,6 +2097,24 @@ class HCI_Link_Key_Request_Negative_Reply_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('bd_addr', Address.parse_address),
('pin_code_length', 1),
('pin_code', '*'),
],
return_parameters_fields=[
('status', STATUS_SPEC),
('bd_addr', Address.parse_address),
],
)
class HCI_PIN_Code_Request_Reply_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.12 PIN Code Request Reply Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[('bd_addr', Address.parse_address)],

View File

@@ -53,7 +53,6 @@ from .hci import (
HCI_LE_Write_Suggested_Default_Data_Length_Command,
HCI_Link_Key_Request_Negative_Reply_Command,
HCI_Link_Key_Request_Reply_Command,
HCI_PIN_Code_Request_Negative_Reply_Command,
HCI_Packet,
HCI_Read_Buffer_Size_Command,
HCI_Read_Local_Supported_Commands_Command,
@@ -794,11 +793,7 @@ class Host(AbortableEventEmitter):
)
def on_hci_pin_code_request_event(self, event):
# For now, just refuse all requests
# TODO: delegate the decision
self.send_command_sync(
HCI_PIN_Code_Request_Negative_Reply_Command(bd_addr=event.bd_addr)
)
self.emit('pin_code_request', event.bd_addr)
def on_hci_link_key_request_event(self, event):
async def send_link_key():