Compare commits

...

18 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
21497d0a40 use type object instead of type strings 2023-07-27 11:29:23 -07:00
Josh Wu
190529184e L2CAP: Import device.Connection for typing 2023-07-27 09:07:55 -07:00
Josh Wu
46eb81466d Add more argement hints in L2CAP 2023-07-27 09:07:55 -07:00
Josh Wu
9c70c487b9 Add type hint to L2CAP module 2023-07-27 09:07:55 -07:00
Josh Wu
43234d7c3e Use with-patch to mock SMP session 2023-07-27 08:00:36 -07:00
Josh Wu
dbf878dc3f SMP: Remove PairingMethod.__str__ 2023-07-27 08:00:36 -07:00
Josh Wu
f6c0bd88d7 SMP: Do not send phase 2 commands in CTKD 2023-07-27 08:00:36 -07:00
Josh Wu
8440b7fbf1 SMP: Refactor pairing method as enum 2023-07-27 08:00:36 -07:00
Gilles Boccon-Gibod
808ab54135 Merge pull request #221 from google/gbg/core-classes
add new device class major/minor identifiers
2023-07-25 09:49:05 -07:00
Gilles Boccon-Gibod
52b29ad680 add new device class major/minor identifiers 2023-07-24 17:41:57 -07:00
Gilles Boccon-Gibod
d41bf9c587 Merge pull request #216 from google/gbg/host-buffer-size-command
accept Host Buffer Size Command in the controller
2023-07-24 09:05:10 -07:00
Gilles Boccon-Gibod
b758825164 add flow control command 2023-07-22 13:04:39 -07:00
Gilles Boccon-Gibod
779dfe5473 accept Host Buffer Size Command in the controller 2023-07-21 19:36:26 -07:00
Gilles Boccon-Gibod
f9a4c7518e Merge pull request #214 from marshallpierce/mp/scanner-rssi
Add a space after RSSI
2023-07-14 10:52:54 -07:00
Marshall Pierce
bad2fdf69f Add a space after RSSI
The other data elements have a space, so I'm guessing that RSSI
is intended to as well. Perhaps there's some subtle reason why
it should have a space, though, in which case feel free to
close this.

Output now looks like this:

```
>>> 58:D3:49:E7:40:DA/P [PUBLIC]:
  RSSI: -67
  [Flags]: LE General,BR/EDR C,BR/EDR H
  [TX Power Level]: 4
  [Manufacturer Specific Data]: company=Apple, Inc., data=0f08c00af4392b00040c10020f04
```
2023-07-13 12:47:45 -06:00
Lucas Abel
a84df469cd pairing: handle user errors from all delegate calls 2023-07-12 11:03:21 -07:00
Gilles Boccon-Gibod
03e33e39bd Merge pull request #211 from google/gbg/fix-ws-transport-doc
fix doc for ws-client ws-server transports
2023-07-12 07:06:32 -07:00
Gilles Boccon-Gibod
753fb69272 fix doc for ws-client ws-server transports 2023-07-12 06:06:20 -07:00
9 changed files with 593 additions and 288 deletions

View File

@@ -654,7 +654,7 @@ class Controller:
def on_hci_create_connection_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.1.5 Create Connection command
See Bluetooth spec Vol 4, Part E - 7.1.5 Create Connection command
'''
if self.link is None:
@@ -685,7 +685,7 @@ class Controller:
def on_hci_disconnect_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.1.6 Disconnect Command
See Bluetooth spec Vol 4, Part E - 7.1.6 Disconnect Command
'''
# First, say that the disconnection is pending
self.send_hci_packet(
@@ -719,7 +719,7 @@ class Controller:
def on_hci_accept_connection_request_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.1.8 Accept Connection Request command
See Bluetooth spec Vol 4, Part E - 7.1.8 Accept Connection Request command
'''
if self.link is None:
@@ -735,7 +735,7 @@ class Controller:
def on_hci_switch_role_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.2.8 Switch Role command
See Bluetooth spec Vol 4, Part E - 7.2.8 Switch Role command
'''
if self.link is None:
@@ -751,21 +751,21 @@ class Controller:
def on_hci_set_event_mask_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.1 Set Event Mask Command
See Bluetooth spec Vol 4, Part E - 7.3.1 Set Event Mask Command
'''
self.event_mask = command.event_mask
return bytes([HCI_SUCCESS])
def on_hci_reset_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.2 Reset Command
See Bluetooth spec Vol 4, Part E - 7.3.2 Reset Command
'''
# TODO: cleanup what needs to be reset
return bytes([HCI_SUCCESS])
def on_hci_write_local_name_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.11 Write Local Name Command
See Bluetooth spec Vol 4, Part E - 7.3.11 Write Local Name Command
'''
local_name = command.local_name
if len(local_name):
@@ -780,7 +780,7 @@ class Controller:
def on_hci_read_local_name_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.12 Read Local Name Command
See Bluetooth spec Vol 4, Part E - 7.3.12 Read Local Name Command
'''
local_name = bytes(self.local_name, 'utf-8')[:248]
if len(local_name) < 248:
@@ -790,19 +790,19 @@ class Controller:
def on_hci_read_class_of_device_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.25 Read Class of Device Command
See Bluetooth spec Vol 4, Part E - 7.3.25 Read Class of Device Command
'''
return bytes([HCI_SUCCESS, 0, 0, 0])
def on_hci_write_class_of_device_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.26 Write Class of Device Command
See Bluetooth spec Vol 4, Part E - 7.3.26 Write Class of Device Command
'''
return bytes([HCI_SUCCESS])
def on_hci_read_synchronous_flow_control_enable_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.36 Read Synchronous Flow Control Enable
See Bluetooth spec Vol 4, Part E - 7.3.36 Read Synchronous Flow Control Enable
Command
'''
if self.sync_flow_control:
@@ -813,7 +813,7 @@ class Controller:
def on_hci_write_synchronous_flow_control_enable_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.37 Write Synchronous Flow Control Enable
See Bluetooth spec Vol 4, Part E - 7.3.37 Write Synchronous Flow Control Enable
Command
'''
ret = HCI_SUCCESS
@@ -825,41 +825,59 @@ class Controller:
ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR
return bytes([ret])
def on_hci_set_controller_to_host_flow_control_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.3.38 Set Controller To Host Flow Control
Command
'''
# For now we just accept the command but ignore the values.
# TODO: respect the passed in values.
return bytes([HCI_SUCCESS])
def on_hci_host_buffer_size_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.3.39 Host Buffer Size Command
'''
# For now we just accept the command but ignore the values.
# TODO: respect the passed in values.
return bytes([HCI_SUCCESS])
def on_hci_write_extended_inquiry_response_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.59 Write Simple Pairing Mode Command
See Bluetooth spec Vol 4, Part E - 7.3.56 Write Extended Inquiry Response
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_write_simple_pairing_mode_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.59 Write Simple Pairing Mode Command
See Bluetooth spec Vol 4, Part E - 7.3.59 Write Simple Pairing Mode Command
'''
return bytes([HCI_SUCCESS])
def on_hci_set_event_mask_page_2_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.69 Set Event Mask Page 2 Command
See Bluetooth spec Vol 4, Part E - 7.3.69 Set Event Mask Page 2 Command
'''
self.event_mask_page_2 = command.event_mask_page_2
return bytes([HCI_SUCCESS])
def on_hci_read_le_host_support_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.78 Write LE Host Support Command
See Bluetooth spec Vol 4, Part E - 7.3.78 Write LE Host Support Command
'''
return bytes([HCI_SUCCESS, 1, 0])
def on_hci_write_le_host_support_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.79 Write LE Host Support Command
See Bluetooth spec Vol 4, Part E - 7.3.79 Write LE Host Support Command
'''
# TODO / Just ignore for now
return bytes([HCI_SUCCESS])
def on_hci_write_authenticated_payload_timeout_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.94 Write Authenticated Payload Timeout
See Bluetooth spec Vol 4, Part E - 7.3.94 Write Authenticated Payload Timeout
Command
'''
# TODO
@@ -867,7 +885,7 @@ class Controller:
def on_hci_read_local_version_information_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.4.1 Read Local Version Information Command
See Bluetooth spec Vol 4, Part E - 7.4.1 Read Local Version Information Command
'''
return struct.pack(
'<BBHBHH',
@@ -881,19 +899,19 @@ class Controller:
def on_hci_read_local_supported_commands_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.4.2 Read Local Supported Commands Command
See Bluetooth spec Vol 4, Part E - 7.4.2 Read Local Supported Commands Command
'''
return bytes([HCI_SUCCESS]) + self.supported_commands
def on_hci_read_local_supported_features_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.4.3 Read Local Supported Features Command
See Bluetooth spec Vol 4, Part E - 7.4.3 Read Local Supported Features Command
'''
return bytes([HCI_SUCCESS]) + self.lmp_features
def on_hci_read_bd_addr_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.4.6 Read BD_ADDR Command
See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command
'''
bd_addr = (
self._public_address.to_bytes()
@@ -904,14 +922,14 @@ class Controller:
def on_hci_le_set_event_mask_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.1 LE Set Event Mask Command
See Bluetooth spec Vol 4, Part E - 7.8.1 LE Set Event Mask Command
'''
self.le_event_mask = command.le_event_mask
return bytes([HCI_SUCCESS])
def on_hci_le_read_buffer_size_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.2 LE Read Buffer Size Command
See Bluetooth spec Vol 4, Part E - 7.8.2 LE Read Buffer Size Command
'''
return struct.pack(
'<BHB',
@@ -922,49 +940,49 @@ class Controller:
def on_hci_le_read_local_supported_features_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.3 LE Read Local Supported Features
See Bluetooth spec Vol 4, Part E - 7.8.3 LE Read Local Supported Features
Command
'''
return bytes([HCI_SUCCESS]) + self.le_features
def on_hci_le_set_random_address_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.4 LE Set Random Address Command
See Bluetooth spec Vol 4, Part E - 7.8.4 LE Set Random Address Command
'''
self.random_address = command.random_address
return bytes([HCI_SUCCESS])
def on_hci_le_set_advertising_parameters_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.5 LE Set Advertising Parameters Command
See Bluetooth spec Vol 4, Part E - 7.8.5 LE Set Advertising Parameters Command
'''
self.advertising_parameters = command
return bytes([HCI_SUCCESS])
def on_hci_le_read_advertising_physical_channel_tx_power_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Physical Channel
See Bluetooth spec Vol 4, Part E - 7.8.6 LE Read Advertising Physical Channel
Tx Power Command
'''
return bytes([HCI_SUCCESS, self.advertising_channel_tx_power])
def on_hci_le_set_advertising_data_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.7 LE Set Advertising Data Command
See Bluetooth spec Vol 4, Part E - 7.8.7 LE Set Advertising Data Command
'''
self.advertising_data = command.advertising_data
return bytes([HCI_SUCCESS])
def on_hci_le_set_scan_response_data_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.8 LE Set Scan Response Data Command
See Bluetooth spec Vol 4, Part E - 7.8.8 LE Set Scan Response Data Command
'''
self.le_scan_response_data = command.scan_response_data
return bytes([HCI_SUCCESS])
def on_hci_le_set_advertising_enable_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.9 LE Set Advertising Enable Command
See Bluetooth spec Vol 4, Part E - 7.8.9 LE Set Advertising Enable Command
'''
if command.advertising_enable:
self.start_advertising()
@@ -975,7 +993,7 @@ class Controller:
def on_hci_le_set_scan_parameters_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.10 LE Set Scan Parameters Command
See Bluetooth spec Vol 4, Part E - 7.8.10 LE Set Scan Parameters Command
'''
self.le_scan_type = command.le_scan_type
self.le_scan_interval = command.le_scan_interval
@@ -986,7 +1004,7 @@ class Controller:
def on_hci_le_set_scan_enable_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.11 LE Set Scan Enable Command
See Bluetooth spec Vol 4, Part E - 7.8.11 LE Set Scan Enable Command
'''
self.le_scan_enable = command.le_scan_enable
self.filter_duplicates = command.filter_duplicates
@@ -994,7 +1012,7 @@ class Controller:
def on_hci_le_create_connection_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.12 LE Create Connection Command
See Bluetooth spec Vol 4, Part E - 7.8.12 LE Create Connection Command
'''
if not self.link:
@@ -1027,40 +1045,40 @@ class Controller:
def on_hci_le_create_connection_cancel_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.13 LE Create Connection Cancel Command
See Bluetooth spec Vol 4, Part E - 7.8.13 LE Create Connection Cancel Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_filter_accept_list_size_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.14 LE Read Filter Accept List Size
See Bluetooth spec Vol 4, Part E - 7.8.14 LE Read Filter Accept List Size
Command
'''
return bytes([HCI_SUCCESS, self.filter_accept_list_size])
def on_hci_le_clear_filter_accept_list_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.15 LE Clear Filter Accept List Command
See Bluetooth spec Vol 4, Part E - 7.8.15 LE Clear Filter Accept List Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_add_device_to_filter_accept_list_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.16 LE Add Device To Filter Accept List
See Bluetooth spec Vol 4, Part E - 7.8.16 LE Add Device To Filter Accept List
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_remove_device_from_filter_accept_list_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.17 LE Remove Device From Filter Accept
See Bluetooth spec Vol 4, Part E - 7.8.17 LE Remove Device From Filter Accept
List Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_remote_features_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.21 LE Read Remote Features Command
See Bluetooth spec Vol 4, Part E - 7.8.21 LE Read Remote Features Command
'''
# First, say that the command is pending
@@ -1083,13 +1101,13 @@ class Controller:
def on_hci_le_rand_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.23 LE Rand Command
See Bluetooth spec Vol 4, Part E - 7.8.23 LE Rand Command
'''
return bytes([HCI_SUCCESS]) + struct.pack('Q', random.randint(0, 1 << 64))
def on_hci_le_enable_encryption_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.24 LE Enable Encryption Command
See Bluetooth spec Vol 4, Part E - 7.8.24 LE Enable Encryption Command
'''
# Check the parameters
@@ -1122,13 +1140,13 @@ class Controller:
def on_hci_le_read_supported_states_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.27 LE Read Supported States Command
See Bluetooth spec Vol 4, Part E - 7.8.27 LE Read Supported States Command
'''
return bytes([HCI_SUCCESS]) + self.le_states
def on_hci_le_read_suggested_default_data_length_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.34 LE Read Suggested Default Data Length
See Bluetooth spec Vol 4, Part E - 7.8.34 LE Read Suggested Default Data Length
Command
'''
return struct.pack(
@@ -1140,7 +1158,7 @@ class Controller:
def on_hci_le_write_suggested_default_data_length_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.35 LE Write Suggested Default Data Length
See Bluetooth spec Vol 4, Part E - 7.8.35 LE Write Suggested Default Data Length
Command
'''
self.suggested_max_tx_octets, self.suggested_max_tx_time = struct.unpack(
@@ -1150,33 +1168,33 @@ class Controller:
def on_hci_le_read_local_p_256_public_key_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.36 LE Read P-256 Public Key Command
See Bluetooth spec Vol 4, Part E - 7.8.36 LE Read P-256 Public Key Command
'''
# TODO create key and send HCI_LE_Read_Local_P-256_Public_Key_Complete event
return bytes([HCI_SUCCESS])
def on_hci_le_add_device_to_resolving_list_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.38 LE Add Device To Resolving List
See Bluetooth spec Vol 4, Part E - 7.8.38 LE Add Device To Resolving List
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_clear_resolving_list_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.40 LE Clear Resolving List Command
See Bluetooth spec Vol 4, Part E - 7.8.40 LE Clear Resolving List Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_resolving_list_size_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.41 LE Read Resolving List Size Command
See Bluetooth spec Vol 4, Part E - 7.8.41 LE Read Resolving List Size Command
'''
return bytes([HCI_SUCCESS, self.resolving_list_size])
def on_hci_le_set_address_resolution_enable_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.44 LE Set Address Resolution Enable
See Bluetooth spec Vol 4, Part E - 7.8.44 LE Set Address Resolution Enable
Command
'''
ret = HCI_SUCCESS
@@ -1190,7 +1208,7 @@ class Controller:
def on_hci_le_set_resolvable_private_address_timeout_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.45 LE Set Resolvable Private Address
See Bluetooth spec Vol 4, Part E - 7.8.45 LE Set Resolvable Private Address
Timeout Command
'''
self.le_rpa_timeout = command.rpa_timeout
@@ -1198,7 +1216,7 @@ class Controller:
def on_hci_le_read_maximum_data_length_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.46 LE Read Maximum Data Length Command
See Bluetooth spec Vol 4, Part E - 7.8.46 LE Read Maximum Data Length Command
'''
return struct.pack(
'<BHHHH',
@@ -1211,7 +1229,7 @@ class Controller:
def on_hci_le_read_phy_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.47 LE Read PHY Command
See Bluetooth spec Vol 4, Part E - 7.8.47 LE Read PHY Command
'''
return struct.pack(
'<BHBB',
@@ -1223,7 +1241,7 @@ class Controller:
def on_hci_le_set_default_phy_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.48 LE Set Default PHY Command
See Bluetooth spec Vol 4, Part E - 7.8.48 LE Set Default PHY Command
'''
self.default_phy = {
'all_phys': command.all_phys,
@@ -1234,6 +1252,6 @@ class Controller:
def on_hci_le_read_transmit_power_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.74 LE Read Transmit Power Command
See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command
'''
return struct.pack('<BBB', HCI_SUCCESS, 0, 0)

View File

@@ -17,7 +17,7 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import struct
from typing import List, Optional, Tuple, Union, cast
from typing import List, Optional, Tuple, Union, cast, Dict
from .company_ids import COMPANY_IDENTIFIERS
@@ -53,7 +53,7 @@ def bit_flags_to_strings(bits, bit_flag_names):
return names
def name_or_number(dictionary, number, width=2):
def name_or_number(dictionary: Dict[int, str], number: int, width: int = 2) -> str:
name = dictionary.get(number)
if name is not None:
return name
@@ -562,11 +562,82 @@ class DeviceClass:
PERIPHERAL_HANDHELD_GESTURAL_INPUT_DEVICE_MINOR_DEVICE_CLASS: 'Handheld gestural input device'
}
WEARABLE_UNCATEGORIZED_MINOR_DEVICE_CLASS = 0x00
WEARABLE_WRISTWATCH_MINOR_DEVICE_CLASS = 0x01
WEARABLE_PAGER_MINOR_DEVICE_CLASS = 0x02
WEARABLE_JACKET_MINOR_DEVICE_CLASS = 0x03
WEARABLE_HELMET_MINOR_DEVICE_CLASS = 0x04
WEARABLE_GLASSES_MINOR_DEVICE_CLASS = 0x05
WEARABLE_MINOR_DEVICE_CLASS_NAMES = {
WEARABLE_UNCATEGORIZED_MINOR_DEVICE_CLASS: 'Uncategorized',
WEARABLE_WRISTWATCH_MINOR_DEVICE_CLASS: 'Wristwatch',
WEARABLE_PAGER_MINOR_DEVICE_CLASS: 'Pager',
WEARABLE_JACKET_MINOR_DEVICE_CLASS: 'Jacket',
WEARABLE_HELMET_MINOR_DEVICE_CLASS: 'Helmet',
WEARABLE_GLASSES_MINOR_DEVICE_CLASS: 'Glasses',
}
TOY_UNCATEGORIZED_MINOR_DEVICE_CLASS = 0x00
TOY_ROBOT_MINOR_DEVICE_CLASS = 0x01
TOY_VEHICLE_MINOR_DEVICE_CLASS = 0x02
TOY_DOLL_ACTION_FIGURE_MINOR_DEVICE_CLASS = 0x03
TOY_CONTROLLER_MINOR_DEVICE_CLASS = 0x04
TOY_GAME_MINOR_DEVICE_CLASS = 0x05
TOY_MINOR_DEVICE_CLASS_NAMES = {
TOY_UNCATEGORIZED_MINOR_DEVICE_CLASS: 'Uncategorized',
TOY_ROBOT_MINOR_DEVICE_CLASS: 'Robot',
TOY_VEHICLE_MINOR_DEVICE_CLASS: 'Vehicle',
TOY_DOLL_ACTION_FIGURE_MINOR_DEVICE_CLASS: 'Doll/Action figure',
TOY_CONTROLLER_MINOR_DEVICE_CLASS: 'Controller',
TOY_GAME_MINOR_DEVICE_CLASS: 'Game',
}
HEALTH_UNDEFINED_MINOR_DEVICE_CLASS = 0x00
HEALTH_BLOOD_PRESSURE_MONITOR_MINOR_DEVICE_CLASS = 0x01
HEALTH_THERMOMETER_MINOR_DEVICE_CLASS = 0x02
HEALTH_WEIGHING_SCALE_MINOR_DEVICE_CLASS = 0x03
HEALTH_GLUCOSE_METER_MINOR_DEVICE_CLASS = 0x04
HEALTH_PULSE_OXIMETER_MINOR_DEVICE_CLASS = 0x05
HEALTH_HEART_PULSE_RATE_MONITOR_MINOR_DEVICE_CLASS = 0x06
HEALTH_HEALTH_DATA_DISPLAY_MINOR_DEVICE_CLASS = 0x07
HEALTH_STEP_COUNTER_MINOR_DEVICE_CLASS = 0x08
HEALTH_BODY_COMPOSITION_ANALYZER_MINOR_DEVICE_CLASS = 0x09
HEALTH_PEAK_FLOW_MONITOR_MINOR_DEVICE_CLASS = 0x0A
HEALTH_MEDICATION_MONITOR_MINOR_DEVICE_CLASS = 0x0B
HEALTH_KNEE_PROSTHESIS_MINOR_DEVICE_CLASS = 0x0C
HEALTH_ANKLE_PROSTHESIS_MINOR_DEVICE_CLASS = 0x0D
HEALTH_GENERIC_HEALTH_MANAGER_MINOR_DEVICE_CLASS = 0x0E
HEALTH_PERSONAL_MOBILITY_DEVICE_MINOR_DEVICE_CLASS = 0x0F
HEALTH_MINOR_DEVICE_CLASS_NAMES = {
HEALTH_UNDEFINED_MINOR_DEVICE_CLASS: 'Undefined',
HEALTH_BLOOD_PRESSURE_MONITOR_MINOR_DEVICE_CLASS: 'Blood Pressure Monitor',
HEALTH_THERMOMETER_MINOR_DEVICE_CLASS: 'Thermometer',
HEALTH_WEIGHING_SCALE_MINOR_DEVICE_CLASS: 'Weighing Scale',
HEALTH_GLUCOSE_METER_MINOR_DEVICE_CLASS: 'Glucose Meter',
HEALTH_PULSE_OXIMETER_MINOR_DEVICE_CLASS: 'Pulse Oximeter',
HEALTH_HEART_PULSE_RATE_MONITOR_MINOR_DEVICE_CLASS: 'Heart/Pulse Rate Monitor',
HEALTH_HEALTH_DATA_DISPLAY_MINOR_DEVICE_CLASS: 'Health Data Display',
HEALTH_STEP_COUNTER_MINOR_DEVICE_CLASS: 'Step Counter',
HEALTH_BODY_COMPOSITION_ANALYZER_MINOR_DEVICE_CLASS: 'Body Composition Analyzer',
HEALTH_PEAK_FLOW_MONITOR_MINOR_DEVICE_CLASS: 'Peak Flow Monitor',
HEALTH_MEDICATION_MONITOR_MINOR_DEVICE_CLASS: 'Medication Monitor',
HEALTH_KNEE_PROSTHESIS_MINOR_DEVICE_CLASS: 'Knee Prosthesis',
HEALTH_ANKLE_PROSTHESIS_MINOR_DEVICE_CLASS: 'Ankle Prosthesis',
HEALTH_GENERIC_HEALTH_MANAGER_MINOR_DEVICE_CLASS: 'Generic Health Manager',
HEALTH_PERSONAL_MOBILITY_DEVICE_MINOR_DEVICE_CLASS: 'Personal Mobility Device',
}
MINOR_DEVICE_CLASS_NAMES = {
COMPUTER_MAJOR_DEVICE_CLASS: COMPUTER_MINOR_DEVICE_CLASS_NAMES,
PHONE_MAJOR_DEVICE_CLASS: PHONE_MINOR_DEVICE_CLASS_NAMES,
AUDIO_VIDEO_MAJOR_DEVICE_CLASS: AUDIO_VIDEO_MINOR_DEVICE_CLASS_NAMES,
PERIPHERAL_MAJOR_DEVICE_CLASS: PERIPHERAL_MINOR_DEVICE_CLASS_NAMES
PERIPHERAL_MAJOR_DEVICE_CLASS: PERIPHERAL_MINOR_DEVICE_CLASS_NAMES,
WEARABLE_MAJOR_DEVICE_CLASS: WEARABLE_MINOR_DEVICE_CLASS_NAMES,
TOY_MAJOR_DEVICE_CLASS: TOY_MINOR_DEVICE_CLASS_NAMES,
HEALTH_MAJOR_DEVICE_CLASS: HEALTH_MINOR_DEVICE_CLASS_NAMES,
}
# fmt: on

View File

@@ -2851,18 +2851,22 @@ class Device(CompositeEventEmitter):
method = methods[peer_io_capability][io_capability]
async def reply() -> None:
if await connection.abort_on('disconnection', method()):
await self.host.send_command(
HCI_User_Confirmation_Request_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address
)
)
else:
await self.host.send_command(
HCI_User_Confirmation_Request_Negative_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address
try:
if await connection.abort_on('disconnection', method()):
await self.host.send_command(
HCI_User_Confirmation_Request_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address
)
)
return
except Exception as error:
logger.warning(f'exception while confirming: {error}')
await self.host.send_command(
HCI_User_Confirmation_Request_Negative_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address
)
)
AsyncRunner.spawn(reply())
@@ -2874,21 +2878,25 @@ class Device(CompositeEventEmitter):
pairing_config = self.pairing_config_factory(connection)
async def reply() -> None:
number = await connection.abort_on(
'disconnection', pairing_config.delegate.get_number()
try:
number = await connection.abort_on(
'disconnection', pairing_config.delegate.get_number()
)
if number is not None:
await self.host.send_command(
HCI_User_Passkey_Request_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address, numeric_value=number
)
)
return
except Exception as error:
logger.warning(f'exception while asking for pass-key: {error}')
await self.host.send_command(
HCI_User_Passkey_Request_Negative_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address
)
)
if number is not None:
await self.host.send_command(
HCI_User_Passkey_Request_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address, numeric_value=number
)
)
else:
await self.host.send_command(
HCI_User_Passkey_Request_Negative_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address
)
)
AsyncRunner.spawn(reply())

View File

@@ -22,7 +22,19 @@ import struct
from collections import deque
from pyee import EventEmitter
from typing import Dict, Type
from typing import (
Dict,
Type,
List,
Optional,
Tuple,
Callable,
Any,
Union,
Deque,
Iterable,
TYPE_CHECKING,
)
from .colors import color
from .core import BT_CENTRAL_ROLE, InvalidStateError, ProtocolError
@@ -33,6 +45,9 @@ from .hci import (
name_or_number,
)
if TYPE_CHECKING:
from bumble.device import Connection
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
@@ -155,7 +170,7 @@ class L2CAP_PDU:
'''
@staticmethod
def from_bytes(data):
def from_bytes(data: bytes) -> L2CAP_PDU:
# Sanity check
if len(data) < 4:
raise ValueError('not enough data for L2CAP header')
@@ -165,18 +180,18 @@ class L2CAP_PDU:
return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload)
def to_bytes(self):
def to_bytes(self) -> bytes:
header = struct.pack('<HH', len(self.payload), self.cid)
return header + self.payload
def __init__(self, cid, payload):
def __init__(self, cid: int, payload: bytes) -> None:
self.cid = cid
self.payload = payload
def __bytes__(self):
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self):
def __str__(self) -> str:
return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}'
@@ -188,10 +203,10 @@ class L2CAP_Control_Frame:
classes: Dict[int, Type[L2CAP_Control_Frame]] = {}
code = 0
name = None
name: str
@staticmethod
def from_bytes(pdu):
def from_bytes(pdu: bytes) -> L2CAP_Control_Frame:
code = pdu[0]
cls = L2CAP_Control_Frame.classes.get(code)
@@ -216,11 +231,11 @@ class L2CAP_Control_Frame:
return self
@staticmethod
def code_name(code):
def code_name(code: int) -> str:
return name_or_number(L2CAP_CONTROL_FRAME_NAMES, code)
@staticmethod
def decode_configuration_options(data):
def decode_configuration_options(data: bytes) -> List[Tuple[int, bytes]]:
options = []
while len(data) >= 2:
value_type = data[0]
@@ -232,7 +247,7 @@ class L2CAP_Control_Frame:
return options
@staticmethod
def encode_configuration_options(options):
def encode_configuration_options(options: List[Tuple[int, bytes]]) -> bytes:
return b''.join(
[bytes([option[0], len(option[1])]) + option[1] for option in options]
)
@@ -256,29 +271,30 @@ class L2CAP_Control_Frame:
return inner
def __init__(self, pdu=None, **kwargs):
def __init__(self, pdu=None, **kwargs) -> None:
self.identifier = kwargs.get('identifier', 0)
if hasattr(self, 'fields') and kwargs:
HCI_Object.init_from_fields(self, self.fields, kwargs)
if pdu is None:
data = HCI_Object.dict_to_bytes(kwargs, self.fields)
pdu = (
bytes([self.code, self.identifier])
+ struct.pack('<H', len(data))
+ data
)
if hasattr(self, 'fields'):
if kwargs:
HCI_Object.init_from_fields(self, self.fields, kwargs)
if pdu is None:
data = HCI_Object.dict_to_bytes(kwargs, self.fields)
pdu = (
bytes([self.code, self.identifier])
+ struct.pack('<H', len(data))
+ data
)
self.pdu = pdu
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
def to_bytes(self) -> bytes:
return self.pdu
def __bytes__(self):
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self):
def __str__(self) -> str:
result = f'{color(self.name, "yellow")} [ID={self.identifier}]'
if fields := getattr(self, 'fields', None):
result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ')
@@ -315,7 +331,7 @@ class L2CAP_Command_Reject(L2CAP_Control_Frame):
}
@staticmethod
def reason_name(reason):
def reason_name(reason: int) -> str:
return name_or_number(L2CAP_Command_Reject.REASON_NAMES, reason)
@@ -343,7 +359,7 @@ class L2CAP_Connection_Request(L2CAP_Control_Frame):
'''
@staticmethod
def parse_psm(data, offset=0):
def parse_psm(data: bytes, offset: int = 0) -> Tuple[int, int]:
psm_length = 2
psm = data[offset] | data[offset + 1] << 8
@@ -355,7 +371,7 @@ class L2CAP_Connection_Request(L2CAP_Control_Frame):
return offset + psm_length, psm
@staticmethod
def serialize_psm(psm):
def serialize_psm(psm: int) -> bytes:
serialized = struct.pack('<H', psm & 0xFFFF)
psm >>= 16
while psm:
@@ -405,7 +421,7 @@ class L2CAP_Connection_Response(L2CAP_Control_Frame):
}
@staticmethod
def result_name(result):
def result_name(result: int) -> str:
return name_or_number(L2CAP_Connection_Response.RESULT_NAMES, result)
@@ -452,7 +468,7 @@ class L2CAP_Configure_Response(L2CAP_Control_Frame):
}
@staticmethod
def result_name(result):
def result_name(result: int) -> str:
return name_or_number(L2CAP_Configure_Response.RESULT_NAMES, result)
@@ -529,7 +545,7 @@ class L2CAP_Information_Request(L2CAP_Control_Frame):
}
@staticmethod
def info_type_name(info_type):
def info_type_name(info_type: int) -> str:
return name_or_number(L2CAP_Information_Request.INFO_TYPE_NAMES, info_type)
@@ -556,7 +572,7 @@ class L2CAP_Information_Response(L2CAP_Control_Frame):
RESULT_NAMES = {SUCCESS: 'SUCCESS', NOT_SUPPORTED: 'NOT_SUPPORTED'}
@staticmethod
def result_name(result):
def result_name(result: int) -> str:
return name_or_number(L2CAP_Information_Response.RESULT_NAMES, result)
@@ -588,6 +604,8 @@ class L2CAP_LE_Credit_Based_Connection_Request(L2CAP_Control_Frame):
(CODE 0x14)
'''
source_cid: int
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass(
@@ -640,7 +658,7 @@ class L2CAP_LE_Credit_Based_Connection_Response(L2CAP_Control_Frame):
}
@staticmethod
def result_name(result):
def result_name(result: int) -> str:
return name_or_number(
L2CAP_LE_Credit_Based_Connection_Response.RESULT_NAMES, result
)
@@ -701,7 +719,22 @@ class Channel(EventEmitter):
WAIT_CONTROL_IND: 'WAIT_CONTROL_IND',
}
def __init__(self, manager, connection, signaling_cid, psm, source_cid, mtu):
connection_result: Optional[asyncio.Future[None]]
disconnection_result: Optional[asyncio.Future[None]]
response: Optional[asyncio.Future[bytes]]
sink: Optional[Callable[[bytes], Any]]
state: int
connection: Connection
def __init__(
self,
manager: 'ChannelManager',
connection: Connection,
signaling_cid: int,
psm: int,
source_cid: int,
mtu: int,
) -> None:
super().__init__()
self.manager = manager
self.connection = connection
@@ -716,19 +749,19 @@ class Channel(EventEmitter):
self.disconnection_result = None
self.sink = None
def change_state(self, new_state):
def change_state(self, new_state: int) -> None:
logger.debug(
f'{self} state change -> {color(Channel.STATE_NAMES[new_state], "cyan")}'
)
self.state = new_state
def send_pdu(self, pdu):
def send_pdu(self, pdu) -> None:
self.manager.send_pdu(self.connection, self.destination_cid, pdu)
def send_control_frame(self, frame):
def send_control_frame(self, frame) -> None:
self.manager.send_control_frame(self.connection, self.signaling_cid, frame)
async def send_request(self, request):
async def send_request(self, request) -> bytes:
# Check that there isn't already a request pending
if self.response:
raise InvalidStateError('request already pending')
@@ -739,7 +772,7 @@ class Channel(EventEmitter):
self.send_pdu(request)
return await self.response
def on_pdu(self, pdu):
def on_pdu(self, pdu) -> None:
if self.response:
self.response.set_result(pdu)
self.response = None
@@ -751,7 +784,7 @@ class Channel(EventEmitter):
color('received pdu without a pending request or sink', 'red')
)
async def connect(self):
async def connect(self) -> None:
if self.state != Channel.CLOSED:
raise InvalidStateError('invalid state')
@@ -778,7 +811,7 @@ class Channel(EventEmitter):
finally:
self.connection_result = None
async def disconnect(self):
async def disconnect(self) -> None:
if self.state != Channel.OPEN:
raise InvalidStateError('invalid state')
@@ -796,12 +829,12 @@ class Channel(EventEmitter):
self.disconnection_result = asyncio.get_running_loop().create_future()
return await self.disconnection_result
def abort(self):
def abort(self) -> None:
if self.state == self.OPEN:
self.change_state(self.CLOSED)
self.emit('close')
def send_configure_request(self):
def send_configure_request(self) -> None:
options = L2CAP_Control_Frame.encode_configuration_options(
[
(
@@ -819,7 +852,7 @@ class Channel(EventEmitter):
)
)
def on_connection_request(self, request):
def on_connection_request(self, request) -> None:
self.destination_cid = request.source_cid
self.change_state(Channel.WAIT_CONNECT)
self.send_control_frame(
@@ -858,7 +891,7 @@ class Channel(EventEmitter):
)
self.connection_result = None
def on_configure_request(self, request):
def on_configure_request(self, request) -> None:
if self.state not in (
Channel.WAIT_CONFIG,
Channel.WAIT_CONFIG_REQ,
@@ -896,7 +929,7 @@ class Channel(EventEmitter):
elif self.state == Channel.WAIT_CONFIG_REQ_RSP:
self.change_state(Channel.WAIT_CONFIG_RSP)
def on_configure_response(self, response):
def on_configure_response(self, response) -> None:
if response.result == L2CAP_Configure_Response.SUCCESS:
if self.state == Channel.WAIT_CONFIG_REQ_RSP:
self.change_state(Channel.WAIT_CONFIG_REQ)
@@ -930,7 +963,7 @@ class Channel(EventEmitter):
)
# TODO: decide how to fail gracefully
def on_disconnection_request(self, request):
def on_disconnection_request(self, request) -> None:
if self.state in (Channel.OPEN, Channel.WAIT_DISCONNECT):
self.send_control_frame(
L2CAP_Disconnection_Response(
@@ -945,7 +978,7 @@ class Channel(EventEmitter):
else:
logger.warning(color('invalid state', 'red'))
def on_disconnection_response(self, response):
def on_disconnection_response(self, response) -> None:
if self.state != Channel.WAIT_DISCONNECT:
logger.warning(color('invalid state', 'red'))
return
@@ -964,7 +997,7 @@ class Channel(EventEmitter):
self.emit('close')
self.manager.on_channel_closed(self)
def __str__(self):
def __str__(self) -> str:
return (
f'Channel({self.source_cid}->{self.destination_cid}, '
f'PSM={self.psm}, '
@@ -995,25 +1028,32 @@ class LeConnectionOrientedChannel(EventEmitter):
CONNECTION_ERROR: 'CONNECTION_ERROR',
}
out_queue: Deque[bytes]
connection_result: Optional[asyncio.Future[LeConnectionOrientedChannel]]
disconnection_result: Optional[asyncio.Future[None]]
out_sdu: Optional[bytes]
state: int
connection: Connection
@staticmethod
def state_name(state):
def state_name(state: int) -> str:
return name_or_number(LeConnectionOrientedChannel.STATE_NAMES, state)
def __init__(
self,
manager,
connection,
le_psm,
source_cid,
destination_cid,
mtu,
mps,
credits, # pylint: disable=redefined-builtin
peer_mtu,
peer_mps,
peer_credits,
connected,
):
manager: 'ChannelManager',
connection: Connection,
le_psm: int,
source_cid: int,
destination_cid: int,
mtu: int,
mps: int,
credits: int, # pylint: disable=redefined-builtin
peer_mtu: int,
peer_mps: int,
peer_credits: int,
connected: bool,
) -> None:
super().__init__()
self.manager = manager
self.connection = connection
@@ -1045,7 +1085,7 @@ class LeConnectionOrientedChannel(EventEmitter):
else:
self.state = LeConnectionOrientedChannel.INIT
def change_state(self, new_state):
def change_state(self, new_state: int) -> None:
logger.debug(
f'{self} state change -> {color(self.state_name(new_state), "cyan")}'
)
@@ -1056,13 +1096,13 @@ class LeConnectionOrientedChannel(EventEmitter):
elif new_state == self.DISCONNECTED:
self.emit('close')
def send_pdu(self, pdu):
def send_pdu(self, pdu) -> None:
self.manager.send_pdu(self.connection, self.destination_cid, pdu)
def send_control_frame(self, frame):
def send_control_frame(self, frame) -> None:
self.manager.send_control_frame(self.connection, L2CAP_LE_SIGNALING_CID, frame)
async def connect(self):
async def connect(self) -> LeConnectionOrientedChannel:
# Check that we're in the right state
if self.state != self.INIT:
raise InvalidStateError('not in a connectable state')
@@ -1090,7 +1130,7 @@ class LeConnectionOrientedChannel(EventEmitter):
# Wait for the connection to succeed or fail
return await self.connection_result
async def disconnect(self):
async def disconnect(self) -> None:
# Check that we're connected
if self.state != self.CONNECTED:
raise InvalidStateError('not connected')
@@ -1110,11 +1150,11 @@ class LeConnectionOrientedChannel(EventEmitter):
self.disconnection_result = asyncio.get_running_loop().create_future()
return await self.disconnection_result
def abort(self):
def abort(self) -> None:
if self.state == self.CONNECTED:
self.change_state(self.DISCONNECTED)
def on_pdu(self, pdu):
def on_pdu(self, pdu) -> None:
if self.sink is None:
logger.warning('received pdu without a sink')
return
@@ -1180,7 +1220,7 @@ class LeConnectionOrientedChannel(EventEmitter):
self.in_sdu = None
self.in_sdu_length = 0
def on_connection_response(self, response):
def on_connection_response(self, response) -> None:
# Look for a matching pending response result
if self.connection_result is None:
logger.warning(
@@ -1214,14 +1254,14 @@ class LeConnectionOrientedChannel(EventEmitter):
# Cleanup
self.connection_result = None
def on_credits(self, credits): # pylint: disable=redefined-builtin
def on_credits(self, credits: int) -> None: # pylint: disable=redefined-builtin
self.credits += credits
logger.debug(f'received {credits} credits, total = {self.credits}')
# Try to send more data if we have any queued up
self.process_output()
def on_disconnection_request(self, request):
def on_disconnection_request(self, request) -> None:
self.send_control_frame(
L2CAP_Disconnection_Response(
identifier=request.identifier,
@@ -1232,7 +1272,7 @@ class LeConnectionOrientedChannel(EventEmitter):
self.change_state(self.DISCONNECTED)
self.flush_output()
def on_disconnection_response(self, response):
def on_disconnection_response(self, response) -> None:
if self.state != self.DISCONNECTING:
logger.warning(color('invalid state', 'red'))
return
@@ -1249,11 +1289,11 @@ class LeConnectionOrientedChannel(EventEmitter):
self.disconnection_result.set_result(None)
self.disconnection_result = None
def flush_output(self):
def flush_output(self) -> None:
self.out_queue.clear()
self.out_sdu = None
def process_output(self):
def process_output(self) -> None:
while self.credits > 0:
if self.out_sdu is not None:
# Finish the current SDU
@@ -1296,7 +1336,7 @@ class LeConnectionOrientedChannel(EventEmitter):
self.drained.set()
return
def write(self, data):
def write(self, data: bytes) -> None:
if self.state != self.CONNECTED:
logger.warning('not connected, dropping data')
return
@@ -1311,18 +1351,18 @@ class LeConnectionOrientedChannel(EventEmitter):
# Send what we can
self.process_output()
async def drain(self):
async def drain(self) -> None:
await self.drained.wait()
def pause_reading(self):
def pause_reading(self) -> None:
# TODO: not implemented yet
pass
def resume_reading(self):
def resume_reading(self) -> None:
# TODO: not implemented yet
pass
def __str__(self):
def __str__(self) -> str:
return (
f'CoC({self.source_cid}->{self.destination_cid}, '
f'State={self.state_name(self.state)}, '
@@ -1335,9 +1375,21 @@ class LeConnectionOrientedChannel(EventEmitter):
# -----------------------------------------------------------------------------
class ChannelManager:
identifiers: Dict[int, int]
channels: Dict[int, Dict[int, Union[Channel, LeConnectionOrientedChannel]]]
servers: Dict[int, Callable[[Channel], Any]]
le_coc_channels: Dict[int, Dict[int, LeConnectionOrientedChannel]]
le_coc_servers: Dict[
int, Tuple[Callable[[LeConnectionOrientedChannel], Any], int, int, int]
]
le_coc_requests: Dict[int, L2CAP_LE_Credit_Based_Connection_Request]
fixed_channels: Dict[int, Optional[Callable[[int, bytes], Any]]]
def __init__(
self, extended_features=(), connectionless_mtu=L2CAP_DEFAULT_CONNECTIONLESS_MTU
):
self,
extended_features: Iterable[int] = (),
connectionless_mtu: int = L2CAP_DEFAULT_CONNECTIONLESS_MTU,
) -> None:
self._host = None
self.identifiers = {} # Incrementing identifier values by connection
self.channels = {} # All channels, mapped by connection and source cid
@@ -1366,20 +1418,20 @@ class ChannelManager:
if host is not None:
host.on('disconnection', self.on_disconnection)
def find_channel(self, connection_handle, cid):
def find_channel(self, connection_handle: int, cid: int):
if connection_channels := self.channels.get(connection_handle):
return connection_channels.get(cid)
return None
def find_le_coc_channel(self, connection_handle, cid):
def find_le_coc_channel(self, connection_handle: int, cid: int):
if connection_channels := self.le_coc_channels.get(connection_handle):
return connection_channels.get(cid)
return None
@staticmethod
def find_free_br_edr_cid(channels):
def find_free_br_edr_cid(channels: Iterable[int]) -> int:
# Pick the smallest valid CID that's not already in the list
# (not necessarily the most efficient algorithm, but the list of CID is
# very small in practice)
@@ -1392,7 +1444,7 @@ class ChannelManager:
raise RuntimeError('no free CID available')
@staticmethod
def find_free_le_cid(channels):
def find_free_le_cid(channels: Iterable[int]) -> int:
# Pick the smallest valid CID that's not already in the list
# (not necessarily the most efficient algorithm, but the list of CID is
# very small in practice)
@@ -1405,7 +1457,7 @@ class ChannelManager:
raise RuntimeError('no free CID')
@staticmethod
def check_le_coc_parameters(max_credits, mtu, mps):
def check_le_coc_parameters(max_credits: int, mtu: int, mps: int) -> None:
if (
max_credits < 1
or max_credits > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS
@@ -1419,19 +1471,21 @@ class ChannelManager:
):
raise ValueError('MPS out of range')
def next_identifier(self, connection):
def next_identifier(self, connection: Connection) -> int:
identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256
self.identifiers[connection.handle] = identifier
return identifier
def register_fixed_channel(self, cid, handler):
def register_fixed_channel(
self, cid: int, handler: Callable[[int, bytes], Any]
) -> None:
self.fixed_channels[cid] = handler
def deregister_fixed_channel(self, cid):
def deregister_fixed_channel(self, cid: int) -> None:
if cid in self.fixed_channels:
del self.fixed_channels[cid]
def register_server(self, psm, server):
def register_server(self, psm: int, server: Callable[[Channel], Any]) -> int:
if psm == 0:
# Find a free PSM
for candidate in range(
@@ -1465,12 +1519,12 @@ class ChannelManager:
def register_le_coc_server(
self,
psm,
server,
max_credits=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS,
mtu=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU,
mps=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS,
):
psm: int,
server: Callable[[LeConnectionOrientedChannel], Any],
max_credits: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS,
mtu: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU,
mps: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS,
) -> int:
self.check_le_coc_parameters(max_credits, mtu, mps)
if psm == 0:
@@ -1498,7 +1552,7 @@ class ChannelManager:
return psm
def on_disconnection(self, connection_handle, _reason):
def on_disconnection(self, connection_handle: int, _reason: int) -> None:
logger.debug(f'disconnection from {connection_handle}, cleaning up channels')
if connection_handle in self.channels:
for _, channel in self.channels[connection_handle].items():
@@ -1511,7 +1565,7 @@ class ChannelManager:
if connection_handle in self.identifiers:
del self.identifiers[connection_handle]
def send_pdu(self, connection, cid, pdu):
def send_pdu(self, connection, cid: int, pdu) -> None:
pdu_str = pdu.hex() if isinstance(pdu, bytes) else str(pdu)
logger.debug(
f'{color(">>> Sending L2CAP PDU", "blue")} '
@@ -1520,14 +1574,16 @@ class ChannelManager:
)
self.host.send_l2cap_pdu(connection.handle, cid, bytes(pdu))
def on_pdu(self, connection, cid, pdu):
def on_pdu(self, connection: Connection, cid: int, pdu) -> None:
if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID):
# Parse the L2CAP payload into a Control Frame object
control_frame = L2CAP_Control_Frame.from_bytes(pdu)
self.on_control_frame(connection, cid, control_frame)
elif cid in self.fixed_channels:
self.fixed_channels[cid](connection.handle, pdu)
handler = self.fixed_channels[cid]
assert handler is not None
handler(connection.handle, pdu)
else:
if (channel := self.find_channel(connection.handle, cid)) is None:
logger.warning(
@@ -1539,7 +1595,9 @@ class ChannelManager:
channel.on_pdu(pdu)
def send_control_frame(self, connection, cid, control_frame):
def send_control_frame(
self, connection: Connection, cid: int, control_frame
) -> None:
logger.debug(
f'{color(">>> Sending L2CAP Signaling Control Frame", "blue")} '
f'on connection [0x{connection.handle:04X}] (CID={cid}) '
@@ -1547,7 +1605,7 @@ class ChannelManager:
)
self.host.send_l2cap_pdu(connection.handle, cid, bytes(control_frame))
def on_control_frame(self, connection, cid, control_frame):
def on_control_frame(self, connection: Connection, cid: int, control_frame) -> None:
logger.debug(
f'{color("<<< Received L2CAP Signaling Control Frame", "green")} '
f'on connection [0x{connection.handle:04X}] (CID={cid}) '
@@ -1584,10 +1642,14 @@ class ChannelManager:
),
)
def on_l2cap_command_reject(self, _connection, _cid, packet):
def on_l2cap_command_reject(
self, _connection: Connection, _cid: int, packet
) -> None:
logger.warning(f'{color("!!! Command rejected:", "red")} {packet.reason}')
def on_l2cap_connection_request(self, connection, cid, request):
def on_l2cap_connection_request(
self, connection: Connection, cid: int, request
) -> None:
# Check if there's a server for this PSM
server = self.servers.get(request.psm)
if server:
@@ -1639,7 +1701,9 @@ class ChannelManager:
),
)
def on_l2cap_connection_response(self, connection, cid, response):
def on_l2cap_connection_response(
self, connection: Connection, cid: int, response
) -> None:
if (
channel := self.find_channel(connection.handle, response.source_cid)
) is None:
@@ -1654,7 +1718,9 @@ class ChannelManager:
channel.on_connection_response(response)
def on_l2cap_configure_request(self, connection, cid, request):
def on_l2cap_configure_request(
self, connection: Connection, cid: int, request
) -> None:
if (
channel := self.find_channel(connection.handle, request.destination_cid)
) is None:
@@ -1669,7 +1735,9 @@ class ChannelManager:
channel.on_configure_request(request)
def on_l2cap_configure_response(self, connection, cid, response):
def on_l2cap_configure_response(
self, connection: Connection, cid: int, response
) -> None:
if (
channel := self.find_channel(connection.handle, response.source_cid)
) is None:
@@ -1684,7 +1752,9 @@ class ChannelManager:
channel.on_configure_response(response)
def on_l2cap_disconnection_request(self, connection, cid, request):
def on_l2cap_disconnection_request(
self, connection: Connection, cid: int, request
) -> None:
if (
channel := self.find_channel(connection.handle, request.destination_cid)
) is None:
@@ -1699,7 +1769,9 @@ class ChannelManager:
channel.on_disconnection_request(request)
def on_l2cap_disconnection_response(self, connection, cid, response):
def on_l2cap_disconnection_response(
self, connection: Connection, cid: int, response
) -> None:
if (
channel := self.find_channel(connection.handle, response.source_cid)
) is None:
@@ -1714,7 +1786,7 @@ class ChannelManager:
channel.on_disconnection_response(response)
def on_l2cap_echo_request(self, connection, cid, request):
def on_l2cap_echo_request(self, connection: Connection, cid: int, request) -> None:
logger.debug(f'<<< Echo request: data={request.data.hex()}')
self.send_control_frame(
connection,
@@ -1722,11 +1794,15 @@ class ChannelManager:
L2CAP_Echo_Response(identifier=request.identifier, data=request.data),
)
def on_l2cap_echo_response(self, _connection, _cid, response):
def on_l2cap_echo_response(
self, _connection: Connection, _cid: int, response
) -> None:
logger.debug(f'<<< Echo response: data={response.data.hex()}')
# TODO notify listeners
def on_l2cap_information_request(self, connection, cid, request):
def on_l2cap_information_request(
self, connection: Connection, cid: int, request
) -> None:
if request.info_type == L2CAP_Information_Request.CONNECTIONLESS_MTU:
result = L2CAP_Information_Response.SUCCESS
data = self.connectionless_mtu.to_bytes(2, 'little')
@@ -1750,7 +1826,9 @@ class ChannelManager:
),
)
def on_l2cap_connection_parameter_update_request(self, connection, cid, request):
def on_l2cap_connection_parameter_update_request(
self, connection: Connection, cid: int, request
):
if connection.role == BT_CENTRAL_ROLE:
self.send_control_frame(
connection,
@@ -1769,7 +1847,7 @@ class ChannelManager:
supervision_timeout=request.timeout,
min_ce_length=0,
max_ce_length=0,
)
) # type: ignore[call-arg]
)
else:
self.send_control_frame(
@@ -1781,11 +1859,15 @@ class ChannelManager:
),
)
def on_l2cap_connection_parameter_update_response(self, connection, cid, response):
def on_l2cap_connection_parameter_update_response(
self, connection: Connection, cid: int, response
) -> None:
# TODO: check response
pass
def on_l2cap_le_credit_based_connection_request(self, connection, cid, request):
def on_l2cap_le_credit_based_connection_request(
self, connection: Connection, cid: int, request
) -> None:
if request.le_psm in self.le_coc_servers:
(server, max_credits, mtu, mps) = self.le_coc_servers[request.le_psm]
@@ -1887,7 +1969,9 @@ class ChannelManager:
),
)
def on_l2cap_le_credit_based_connection_response(self, connection, _cid, response):
def on_l2cap_le_credit_based_connection_response(
self, connection: Connection, _cid: int, response
) -> None:
# Find the pending request by identifier
request = self.le_coc_requests.get(response.identifier)
if request is None:
@@ -1910,7 +1994,9 @@ class ChannelManager:
# Process the response
channel.on_connection_response(response)
def on_l2cap_le_flow_control_credit(self, connection, _cid, credit):
def on_l2cap_le_flow_control_credit(
self, connection: Connection, _cid: int, credit
) -> None:
channel = self.find_le_coc_channel(connection.handle, credit.cid)
if channel is None:
logger.warning(f'received credits for an unknown channel (cid={credit.cid}')
@@ -1918,13 +2004,15 @@ class ChannelManager:
channel.on_credits(credit.credits)
def on_channel_closed(self, channel):
def on_channel_closed(self, channel: Channel) -> None:
connection_channels = self.channels.get(channel.connection.handle)
if connection_channels:
if channel.source_cid in connection_channels:
del connection_channels[channel.source_cid]
async def open_le_coc(self, connection, psm, max_credits, mtu, mps):
async def open_le_coc(
self, connection: Connection, psm: int, max_credits: int, mtu: int, mps: int
) -> LeConnectionOrientedChannel:
self.check_le_coc_parameters(max_credits, mtu, mps)
# Find a free CID for the new channel
@@ -1965,7 +2053,7 @@ class ChannelManager:
return channel
async def connect(self, connection, psm):
async def connect(self, connection: Connection, psm: int) -> Channel:
# NOTE: this implementation hard-codes BR/EDR
# Find a free CID for a new channel

View File

@@ -25,6 +25,7 @@
from __future__ import annotations
import logging
import asyncio
import enum
import secrets
from typing import (
TYPE_CHECKING,
@@ -553,20 +554,16 @@ class AddressResolver:
# -----------------------------------------------------------------------------
class Session:
# Pairing methods
class PairingMethod(enum.IntEnum):
JUST_WORKS = 0
NUMERIC_COMPARISON = 1
PASSKEY = 2
OOB = 3
CTKD_OVER_CLASSIC = 4
PAIRING_METHOD_NAMES = {
JUST_WORKS: 'JUST_WORKS',
NUMERIC_COMPARISON: 'NUMERIC_COMPARISON',
PASSKEY: 'PASSKEY',
OOB: 'OOB',
}
# -----------------------------------------------------------------------------
class Session:
# I/O Capability to pairing method decision matrix
#
# See Bluetooth spec @ Vol 3, part H - Table 2.8: Mapping of IO Capabilities to Key
@@ -581,47 +578,50 @@ class Session:
# (False).
PAIRING_METHODS = {
SMP_DISPLAY_ONLY_IO_CAPABILITY: {
SMP_DISPLAY_ONLY_IO_CAPABILITY: JUST_WORKS,
SMP_DISPLAY_YES_NO_IO_CAPABILITY: JUST_WORKS,
SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, True, False),
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS,
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PASSKEY, True, False),
SMP_DISPLAY_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS,
SMP_DISPLAY_YES_NO_IO_CAPABILITY: PairingMethod.JUST_WORKS,
SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False),
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS,
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False),
},
SMP_DISPLAY_YES_NO_IO_CAPABILITY: {
SMP_DISPLAY_ONLY_IO_CAPABILITY: JUST_WORKS,
SMP_DISPLAY_YES_NO_IO_CAPABILITY: (JUST_WORKS, NUMERIC_COMPARISON),
SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, True, False),
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS,
SMP_DISPLAY_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS,
SMP_DISPLAY_YES_NO_IO_CAPABILITY: (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
),
SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False),
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS,
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (
(PASSKEY, True, False),
NUMERIC_COMPARISON,
(PairingMethod.PASSKEY, True, False),
PairingMethod.NUMERIC_COMPARISON,
),
},
SMP_KEYBOARD_ONLY_IO_CAPABILITY: {
SMP_DISPLAY_ONLY_IO_CAPABILITY: (PASSKEY, False, True),
SMP_DISPLAY_YES_NO_IO_CAPABILITY: (PASSKEY, False, True),
SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, False, False),
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS,
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PASSKEY, False, True),
SMP_DISPLAY_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True),
SMP_DISPLAY_YES_NO_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True),
SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, False),
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS,
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True),
},
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: {
SMP_DISPLAY_ONLY_IO_CAPABILITY: JUST_WORKS,
SMP_DISPLAY_YES_NO_IO_CAPABILITY: JUST_WORKS,
SMP_KEYBOARD_ONLY_IO_CAPABILITY: JUST_WORKS,
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS,
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: JUST_WORKS,
SMP_DISPLAY_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS,
SMP_DISPLAY_YES_NO_IO_CAPABILITY: PairingMethod.JUST_WORKS,
SMP_KEYBOARD_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS,
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS,
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: PairingMethod.JUST_WORKS,
},
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: {
SMP_DISPLAY_ONLY_IO_CAPABILITY: (PASSKEY, False, True),
SMP_DISPLAY_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True),
SMP_DISPLAY_YES_NO_IO_CAPABILITY: (
(PASSKEY, False, True),
NUMERIC_COMPARISON,
(PairingMethod.PASSKEY, False, True),
PairingMethod.NUMERIC_COMPARISON,
),
SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, True, False),
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS,
SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False),
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS,
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (
(PASSKEY, True, False),
NUMERIC_COMPARISON,
(PairingMethod.PASSKEY, True, False),
PairingMethod.NUMERIC_COMPARISON,
),
},
}
@@ -664,7 +664,7 @@ class Session:
self.passkey_ready = asyncio.Event()
self.passkey_step = 0
self.passkey_display = False
self.pairing_method = 0
self.pairing_method: PairingMethod = PairingMethod.JUST_WORKS
self.pairing_config = pairing_config
self.wait_before_continuing: Optional[asyncio.Future[None]] = None
self.completed = False
@@ -769,19 +769,23 @@ class Session:
def decide_pairing_method(
self, auth_req: int, initiator_io_capability: int, responder_io_capability: int
) -> None:
if self.connection.transport == BT_BR_EDR_TRANSPORT:
self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC
return
if (not self.mitm) and (auth_req & SMP_MITM_AUTHREQ == 0):
self.pairing_method = self.JUST_WORKS
self.pairing_method = PairingMethod.JUST_WORKS
return
details = self.PAIRING_METHODS[initiator_io_capability][responder_io_capability] # type: ignore[index]
if isinstance(details, tuple) and len(details) == 2:
# One entry for legacy pairing and one for secure connections
details = details[1 if self.sc else 0]
if isinstance(details, int):
if isinstance(details, PairingMethod):
# Just a method ID
self.pairing_method = details
else:
# PASSKEY method, with a method ID and display/input flags
assert isinstance(details[0], PairingMethod)
self.pairing_method = details[0]
self.passkey_display = details[1 if self.is_initiator else 2]
@@ -858,10 +862,13 @@ class Session:
self.tk = self.passkey.to_bytes(16, byteorder='little')
logger.debug(f'TK from passkey = {self.tk.hex()}')
self.connection.abort_on(
'disconnection',
self.pairing_config.delegate.display_number(self.passkey, digits=6),
)
try:
self.connection.abort_on(
'disconnection',
self.pairing_config.delegate.display_number(self.passkey, digits=6),
)
except Exception as error:
logger.warning(f'exception while displaying number: {error}')
def input_passkey(self, next_steps: Optional[Callable[[], None]] = None) -> None:
# Prompt the user for the passkey displayed on the peer
@@ -929,9 +936,12 @@ class Session:
if self.sc:
async def next_steps() -> None:
if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
):
z = 0
elif self.pairing_method == self.PASSKEY:
elif self.pairing_method == PairingMethod.PASSKEY:
# We need a passkey
await self.passkey_ready.wait()
assert self.passkey
@@ -1224,7 +1234,7 @@ class Session:
# Create an object to hold the keys
keys = PairingKeys()
keys.address_type = peer_address.address_type
authenticated = self.pairing_method != self.JUST_WORKS
authenticated = self.pairing_method != PairingMethod.JUST_WORKS
if self.sc or self.connection.transport == BT_BR_EDR_TRANSPORT:
keys.ltk = PairingKeys.Key(value=self.ltk, authenticated=authenticated)
else:
@@ -1300,7 +1310,11 @@ class Session:
self, command: SMP_Pairing_Request_Command
) -> None:
# Check if the request should proceed
accepted = await self.pairing_config.delegate.accept()
try:
accepted = await self.pairing_config.delegate.accept()
except Exception as error:
logger.warning(f'exception while accepting: {error}')
accepted = False
if not accepted:
logger.debug('pairing rejected by delegate')
self.send_pairing_failed(SMP_PAIRING_NOT_SUPPORTED_ERROR)
@@ -1323,9 +1337,7 @@ class Session:
self.decide_pairing_method(
command.auth_req, command.io_capability, self.io_capability
)
logger.debug(
f'pairing method: {self.PAIRING_METHOD_NAMES[self.pairing_method]}'
)
logger.debug(f'pairing method: {self.pairing_method.name}')
# Key distribution
(
@@ -1341,7 +1353,7 @@ class Session:
# Display a passkey if we need to
if not self.sc:
if self.pairing_method == self.PASSKEY and self.passkey_display:
if self.pairing_method == PairingMethod.PASSKEY and self.passkey_display:
self.display_passkey()
# Respond
@@ -1382,9 +1394,7 @@ class Session:
self.decide_pairing_method(
command.auth_req, self.io_capability, command.io_capability
)
logger.debug(
f'pairing method: {self.PAIRING_METHOD_NAMES[self.pairing_method]}'
)
logger.debug(f'pairing method: {self.pairing_method.name}')
# Key distribution
if (
@@ -1400,13 +1410,16 @@ class Session:
self.compute_peer_expected_distributions(self.responder_key_distribution)
# Start phase 2
if self.sc:
if self.pairing_method == self.PASSKEY:
if self.pairing_method == PairingMethod.CTKD_OVER_CLASSIC:
# Authentication is already done in SMP, so remote shall start keys distribution immediately
return
elif self.sc:
if self.pairing_method == PairingMethod.PASSKEY:
self.display_or_input_passkey()
self.send_public_key_command()
else:
if self.pairing_method == self.PASSKEY:
if self.pairing_method == PairingMethod.PASSKEY:
self.display_or_input_passkey(self.send_pairing_confirm_command)
else:
self.send_pairing_confirm_command()
@@ -1418,7 +1431,10 @@ class Session:
self.send_pairing_random_command()
else:
# If the method is PASSKEY, now is the time to input the code
if self.pairing_method == self.PASSKEY and not self.passkey_display:
if (
self.pairing_method == PairingMethod.PASSKEY
and not self.passkey_display
):
self.input_passkey(self.send_pairing_confirm_command)
else:
self.send_pairing_confirm_command()
@@ -1426,11 +1442,14 @@ class Session:
def on_smp_pairing_confirm_command_secure_connections(
self, _: SMP_Pairing_Confirm_Command
) -> None:
if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
):
if self.is_initiator:
self.r = crypto.r()
self.send_pairing_random_command()
elif self.pairing_method == self.PASSKEY:
elif self.pairing_method == PairingMethod.PASSKEY:
if self.is_initiator:
self.send_pairing_random_command()
else:
@@ -1486,13 +1505,16 @@ class Session:
def on_smp_pairing_random_command_secure_connections(
self, command: SMP_Pairing_Random_Command
) -> None:
if self.pairing_method == self.PASSKEY and self.passkey is None:
if self.pairing_method == PairingMethod.PASSKEY and self.passkey is None:
logger.warning('no passkey entered, ignoring command')
return
# pylint: disable=too-many-return-statements
if self.is_initiator:
if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
):
assert self.confirm_value
# Check that the random value matches what was committed to earlier
confirm_verifier = crypto.f4(
@@ -1502,7 +1524,7 @@ class Session:
self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR
):
return
elif self.pairing_method == self.PASSKEY:
elif self.pairing_method == PairingMethod.PASSKEY:
assert self.passkey and self.confirm_value
# Check that the random value matches what was committed to earlier
confirm_verifier = crypto.f4(
@@ -1525,9 +1547,12 @@ class Session:
else:
return
else:
if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
):
self.send_pairing_random_command()
elif self.pairing_method == self.PASSKEY:
elif self.pairing_method == PairingMethod.PASSKEY:
assert self.passkey and self.confirm_value
# Check that the random value matches what was committed to earlier
confirm_verifier = crypto.f4(
@@ -1558,10 +1583,13 @@ class Session:
(mac_key, self.ltk) = crypto.f5(self.dh_key, self.na, self.nb, a, b)
# Compute the DH Key checks
if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
):
ra = bytes(16)
rb = ra
elif self.pairing_method == self.PASSKEY:
elif self.pairing_method == PairingMethod.PASSKEY:
assert self.passkey
ra = self.passkey.to_bytes(16, byteorder='little')
rb = ra
@@ -1585,13 +1613,16 @@ class Session:
self.wait_before_continuing.set_result(None)
# Prompt the user for confirmation if needed
if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
):
# Compute the 6-digit code
code = crypto.g2(self.pka, self.pkb, self.na, self.nb) % 1000000
# Ask for user confirmation
self.wait_before_continuing = asyncio.get_running_loop().create_future()
if self.pairing_method == self.JUST_WORKS:
if self.pairing_method == PairingMethod.JUST_WORKS:
self.prompt_user_for_confirmation(next_steps)
else:
self.prompt_user_for_numeric_comparison(code, next_steps)
@@ -1628,13 +1659,16 @@ class Session:
if self.is_initiator:
self.send_pairing_confirm_command()
else:
if self.pairing_method == self.PASSKEY:
if self.pairing_method == PairingMethod.PASSKEY:
self.display_or_input_passkey()
# Send our public key back to the initiator
self.send_public_key_command()
if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
):
# We can now send the confirmation value
self.send_pairing_confirm_command()

View File

@@ -1,11 +1,11 @@
UDP TRANSPORT
=============
WEBSOCKET CLIENT TRANSPORT
==========================
The UDP transport is a UDP socket, receiving packets on a specified port number, and sending packets to a specified host and port number.
The WebSocket Client transport is WebSocket connection to a WebSocket server over which HCI packets
are sent and received.
## Moniker
The moniker syntax for a UDP transport is: `udp:<local-host>:<local-port>,<remote-host>:<remote-port>`.
The moniker syntax for a WebSocket Client transport is: `ws-client:<ws-url>`
!!! example
`udp:0.0.0.0:9000,127.0.0.1:9001`
UDP transport where packets are received on port `9000` and sent to `127.0.0.1` on port `9001`
`ws-client:ws://localhost:1234/some/path`

View File

@@ -1,11 +1,13 @@
UDP TRANSPORT
=============
WEBSOCKET SERVER TRANSPORT
==========================
The UDP transport is a UDP socket, receiving packets on a specified port number, and sending packets to a specified host and port number.
The WebSocket Server transport is WebSocket server that accepts connections from a WebSocket
client. HCI packets are sent and received over the connection.
## Moniker
The moniker syntax for a UDP transport is: `udp:<local-host>:<local-port>,<remote-host>:<remote-port>`.
The moniker syntax for a WebSocket Server transport is: `ws-server:<host>:<port>`,
where `<host>` may be the address of a local network interface, or `_`to accept connections on all local network interfaces. `<port>` is the TCP port number on which to accept connections.
!!! example
`udp:0.0.0.0:9000,127.0.0.1:9001`
UDP transport where packets are received on port `9000` and sent to `127.0.0.1` on port `9001`
`ws-server:_:9001`

View File

@@ -62,7 +62,7 @@ async def main():
print(
f'>>> {color(advertisement.address, address_color)} '
f'[{color(address_type_string, type_color)}]'
f'{address_qualifier}:{separator}RSSI:{advertisement.rssi}'
f'{address_qualifier}:{separator}RSSI: {advertisement.rssi}'
f'{separator}'
f'{advertisement.data.to_string(separator)}'
)

View File

@@ -21,6 +21,8 @@ import logging
import os
import pytest
from unittest.mock import MagicMock, patch
from bumble.controller import Controller
from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
from bumble.link import LocalLink
@@ -34,6 +36,8 @@ from bumble.smp import (
SMP_CONFIRM_VALUE_FAILED_ERROR,
)
from bumble.core import ProtocolError
from bumble.hci import HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE
from bumble.keys import PairingKeys
# -----------------------------------------------------------------------------
@@ -473,6 +477,85 @@ async def test_self_smp_wrong_pin():
assert not paired
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_smp_over_classic():
# Create two devices, each with a controller, attached to the same link
two_devices = TwoDevices()
# Attach listeners
two_devices.devices[0].on(
'connection', lambda connection: two_devices.on_connection(0, connection)
)
two_devices.devices[1].on(
'connection', lambda connection: two_devices.on_connection(1, connection)
)
# Enable Classic connections
two_devices.devices[0].classic_enabled = True
two_devices.devices[1].classic_enabled = True
# Start
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
# Connect the two devices
await asyncio.gather(
two_devices.devices[0].connect(
two_devices.devices[1].public_address, transport=BT_BR_EDR_TRANSPORT
),
two_devices.devices[1].accept(two_devices.devices[0].public_address),
)
# Check the post conditions
assert two_devices.connections[0] is not None
assert two_devices.connections[1] is not None
# Mock connection
# TODO: Implement Classic SSP and encryption in link relayer
LINK_KEY = bytes.fromhex('287ad379dca402530a39f1f43047b835')
two_devices.devices[0].on_link_key(
two_devices.devices[1].public_address,
LINK_KEY,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
)
two_devices.devices[1].on_link_key(
two_devices.devices[0].public_address,
LINK_KEY,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
)
two_devices.connections[0].encryption = 1
two_devices.connections[1].encryption = 1
paired = [
asyncio.get_event_loop().create_future(),
asyncio.get_event_loop().create_future(),
]
def on_pairing(which: int, keys: PairingKeys):
paired[which].set_result(keys)
two_devices.connections[0].on('pairing', lambda keys: on_pairing(0, keys))
two_devices.connections[1].on('pairing', lambda keys: on_pairing(1, keys))
# Mock SMP
with patch('bumble.smp.Session', spec=True) as MockSmpSession:
MockSmpSession.send_pairing_confirm_command = MagicMock()
MockSmpSession.send_pairing_dhkey_check_command = MagicMock()
MockSmpSession.send_public_key_command = MagicMock()
MockSmpSession.send_pairing_random_command = MagicMock()
# Start CTKD
await two_devices.connections[0].pair()
await asyncio.gather(*paired)
# Phase 2 commands should not be invoked
MockSmpSession.send_pairing_confirm_command.assert_not_called()
MockSmpSession.send_pairing_dhkey_check_command.assert_not_called()
MockSmpSession.send_public_key_command.assert_not_called()
MockSmpSession.send_pairing_random_command.assert_not_called()
# -----------------------------------------------------------------------------
async def run_test_self():
await test_self_connection()
@@ -481,6 +564,7 @@ async def run_test_self():
await test_self_smp()
await test_self_smp_reject()
await test_self_smp_wrong_pin()
await test_self_smp_over_classic()
# -----------------------------------------------------------------------------