Compare commits

...

25 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod b758825164 add flow control command 2023-07-22 13:04:39 -07:00
Gilles Boccon-Gibod 779dfe5473 accept Host Buffer Size Command in the controller 2023-07-21 19:36:26 -07:00
Gilles Boccon-Gibod f9a4c7518e Merge pull request #214 from marshallpierce/mp/scanner-rssi
Add a space after RSSI
2023-07-14 10:52:54 -07:00
Marshall Pierce bad2fdf69f Add a space after RSSI
The other data elements have a space, so I'm guessing that RSSI
is intended to as well. Perhaps there's some subtle reason why
it should have a space, though, in which case feel free to
close this.

Output now looks like this:

```
>>> 58:D3:49:E7:40:DA/P [PUBLIC]:
  RSSI: -67
  [Flags]: LE General,BR/EDR C,BR/EDR H
  [TX Power Level]: 4
  [Manufacturer Specific Data]: company=Apple, Inc., data=0f08c00af4392b00040c10020f04
```
2023-07-13 12:47:45 -06:00
Lucas Abel a84df469cd pairing: handle user errors from all delegate calls 2023-07-12 11:03:21 -07:00
Gilles Boccon-Gibod 03e33e39bd Merge pull request #211 from google/gbg/fix-ws-transport-doc
fix doc for ws-client ws-server transports
2023-07-12 07:06:32 -07:00
Gilles Boccon-Gibod 753fb69272 fix doc for ws-client ws-server transports 2023-07-12 06:06:20 -07:00
Gilles Boccon-Gibod 81a5f3a395 Merge pull request #203 from google/gbg/realtek-driver
realtek driver
2023-07-11 07:06:07 -07:00
Gilles Boccon-Gibod 696a8d82fd look for files in linux FW dir 2023-07-11 06:41:34 -07:00
Gilles Boccon-Gibod 5f294b1fea python 3.8 compatibility 2023-07-11 06:41:34 -07:00
Gilles Boccon-Gibod 2d8f5e80fb add missing doc files 2023-07-11 06:41:34 -07:00
Gilles Boccon-Gibod 7a042db78e add more USB ids 2023-07-11 06:41:34 -07:00
Gilles Boccon-Gibod 41ce311836 allow custom driver factories 2023-07-11 06:41:34 -07:00
Gilles Boccon-Gibod 03538d0f8a add doc 2023-07-11 06:41:34 -07:00
Gilles Boccon-Gibod 86bc222dc0 add missing file 2023-07-11 06:41:34 -07:00
Gilles Boccon-Gibod e8d285fdab add downloader tool 2023-07-11 06:41:34 -07:00
Gilles Boccon-Gibod 852c933c92 wip (+4 squashed commits)
Squashed commits:
[d29a350] wip
[7f541ed] wip
[1e2902e] basic working version
[14b497a] wip
2023-07-11 06:41:34 -07:00
Lucas Abel 7867a99a54 Merge pull request #209 from google/click-types-quick-fix
temporarily pin click to 8.1.3
2023-07-11 06:21:11 -07:00
Gilles Boccon-Gibod 6cd14bb503 temporarily pin click to 8.1.3 2023-07-11 00:11:24 -07:00
Gilles Boccon-Gibod 532b99ffea Merge pull request #206 from benquike/main
Add some commands and events in hci
2023-07-10 01:23:08 -07:00
Hui Peng d80f40ff5d Add some commands and events in hci 2023-06-28 08:51:10 -07:00
Gilles Boccon-Gibod e9dc0d6855 Merge pull request #201 from benquike/main
Pin aiohttp at version 3.8
2023-06-14 11:23:31 -07:00
Hui Peng b18104c9a7 Pin aiohttp at version 3.8.4
Recently aiohttp package is upgraded to 4.0.x version,
which breaks setup.py. This change fix the build issue
by pinning it at version 3.8.4.
2023-06-13 09:44:54 -07:00
Gilles Boccon-Gibod 50d1884365 Merge pull request #199 from benquike/main
Add support for legacy pairing over bt classic
2023-06-12 10:45:51 -07:00
Hui Peng 4d2e821e50 Add support for legacy pairing over bt classic 2023-06-07 11:39:06 -07:00
20 changed files with 1545 additions and 108 deletions
+20
View File
@@ -157,6 +157,26 @@ class Delegate(PairingDelegate):
self.print(f'### PIN: {number:0{digits}}')
self.print('###-----------------------------------')
async def get_string(self, max_length: int):
await self.update_peer_name()
# Prompt a PIN (for legacy pairing in classic)
self.print('###-----------------------------------')
self.print(f'### Pairing with {self.peer_name}')
self.print('###-----------------------------------')
count = 0
while True:
response = await self.prompt('>>> Enter PIN (1-6 chars):')
if len(response) == 0:
count += 1
if count > 3:
self.print('too many tries, stopping the pairing')
return None
self.print('no PIN was entered, try again')
continue
return response
# -----------------------------------------------------------------------------
async def get_peer_name(peer, mode):
+73 -55
View File
@@ -654,7 +654,7 @@ class Controller:
def on_hci_create_connection_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.1.5 Create Connection command
See Bluetooth spec Vol 4, Part E - 7.1.5 Create Connection command
'''
if self.link is None:
@@ -685,7 +685,7 @@ class Controller:
def on_hci_disconnect_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.1.6 Disconnect Command
See Bluetooth spec Vol 4, Part E - 7.1.6 Disconnect Command
'''
# First, say that the disconnection is pending
self.send_hci_packet(
@@ -719,7 +719,7 @@ class Controller:
def on_hci_accept_connection_request_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.1.8 Accept Connection Request command
See Bluetooth spec Vol 4, Part E - 7.1.8 Accept Connection Request command
'''
if self.link is None:
@@ -735,7 +735,7 @@ class Controller:
def on_hci_switch_role_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.2.8 Switch Role command
See Bluetooth spec Vol 4, Part E - 7.2.8 Switch Role command
'''
if self.link is None:
@@ -751,21 +751,21 @@ class Controller:
def on_hci_set_event_mask_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.1 Set Event Mask Command
See Bluetooth spec Vol 4, Part E - 7.3.1 Set Event Mask Command
'''
self.event_mask = command.event_mask
return bytes([HCI_SUCCESS])
def on_hci_reset_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.2 Reset Command
See Bluetooth spec Vol 4, Part E - 7.3.2 Reset Command
'''
# TODO: cleanup what needs to be reset
return bytes([HCI_SUCCESS])
def on_hci_write_local_name_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.11 Write Local Name Command
See Bluetooth spec Vol 4, Part E - 7.3.11 Write Local Name Command
'''
local_name = command.local_name
if len(local_name):
@@ -780,7 +780,7 @@ class Controller:
def on_hci_read_local_name_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.12 Read Local Name Command
See Bluetooth spec Vol 4, Part E - 7.3.12 Read Local Name Command
'''
local_name = bytes(self.local_name, 'utf-8')[:248]
if len(local_name) < 248:
@@ -790,19 +790,19 @@ class Controller:
def on_hci_read_class_of_device_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.25 Read Class of Device Command
See Bluetooth spec Vol 4, Part E - 7.3.25 Read Class of Device Command
'''
return bytes([HCI_SUCCESS, 0, 0, 0])
def on_hci_write_class_of_device_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.26 Write Class of Device Command
See Bluetooth spec Vol 4, Part E - 7.3.26 Write Class of Device Command
'''
return bytes([HCI_SUCCESS])
def on_hci_read_synchronous_flow_control_enable_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.36 Read Synchronous Flow Control Enable
See Bluetooth spec Vol 4, Part E - 7.3.36 Read Synchronous Flow Control Enable
Command
'''
if self.sync_flow_control:
@@ -813,7 +813,7 @@ class Controller:
def on_hci_write_synchronous_flow_control_enable_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.37 Write Synchronous Flow Control Enable
See Bluetooth spec Vol 4, Part E - 7.3.37 Write Synchronous Flow Control Enable
Command
'''
ret = HCI_SUCCESS
@@ -825,41 +825,59 @@ class Controller:
ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR
return bytes([ret])
def on_hci_set_controller_to_host_flow_control_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.3.38 Set Controller To Host Flow Control
Command
'''
# For now we just accept the command but ignore the values.
# TODO: respect the passed in values.
return bytes([HCI_SUCCESS])
def on_hci_host_buffer_size_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.3.39 Host Buffer Size Command
'''
# For now we just accept the command but ignore the values.
# TODO: respect the passed in values.
return bytes([HCI_SUCCESS])
def on_hci_write_extended_inquiry_response_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.59 Write Simple Pairing Mode Command
See Bluetooth spec Vol 4, Part E - 7.3.56 Write Extended Inquiry Response
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_write_simple_pairing_mode_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.59 Write Simple Pairing Mode Command
See Bluetooth spec Vol 4, Part E - 7.3.59 Write Simple Pairing Mode Command
'''
return bytes([HCI_SUCCESS])
def on_hci_set_event_mask_page_2_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.69 Set Event Mask Page 2 Command
See Bluetooth spec Vol 4, Part E - 7.3.69 Set Event Mask Page 2 Command
'''
self.event_mask_page_2 = command.event_mask_page_2
return bytes([HCI_SUCCESS])
def on_hci_read_le_host_support_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.78 Write LE Host Support Command
See Bluetooth spec Vol 4, Part E - 7.3.78 Write LE Host Support Command
'''
return bytes([HCI_SUCCESS, 1, 0])
def on_hci_write_le_host_support_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.79 Write LE Host Support Command
See Bluetooth spec Vol 4, Part E - 7.3.79 Write LE Host Support Command
'''
# TODO / Just ignore for now
return bytes([HCI_SUCCESS])
def on_hci_write_authenticated_payload_timeout_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.94 Write Authenticated Payload Timeout
See Bluetooth spec Vol 4, Part E - 7.3.94 Write Authenticated Payload Timeout
Command
'''
# TODO
@@ -867,7 +885,7 @@ class Controller:
def on_hci_read_local_version_information_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.4.1 Read Local Version Information Command
See Bluetooth spec Vol 4, Part E - 7.4.1 Read Local Version Information Command
'''
return struct.pack(
'<BBHBHH',
@@ -881,19 +899,19 @@ class Controller:
def on_hci_read_local_supported_commands_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.4.2 Read Local Supported Commands Command
See Bluetooth spec Vol 4, Part E - 7.4.2 Read Local Supported Commands Command
'''
return bytes([HCI_SUCCESS]) + self.supported_commands
def on_hci_read_local_supported_features_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.4.3 Read Local Supported Features Command
See Bluetooth spec Vol 4, Part E - 7.4.3 Read Local Supported Features Command
'''
return bytes([HCI_SUCCESS]) + self.lmp_features
def on_hci_read_bd_addr_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.4.6 Read BD_ADDR Command
See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command
'''
bd_addr = (
self._public_address.to_bytes()
@@ -904,14 +922,14 @@ class Controller:
def on_hci_le_set_event_mask_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.1 LE Set Event Mask Command
See Bluetooth spec Vol 4, Part E - 7.8.1 LE Set Event Mask Command
'''
self.le_event_mask = command.le_event_mask
return bytes([HCI_SUCCESS])
def on_hci_le_read_buffer_size_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.2 LE Read Buffer Size Command
See Bluetooth spec Vol 4, Part E - 7.8.2 LE Read Buffer Size Command
'''
return struct.pack(
'<BHB',
@@ -922,49 +940,49 @@ class Controller:
def on_hci_le_read_local_supported_features_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.3 LE Read Local Supported Features
See Bluetooth spec Vol 4, Part E - 7.8.3 LE Read Local Supported Features
Command
'''
return bytes([HCI_SUCCESS]) + self.le_features
def on_hci_le_set_random_address_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.4 LE Set Random Address Command
See Bluetooth spec Vol 4, Part E - 7.8.4 LE Set Random Address Command
'''
self.random_address = command.random_address
return bytes([HCI_SUCCESS])
def on_hci_le_set_advertising_parameters_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.5 LE Set Advertising Parameters Command
See Bluetooth spec Vol 4, Part E - 7.8.5 LE Set Advertising Parameters Command
'''
self.advertising_parameters = command
return bytes([HCI_SUCCESS])
def on_hci_le_read_advertising_physical_channel_tx_power_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Physical Channel
See Bluetooth spec Vol 4, Part E - 7.8.6 LE Read Advertising Physical Channel
Tx Power Command
'''
return bytes([HCI_SUCCESS, self.advertising_channel_tx_power])
def on_hci_le_set_advertising_data_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.7 LE Set Advertising Data Command
See Bluetooth spec Vol 4, Part E - 7.8.7 LE Set Advertising Data Command
'''
self.advertising_data = command.advertising_data
return bytes([HCI_SUCCESS])
def on_hci_le_set_scan_response_data_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.8 LE Set Scan Response Data Command
See Bluetooth spec Vol 4, Part E - 7.8.8 LE Set Scan Response Data Command
'''
self.le_scan_response_data = command.scan_response_data
return bytes([HCI_SUCCESS])
def on_hci_le_set_advertising_enable_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.9 LE Set Advertising Enable Command
See Bluetooth spec Vol 4, Part E - 7.8.9 LE Set Advertising Enable Command
'''
if command.advertising_enable:
self.start_advertising()
@@ -975,7 +993,7 @@ class Controller:
def on_hci_le_set_scan_parameters_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.10 LE Set Scan Parameters Command
See Bluetooth spec Vol 4, Part E - 7.8.10 LE Set Scan Parameters Command
'''
self.le_scan_type = command.le_scan_type
self.le_scan_interval = command.le_scan_interval
@@ -986,7 +1004,7 @@ class Controller:
def on_hci_le_set_scan_enable_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.11 LE Set Scan Enable Command
See Bluetooth spec Vol 4, Part E - 7.8.11 LE Set Scan Enable Command
'''
self.le_scan_enable = command.le_scan_enable
self.filter_duplicates = command.filter_duplicates
@@ -994,7 +1012,7 @@ class Controller:
def on_hci_le_create_connection_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.12 LE Create Connection Command
See Bluetooth spec Vol 4, Part E - 7.8.12 LE Create Connection Command
'''
if not self.link:
@@ -1027,40 +1045,40 @@ class Controller:
def on_hci_le_create_connection_cancel_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.13 LE Create Connection Cancel Command
See Bluetooth spec Vol 4, Part E - 7.8.13 LE Create Connection Cancel Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_filter_accept_list_size_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.14 LE Read Filter Accept List Size
See Bluetooth spec Vol 4, Part E - 7.8.14 LE Read Filter Accept List Size
Command
'''
return bytes([HCI_SUCCESS, self.filter_accept_list_size])
def on_hci_le_clear_filter_accept_list_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.15 LE Clear Filter Accept List Command
See Bluetooth spec Vol 4, Part E - 7.8.15 LE Clear Filter Accept List Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_add_device_to_filter_accept_list_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.16 LE Add Device To Filter Accept List
See Bluetooth spec Vol 4, Part E - 7.8.16 LE Add Device To Filter Accept List
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_remove_device_from_filter_accept_list_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.17 LE Remove Device From Filter Accept
See Bluetooth spec Vol 4, Part E - 7.8.17 LE Remove Device From Filter Accept
List Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_remote_features_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.21 LE Read Remote Features Command
See Bluetooth spec Vol 4, Part E - 7.8.21 LE Read Remote Features Command
'''
# First, say that the command is pending
@@ -1083,13 +1101,13 @@ class Controller:
def on_hci_le_rand_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.23 LE Rand Command
See Bluetooth spec Vol 4, Part E - 7.8.23 LE Rand Command
'''
return bytes([HCI_SUCCESS]) + struct.pack('Q', random.randint(0, 1 << 64))
def on_hci_le_enable_encryption_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.24 LE Enable Encryption Command
See Bluetooth spec Vol 4, Part E - 7.8.24 LE Enable Encryption Command
'''
# Check the parameters
@@ -1122,13 +1140,13 @@ class Controller:
def on_hci_le_read_supported_states_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.27 LE Read Supported States Command
See Bluetooth spec Vol 4, Part E - 7.8.27 LE Read Supported States Command
'''
return bytes([HCI_SUCCESS]) + self.le_states
def on_hci_le_read_suggested_default_data_length_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.34 LE Read Suggested Default Data Length
See Bluetooth spec Vol 4, Part E - 7.8.34 LE Read Suggested Default Data Length
Command
'''
return struct.pack(
@@ -1140,7 +1158,7 @@ class Controller:
def on_hci_le_write_suggested_default_data_length_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.35 LE Write Suggested Default Data Length
See Bluetooth spec Vol 4, Part E - 7.8.35 LE Write Suggested Default Data Length
Command
'''
self.suggested_max_tx_octets, self.suggested_max_tx_time = struct.unpack(
@@ -1150,33 +1168,33 @@ class Controller:
def on_hci_le_read_local_p_256_public_key_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.36 LE Read P-256 Public Key Command
See Bluetooth spec Vol 4, Part E - 7.8.36 LE Read P-256 Public Key Command
'''
# TODO create key and send HCI_LE_Read_Local_P-256_Public_Key_Complete event
return bytes([HCI_SUCCESS])
def on_hci_le_add_device_to_resolving_list_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.38 LE Add Device To Resolving List
See Bluetooth spec Vol 4, Part E - 7.8.38 LE Add Device To Resolving List
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_clear_resolving_list_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.40 LE Clear Resolving List Command
See Bluetooth spec Vol 4, Part E - 7.8.40 LE Clear Resolving List Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_resolving_list_size_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.41 LE Read Resolving List Size Command
See Bluetooth spec Vol 4, Part E - 7.8.41 LE Read Resolving List Size Command
'''
return bytes([HCI_SUCCESS, self.resolving_list_size])
def on_hci_le_set_address_resolution_enable_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.44 LE Set Address Resolution Enable
See Bluetooth spec Vol 4, Part E - 7.8.44 LE Set Address Resolution Enable
Command
'''
ret = HCI_SUCCESS
@@ -1190,7 +1208,7 @@ class Controller:
def on_hci_le_set_resolvable_private_address_timeout_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.45 LE Set Resolvable Private Address
See Bluetooth spec Vol 4, Part E - 7.8.45 LE Set Resolvable Private Address
Timeout Command
'''
self.le_rpa_timeout = command.rpa_timeout
@@ -1198,7 +1216,7 @@ class Controller:
def on_hci_le_read_maximum_data_length_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.46 LE Read Maximum Data Length Command
See Bluetooth spec Vol 4, Part E - 7.8.46 LE Read Maximum Data Length Command
'''
return struct.pack(
'<BHHHH',
@@ -1211,7 +1229,7 @@ class Controller:
def on_hci_le_read_phy_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.47 LE Read PHY Command
See Bluetooth spec Vol 4, Part E - 7.8.47 LE Read PHY Command
'''
return struct.pack(
'<BHBB',
@@ -1223,7 +1241,7 @@ class Controller:
def on_hci_le_set_default_phy_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.48 LE Set Default PHY Command
See Bluetooth spec Vol 4, Part E - 7.8.48 LE Set Default PHY Command
'''
self.default_phy = {
'all_phys': command.all_phys,
@@ -1234,6 +1252,6 @@ class Controller:
def on_hci_le_read_transmit_power_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.74 LE Read Transmit Power Command
See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command
'''
return struct.pack('<BBB', HCI_SUCCESS, 0, 0)
+32 -24
View File
@@ -2851,18 +2851,22 @@ class Device(CompositeEventEmitter):
method = methods[peer_io_capability][io_capability]
async def reply() -> None:
if await connection.abort_on('disconnection', method()):
await self.host.send_command(
HCI_User_Confirmation_Request_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address
)
)
else:
await self.host.send_command(
HCI_User_Confirmation_Request_Negative_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address
try:
if await connection.abort_on('disconnection', method()):
await self.host.send_command(
HCI_User_Confirmation_Request_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address
)
)
return
except Exception as error:
logger.warning(f'exception while confirming: {error}')
await self.host.send_command(
HCI_User_Confirmation_Request_Negative_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address
)
)
AsyncRunner.spawn(reply())
@@ -2874,21 +2878,25 @@ class Device(CompositeEventEmitter):
pairing_config = self.pairing_config_factory(connection)
async def reply() -> None:
number = await connection.abort_on(
'disconnection', pairing_config.delegate.get_number()
try:
number = await connection.abort_on(
'disconnection', pairing_config.delegate.get_number()
)
if number is not None:
await self.host.send_command(
HCI_User_Passkey_Request_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address, numeric_value=number
)
)
return
except Exception as error:
logger.warning(f'exception while asking for pass-key: {error}')
await self.host.send_command(
HCI_User_Passkey_Request_Negative_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address
)
)
if number is not None:
await self.host.send_command(
HCI_User_Passkey_Request_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address, numeric_value=number
)
)
else:
await self.host.send_command(
HCI_User_Passkey_Request_Negative_Reply_Command( # type: ignore[call-arg]
bd_addr=connection.peer_address
)
)
AsyncRunner.spawn(reply())
+68
View File
@@ -0,0 +1,68 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Drivers that can be used to customize the interaction between a host and a controller,
like loading firmware after a cold start.
"""
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import abc
import logging
from . import rtk
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class Driver(abc.ABC):
"""Base class for drivers."""
@staticmethod
async def for_host(_host):
"""Return a driver instance for a host.
Args:
host: Host object for which a driver should be created.
Returns:
A Driver instance if a driver should be instantiated for this host, or
None if no driver instance of this class is needed.
"""
return None
@abc.abstractmethod
async def init_controller(self):
"""Initialize the controller."""
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
async def get_driver_for_host(host):
"""Probe all known diver classes until one returns a valid instance for a host,
or none is found.
"""
if driver := await rtk.Driver.for_host(host):
logger.debug("Instantiated RTK driver")
return driver
return None
+647
View File
@@ -0,0 +1,647 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Support for Realtek USB dongles.
Based on various online bits of information, including the Linux kernel.
(see `drivers/bluetooth/btrtl.c`)
"""
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from dataclasses import dataclass
import asyncio
import enum
import logging
import math
import os
import pathlib
import platform
import struct
from typing import Tuple
import weakref
from bumble.hci import (
hci_command_op_code,
STATUS_SPEC,
HCI_SUCCESS,
HCI_COMMAND_NAMES,
HCI_Command,
HCI_Reset_Command,
HCI_Read_Local_Version_Information_Command,
)
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
RTK_ROM_LMP_8723A = 0x1200
RTK_ROM_LMP_8723B = 0x8723
RTK_ROM_LMP_8821A = 0x8821
RTK_ROM_LMP_8761A = 0x8761
RTK_ROM_LMP_8822B = 0x8822
RTK_ROM_LMP_8852A = 0x8852
RTK_CONFIG_MAGIC = 0x8723AB55
RTK_EPATCH_SIGNATURE = b"Realtech"
RTK_FRAGMENT_LENGTH = 252
RTK_FIRMWARE_DIR_ENV = "BUMBLE_RTK_FIRMWARE_DIR"
RTK_LINUX_FIRMWARE_DIR = "/lib/firmware/rtl_bt"
class RtlProjectId(enum.IntEnum):
PROJECT_ID_8723A = 0
PROJECT_ID_8723B = 1
PROJECT_ID_8821A = 2
PROJECT_ID_8761A = 3
PROJECT_ID_8822B = 8
PROJECT_ID_8723D = 9
PROJECT_ID_8821C = 10
PROJECT_ID_8822C = 13
PROJECT_ID_8761B = 14
PROJECT_ID_8852A = 18
PROJECT_ID_8852B = 20
PROJECT_ID_8852C = 25
RTK_PROJECT_ID_TO_ROM = {
0: RTK_ROM_LMP_8723A,
1: RTK_ROM_LMP_8723B,
2: RTK_ROM_LMP_8821A,
3: RTK_ROM_LMP_8761A,
8: RTK_ROM_LMP_8822B,
9: RTK_ROM_LMP_8723B,
10: RTK_ROM_LMP_8821A,
13: RTK_ROM_LMP_8822B,
14: RTK_ROM_LMP_8761A,
18: RTK_ROM_LMP_8852A,
20: RTK_ROM_LMP_8852A,
25: RTK_ROM_LMP_8852A,
}
# List of USB (VendorID, ProductID) for Realtek-based devices.
RTK_USB_PRODUCTS = {
# Realtek 8723AE
(0x0930, 0x021D),
(0x13D3, 0x3394),
# Realtek 8723BE
(0x0489, 0xE085),
(0x0489, 0xE08B),
(0x04F2, 0xB49F),
(0x13D3, 0x3410),
(0x13D3, 0x3416),
(0x13D3, 0x3459),
(0x13D3, 0x3494),
# Realtek 8723BU
(0x7392, 0xA611),
# Realtek 8723DE
(0x0BDA, 0xB009),
(0x2FF8, 0xB011),
# Realtek 8761BUV
(0x0B05, 0x190E),
(0x0BDA, 0x8771),
(0x2230, 0x0016),
(0x2357, 0x0604),
(0x2550, 0x8761),
(0x2B89, 0x8761),
(0x7392, 0xC611),
# Realtek 8821AE
(0x0B05, 0x17DC),
(0x13D3, 0x3414),
(0x13D3, 0x3458),
(0x13D3, 0x3461),
(0x13D3, 0x3462),
# Realtek 8821CE
(0x0BDA, 0xB00C),
(0x0BDA, 0xC822),
(0x13D3, 0x3529),
# Realtek 8822BE
(0x0B05, 0x185C),
(0x13D3, 0x3526),
# Realtek 8822CE
(0x04C5, 0x161F),
(0x04CA, 0x4005),
(0x0B05, 0x18EF),
(0x0BDA, 0xB00C),
(0x0BDA, 0xC123),
(0x0BDA, 0xC822),
(0x0CB5, 0xC547),
(0x1358, 0xC123),
(0x13D3, 0x3548),
(0x13D3, 0x3549),
(0x13D3, 0x3553),
(0x13D3, 0x3555),
(0x2FF8, 0x3051),
# Realtek 8822CU
(0x13D3, 0x3549),
# Realtek 8852AE
(0x04C5, 0x165C),
(0x04CA, 0x4006),
(0x0BDA, 0x2852),
(0x0BDA, 0x385A),
(0x0BDA, 0x4852),
(0x0BDA, 0xC852),
(0x0CB8, 0xC549),
# Realtek 8852BE
(0x0BDA, 0x887B),
(0x0CB8, 0xC559),
(0x13D3, 0x3571),
# Realtek 8852CE
(0x04C5, 0x1675),
(0x04CA, 0x4007),
(0x0CB8, 0xC558),
(0x13D3, 0x3586),
(0x13D3, 0x3587),
(0x13D3, 0x3592),
}
# -----------------------------------------------------------------------------
# HCI Commands
# -----------------------------------------------------------------------------
HCI_RTK_READ_ROM_VERSION_COMMAND = hci_command_op_code(0x3F, 0x6D)
HCI_COMMAND_NAMES[HCI_RTK_READ_ROM_VERSION_COMMAND] = "HCI_RTK_READ_ROM_VERSION_COMMAND"
@HCI_Command.command(return_parameters_fields=[("status", STATUS_SPEC), ("version", 1)])
class HCI_RTK_Read_ROM_Version_Command(HCI_Command):
pass
HCI_RTK_DOWNLOAD_COMMAND = hci_command_op_code(0x3F, 0x20)
HCI_COMMAND_NAMES[HCI_RTK_DOWNLOAD_COMMAND] = "HCI_RTK_DOWNLOAD_COMMAND"
@HCI_Command.command(
fields=[("index", 1), ("payload", RTK_FRAGMENT_LENGTH)],
return_parameters_fields=[("status", STATUS_SPEC), ("index", 1)],
)
class HCI_RTK_Download_Command(HCI_Command):
pass
HCI_RTK_DROP_FIRMWARE_COMMAND = hci_command_op_code(0x3F, 0x66)
HCI_COMMAND_NAMES[HCI_RTK_DROP_FIRMWARE_COMMAND] = "HCI_RTK_DROP_FIRMWARE_COMMAND"
@HCI_Command.command()
class HCI_RTK_Drop_Firmware_Command(HCI_Command):
pass
# -----------------------------------------------------------------------------
class Firmware:
def __init__(self, firmware):
extension_sig = bytes([0x51, 0x04, 0xFD, 0x77])
if not firmware.startswith(RTK_EPATCH_SIGNATURE):
raise ValueError("Firmware does not start with epatch signature")
if not firmware.endswith(extension_sig):
raise ValueError("Firmware does not end with extension sig")
# The firmware should start with a 14 byte header.
epatch_header_size = 14
if len(firmware) < epatch_header_size:
raise ValueError("Firmware too short")
# Look for the "project ID", starting from the end.
offset = len(firmware) - len(extension_sig)
project_id = -1
while offset >= epatch_header_size:
length, opcode = firmware[offset - 2 : offset]
offset -= 2
if opcode == 0xFF:
# End
break
if length == 0:
raise ValueError("Invalid 0-length instruction")
if opcode == 0 and length == 1:
project_id = firmware[offset - 1]
break
offset -= length
if project_id < 0:
raise ValueError("Project ID not found")
self.project_id = project_id
# Read the patch tables info.
self.version, num_patches = struct.unpack("<IH", firmware[8:14])
self.patches = []
# The patches tables are laid out as:
# <ChipID_1><ChipID_2>...<ChipID_N> (16 bits each)
# <PatchLength_1><PatchLength_2>...<PatchLength_N> (16 bits each)
# <PatchOffset_1><PatchOffset_2>...<PatchOffset_N> (32 bits each)
if epatch_header_size + 8 * num_patches > len(firmware):
raise ValueError("Firmware too short")
chip_id_table_offset = epatch_header_size
patch_length_table_offset = chip_id_table_offset + 2 * num_patches
patch_offset_table_offset = chip_id_table_offset + 4 * num_patches
for patch_index in range(num_patches):
chip_id_offset = chip_id_table_offset + 2 * patch_index
(chip_id,) = struct.unpack_from("<H", firmware, chip_id_offset)
(patch_length,) = struct.unpack_from(
"<H", firmware, patch_length_table_offset + 2 * patch_index
)
(patch_offset,) = struct.unpack_from(
"<I", firmware, patch_offset_table_offset + 4 * patch_index
)
if patch_offset + patch_length > len(firmware):
raise ValueError("Firmware too short")
# Get the SVN version for the patch
(svn_version,) = struct.unpack_from(
"<I", firmware, patch_offset + patch_length - 8
)
# Create a payload with the patch, replacing the last 4 bytes with
# the firmware version.
self.patches.append(
(
chip_id,
firmware[patch_offset : patch_offset + patch_length - 4]
+ struct.pack("<I", self.version),
svn_version,
)
)
class Driver:
@dataclass
class DriverInfo:
rom: int
hci: Tuple[int, int]
config_needed: bool
has_rom_version: bool
has_msft_ext: bool = False
fw_name: str = ""
config_name: str = ""
DRIVER_INFOS = [
# 8723A
DriverInfo(
rom=RTK_ROM_LMP_8723A,
hci=(0x0B, 0x06),
config_needed=False,
has_rom_version=False,
fw_name="rtl8723a_fw.bin",
config_name="",
),
# 8723B
DriverInfo(
rom=RTK_ROM_LMP_8723B,
hci=(0x0B, 0x06),
config_needed=False,
has_rom_version=True,
fw_name="rtl8723b_fw.bin",
config_name="rtl8723b_config.bin",
),
# 8723D
DriverInfo(
rom=RTK_ROM_LMP_8723B,
hci=(0x0D, 0x08),
config_needed=True,
has_rom_version=True,
fw_name="rtl8723d_fw.bin",
config_name="rtl8723d_config.bin",
),
# 8821A
DriverInfo(
rom=RTK_ROM_LMP_8821A,
hci=(0x0A, 0x06),
config_needed=False,
has_rom_version=True,
fw_name="rtl8821a_fw.bin",
config_name="rtl8821a_config.bin",
),
# 8821C
DriverInfo(
rom=RTK_ROM_LMP_8821A,
hci=(0x0C, 0x08),
config_needed=False,
has_rom_version=True,
has_msft_ext=True,
fw_name="rtl8821c_fw.bin",
config_name="rtl8821c_config.bin",
),
# 8761A
DriverInfo(
rom=RTK_ROM_LMP_8761A,
hci=(0x0A, 0x06),
config_needed=False,
has_rom_version=True,
fw_name="rtl8761a_fw.bin",
config_name="rtl8761a_config.bin",
),
# 8761BU
DriverInfo(
rom=RTK_ROM_LMP_8761A,
hci=(0x0B, 0x0A),
config_needed=False,
has_rom_version=True,
fw_name="rtl8761bu_fw.bin",
config_name="rtl8761bu_config.bin",
),
# 8822C
DriverInfo(
rom=RTK_ROM_LMP_8822B,
hci=(0x0C, 0x0A),
config_needed=False,
has_rom_version=True,
has_msft_ext=True,
fw_name="rtl8822cu_fw.bin",
config_name="rtl8822cu_config.bin",
),
# 8822B
DriverInfo(
rom=RTK_ROM_LMP_8822B,
hci=(0x0B, 0x07),
config_needed=True,
has_rom_version=True,
has_msft_ext=True,
fw_name="rtl8822b_fw.bin",
config_name="rtl8822b_config.bin",
),
# 8852A
DriverInfo(
rom=RTK_ROM_LMP_8852A,
hci=(0x0A, 0x0B),
config_needed=False,
has_rom_version=True,
has_msft_ext=True,
fw_name="rtl8852au_fw.bin",
config_name="rtl8852au_config.bin",
),
# 8852B
DriverInfo(
rom=RTK_ROM_LMP_8852A,
hci=(0xB, 0xB),
config_needed=False,
has_rom_version=True,
has_msft_ext=True,
fw_name="rtl8852bu_fw.bin",
config_name="rtl8852bu_config.bin",
),
# 8852C
DriverInfo(
rom=RTK_ROM_LMP_8852A,
hci=(0x0C, 0x0C),
config_needed=False,
has_rom_version=True,
has_msft_ext=True,
fw_name="rtl8852cu_fw.bin",
config_name="rtl8852cu_config.bin",
),
]
POST_DROP_DELAY = 0.2
@staticmethod
def find_driver_info(hci_version, hci_subversion, lmp_subversion):
for driver_info in Driver.DRIVER_INFOS:
if driver_info.rom == lmp_subversion and driver_info.hci == (
hci_subversion,
hci_version,
):
return driver_info
return None
@staticmethod
def find_binary_path(file_name):
# First check if an environment variable is set
if RTK_FIRMWARE_DIR_ENV in os.environ:
if (
path := pathlib.Path(os.environ[RTK_FIRMWARE_DIR_ENV]) / file_name
).is_file():
logger.debug(f"{file_name} found in env dir")
return path
# When the environment variable is set, don't look elsewhere
return None
# Then, look in the package's driver directory
if (path := pathlib.Path(__file__).parent / "rtk_fw" / file_name).is_file():
logger.debug(f"{file_name} found in package dir")
return path
# On Linux, check the system's FW directory
if (
platform.system() == "Linux"
and (path := pathlib.Path(RTK_LINUX_FIRMWARE_DIR) / file_name).is_file()
):
logger.debug(f"{file_name} found in Linux system FW dir")
return path
# Finally look in the current directory
if (path := pathlib.Path.cwd() / file_name).is_file():
logger.debug(f"{file_name} found in CWD")
return path
return None
@staticmethod
def check(host):
if not host.hci_metadata:
logger.debug("USB metadata not found")
return False
vendor_id = host.hci_metadata.get("vendor_id", None)
product_id = host.hci_metadata.get("product_id", None)
if vendor_id is None or product_id is None:
logger.debug("USB metadata not sufficient")
return False
if (vendor_id, product_id) not in RTK_USB_PRODUCTS:
logger.debug(
f"USB device ({vendor_id:04X}, {product_id:04X}) " "not in known list"
)
return False
return True
@classmethod
async def driver_info_for_host(cls, host):
response = await host.send_command(
HCI_Read_Local_Version_Information_Command(), check_result=True
)
local_version = response.return_parameters
logger.debug(
f"looking for a driver: 0x{local_version.lmp_subversion:04X} "
f"(0x{local_version.hci_version:02X}, "
f"0x{local_version.hci_subversion:04X})"
)
driver_info = cls.find_driver_info(
local_version.hci_version,
local_version.hci_subversion,
local_version.lmp_subversion,
)
if driver_info is None:
# TODO: it seems that the Linux driver will send command (0x3f, 0x66)
# in this case and then re-read the local version, then re-match.
logger.debug("firmware already loaded or no known driver for this device")
return driver_info
@classmethod
async def for_host(cls, host, force=False):
# Check that a driver is needed for this host
if not force and not cls.check(host):
return None
# Get the driver info
driver_info = await cls.driver_info_for_host(host)
if driver_info is None:
return None
# Load the firmware
firmware_path = cls.find_binary_path(driver_info.fw_name)
if not firmware_path:
logger.warning(f"Firmware file {driver_info.fw_name} not found")
logger.warning("See https://google.github.io/bumble/drivers/realtek.html")
return None
with open(firmware_path, "rb") as firmware_file:
firmware = firmware_file.read()
# Load the config
config = None
if driver_info.config_name:
config_path = cls.find_binary_path(driver_info.config_name)
if config_path:
with open(config_path, "rb") as config_file:
config = config_file.read()
if driver_info.config_needed and not config:
logger.warning("Config needed, but no config file available")
return None
return cls(host, driver_info, firmware, config)
def __init__(self, host, driver_info, firmware, config):
self.host = weakref.proxy(host)
self.driver_info = driver_info
self.firmware = firmware
self.config = config
@staticmethod
async def drop_firmware(host):
host.send_hci_packet(HCI_RTK_Drop_Firmware_Command())
# Wait for the command to be effective (no response is sent)
await asyncio.sleep(Driver.POST_DROP_DELAY)
async def download_for_rtl8723a(self):
# Check that the firmware image does not include an epatch signature.
if RTK_EPATCH_SIGNATURE in self.firmware:
logger.warning(
"epatch signature found in firmware, it is probably the wrong firmware"
)
return
# TODO: load the firmware
async def download_for_rtl8723b(self):
if self.driver_info.has_rom_version:
response = await self.host.send_command(
HCI_RTK_Read_ROM_Version_Command(), check_result=True
)
if response.return_parameters.status != HCI_SUCCESS:
logger.warning("can't get ROM version")
return
rom_version = response.return_parameters.version
logger.debug(f"ROM version before download: {rom_version:04X}")
else:
rom_version = 0
firmware = Firmware(self.firmware)
logger.debug(f"firmware: project_id=0x{firmware.project_id:04X}")
for patch in firmware.patches:
if patch[0] == rom_version + 1:
logger.debug(f"using patch {patch[0]}")
break
else:
logger.warning("no valid patch found for rom version {rom_version}")
return
# Append the config if there is one.
if self.config:
payload = patch[1] + self.config
else:
payload = patch[1]
# Download the payload, one fragment at a time.
fragment_count = math.ceil(len(payload) / RTK_FRAGMENT_LENGTH)
for fragment_index in range(fragment_count):
# NOTE: the Linux driver somehow adds 1 to the index after it wraps around.
# That's odd, but we"ll do the same here.
download_index = fragment_index & 0x7F
if download_index >= 0x80:
download_index += 1
if fragment_index == fragment_count - 1:
download_index |= 0x80 # End marker.
fragment_offset = fragment_index * RTK_FRAGMENT_LENGTH
fragment = payload[fragment_offset : fragment_offset + RTK_FRAGMENT_LENGTH]
logger.debug(f"downloading fragment {fragment_index}")
await self.host.send_command(
HCI_RTK_Download_Command(
index=download_index, payload=fragment, check_result=True
)
)
logger.debug("download complete!")
# Read the version again
response = await self.host.send_command(
HCI_RTK_Read_ROM_Version_Command(), check_result=True
)
if response.return_parameters.status != HCI_SUCCESS:
logger.warning("can't get ROM version")
else:
rom_version = response.return_parameters.version
logger.debug(f"ROM version after download: {rom_version:04X}")
async def download_firmware(self):
if self.driver_info.rom == RTK_ROM_LMP_8723A:
return await self.download_for_rtl8723a()
if self.driver_info.rom in (
RTK_ROM_LMP_8723B,
RTK_ROM_LMP_8821A,
RTK_ROM_LMP_8761A,
RTK_ROM_LMP_8822B,
RTK_ROM_LMP_8852A,
):
return await self.download_for_rtl8723b()
raise ValueError("ROM not supported")
async def init_controller(self):
await self.download_firmware()
await self.host.send_command(HCI_Reset_Command(), check_result=True)
logger.info(f"loaded FW image {self.driver_info.fw_name}")
+262 -2
View File
@@ -185,7 +185,7 @@ HCI_IO_CAPABILITY_REQUEST_EVENT = 0x31
HCI_IO_CAPABILITY_RESPONSE_EVENT = 0x32
HCI_USER_CONFIRMATION_REQUEST_EVENT = 0x33
HCI_USER_PASSKEY_REQUEST_EVENT = 0x34
HCI_REMOTE_OOB_DATA_REQUEST = 0x35
HCI_REMOTE_OOB_DATA_REQUEST_EVENT = 0x35
HCI_SIMPLE_PAIRING_COMPLETE_EVENT = 0x36
HCI_LINK_SUPERVISION_TIMEOUT_CHANGED_EVENT = 0x38
HCI_ENHANCED_FLUSH_COMPLETE_EVENT = 0x39
@@ -1641,9 +1641,11 @@ class HCI_Object:
# Get the value for the field
value = hci_object[key]
# Map the value if needed
# Check if there's a matching mapper passed
if value_mappers:
value_mapper = value_mappers.get(key, value_mapper)
# Map the value if we have a mapper
if value_mapper is not None:
value = value_mapper(value)
@@ -2286,6 +2288,55 @@ class HCI_User_Passkey_Request_Negative_Reply_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('bd_addr', Address.parse_address),
('c', 16),
('r', 16),
],
return_parameters_fields=[
('status', STATUS_SPEC),
('bd_addr', Address.parse_address),
],
)
class HCI_Remote_OOB_Data_Request_Reply_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.34 Remote OOB Data Request Reply Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[('bd_addr', Address.parse_address)],
return_parameters_fields=[
('status', STATUS_SPEC),
('bd_addr', Address.parse_address),
],
)
class HCI_Remote_OOB_Data_Request_Negative_Reply_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.35 Remote OOB Data Request Negative Reply Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('bd_addr', Address.parse_address),
('reason', 1),
],
return_parameters_fields=[
('status', STATUS_SPEC),
('bd_addr', Address.parse_address),
],
)
class HCI_IO_Capability_Request_Negative_Reply_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.36 IO Capability Request Negative Reply Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
@@ -2321,6 +2372,161 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('bd_addr', Address.parse_address),
('transmit_bandwidth', 4),
('receive_bandwidth', 4),
('transmit_coding_format', 5),
('receive_coding_format', 5),
('transmit_codec_frame_size', 2),
('receive_codec_frame_size', 2),
('input_bandwidth', 4),
('output_bandwidth', 4),
('input_coding_format', 5),
('output_coding_format', 5),
('input_coded_data_size', 2),
('output_coded_data_size', 2),
('input_pcm_data_format', 1),
('output_pcm_data_format', 1),
('input_pcm_sample_payload_msb_position', 1),
('output_pcm_sample_payload_msb_position', 1),
('input_data_path', 1),
('output_data_path', 1),
('input_transport_unit_size', 1),
('output_transport_unit_size', 1),
('max_latency', 2),
('packet_type', 2),
('retransmission_effort', 1),
]
)
class HCI_Enhanced_Accept_Synchronous_Connection_Request_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.46 Enhanced Accept Synchronous Connection Request Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('bd_addr', Address.parse_address),
('page_scan_repetition_mode', 1),
('clock_offset', 2),
]
)
class HCI_Truncated_Page_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.47 Truncated Page Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[('bd_addr', Address.parse_address)],
return_parameters_fields=[
('status', STATUS_SPEC),
('bd_addr', Address.parse_address),
],
)
class HCI_Truncated_Page_Cancel_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.48 Truncated Page Cancel Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('enable', 1),
('lt_addr', 1),
('lpo_allowed', 1),
('packet_type', 2),
('interval_min', 2),
('interval_max', 2),
('supervision_timeout', 2),
],
return_parameters_fields=[
('status', STATUS_SPEC),
('lt_addr', 1),
('interval', 2),
],
)
class HCI_Set_Connectionless_Peripheral_Broadcast_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.49 Set Connectionless Peripheral Broadcast Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('enable', 1),
('bd_addr', Address.parse_address),
('lt_addr', 1),
('interval', 2),
('clock_offset', 4),
('next_connectionless_peripheral_broadcast_clock', 4),
('supervision_timeout', 2),
('remote_timing_accuracy', 1),
('skip', 1),
('packet_type', 2),
('afh_channel_map', 10),
],
return_parameters_fields=[
('status', STATUS_SPEC),
('bd_addr', Address.parse_address),
('lt_addr', 1),
],
)
class HCI_Set_Connectionless_Peripheral_Broadcast_Receive_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.50 Set Connectionless Peripheral Broadcast Receive Command
'''
# -----------------------------------------------------------------------------
class HCI_Start_Synchronization_Train_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.51 Start Synchronization Train Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('bd_addr', Address.parse_address),
('sync_scan_timeout', 2),
('sync_scan_window', 2),
('sync_scan_interval', 2),
],
)
class HCI_Receive_Synchronization_Train_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.52 Receive Synchronization Train Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('bd_addr', Address.parse_address),
('c_192', 16),
('r_192', 16),
('c_256', 16),
('r_256', 16),
],
return_parameters_fields=[
('status', STATUS_SPEC),
('bd_addr', Address.parse_address),
],
)
class HCI_Remote_OOB_Extended_Data_Request_Reply_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.53 Remote OOB Extended Data Request Reply Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
@@ -2687,6 +2893,20 @@ class HCI_Write_Simple_Pairing_Mode_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
return_parameters_fields=[
('status', STATUS_SPEC),
('c', 16),
('r', 16),
]
)
class HCI_Read_Local_OOB_Data_Command(HCI_Command):
'''
See Bluetooth spec @ 7.3.60 Read Local OOB Data Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
return_parameters_fields=[('status', STATUS_SPEC), ('tx_power', -1)]
@@ -2747,6 +2967,22 @@ class HCI_Write_Authenticated_Payload_Timeout_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
return_parameters_fields=[
('status', STATUS_SPEC),
('c_192', 16),
('r_192', 16),
('c_256', 16),
('r_256', 16),
]
)
class HCI_Read_Local_OOB_Extended_Data_Command(HCI_Command):
'''
See Bluetooth spec @ 7.3.95 Read Local OOB Extended Data Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
return_parameters_fields=[
@@ -5303,6 +5539,14 @@ class HCI_User_Passkey_Request_Event(HCI_Event):
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([('bd_addr', Address.parse_address)])
class HCI_Remote_OOB_Data_Request_Event(HCI_Event):
'''
See Bluetooth spec @ 7.7.44 Remote OOB Data Request Event
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([('status', STATUS_SPEC), ('bd_addr', Address.parse_address)])
class HCI_Simple_Pairing_Complete_Event(HCI_Event):
@@ -5319,6 +5563,14 @@ class HCI_Link_Supervision_Timeout_Changed_Event(HCI_Event):
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([('handle', 2)])
class HCI_Enhanced_Flush_Complete_Event(HCI_Event):
'''
See Bluetooth spec @ 7.7.47 Enhanced Flush Complete Event
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([('bd_addr', Address.parse_address), ('passkey', 4)])
class HCI_User_Passkey_Notification_Event(HCI_Event):
@@ -5327,6 +5579,14 @@ class HCI_User_Passkey_Notification_Event(HCI_Event):
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([('bd_addr', Address.parse_address), ('notification_type', 1)])
class HCI_Keypress_Notification_Event(HCI_Event):
'''
See Bluetooth spec @ 7.7.49 Keypress Notification Event
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([('bd_addr', Address.parse_address), ('host_supported_features', 8)])
class HCI_Remote_Host_Supported_Features_Notification_Event(HCI_Event):
+17 -3
View File
@@ -23,6 +23,7 @@ import struct
from bumble.colors import color
from bumble.l2cap import L2CAP_PDU
from bumble.snoop import Snooper
from bumble import drivers
from typing import Optional
@@ -116,6 +117,7 @@ class Host(AbortableEventEmitter):
super().__init__()
self.hci_sink = None
self.hci_metadata = None
self.ready = False # True when we can accept incoming packets
self.reset_done = False
self.connections = {} # Connections, by connection handle
@@ -141,6 +143,9 @@ class Host(AbortableEventEmitter):
# Connect to the source and sink if specified
if controller_source:
controller_source.set_packet_sink(self)
self.hci_metadata = getattr(
controller_source, 'metadata', self.hci_metadata
)
if controller_sink:
self.set_packet_sink(controller_sink)
@@ -170,7 +175,7 @@ class Host(AbortableEventEmitter):
self.emit('flush')
self.command_semaphore.release()
async def reset(self):
async def reset(self, driver_factory=drivers.get_driver_for_host):
if self.ready:
self.ready = False
await self.flush()
@@ -178,6 +183,15 @@ class Host(AbortableEventEmitter):
await self.send_command(HCI_Reset_Command(), check_result=True)
self.ready = True
# Instantiate and init a driver for the host if needed.
# NOTE: we don't keep a reference to the driver here, because we don't
# currently have a need for the driver later on. But if the driver interface
# evolves, it may be required, then, to store a reference to the driver in
# an object property.
if driver_factory is not None:
if driver := await driver_factory(self):
await driver.init_controller()
response = await self.send_command(
HCI_Read_Local_Supported_Commands_Command(), check_result=True
)
@@ -298,7 +312,7 @@ class Host(AbortableEventEmitter):
if self.snooper:
self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER)
self.hci_sink.on_packet(packet.to_bytes())
self.hci_sink.on_packet(bytes(packet))
async def send_command(self, command, check_result=False):
logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}')
@@ -350,7 +364,7 @@ class Host(AbortableEventEmitter):
asyncio.create_task(send_command(command))
def send_l2cap_pdu(self, connection_handle, cid, pdu):
l2cap_pdu = L2CAP_PDU(cid, pdu).to_bytes()
l2cap_pdu = bytes(L2CAP_PDU(cid, pdu))
# Send the data to the controller via ACL packets
bytes_remaining = len(l2cap_pdu)
+12 -5
View File
@@ -858,10 +858,13 @@ class Session:
self.tk = self.passkey.to_bytes(16, byteorder='little')
logger.debug(f'TK from passkey = {self.tk.hex()}')
self.connection.abort_on(
'disconnection',
self.pairing_config.delegate.display_number(self.passkey, digits=6),
)
try:
self.connection.abort_on(
'disconnection',
self.pairing_config.delegate.display_number(self.passkey, digits=6),
)
except Exception as error:
logger.warning(f'exception while displaying number: {error}')
def input_passkey(self, next_steps: Optional[Callable[[], None]] = None) -> None:
# Prompt the user for the passkey displayed on the peer
@@ -1300,7 +1303,11 @@ class Session:
self, command: SMP_Pairing_Request_Command
) -> None:
# Check if the request should proceed
accepted = await self.pairing_config.delegate.accept()
try:
accepted = await self.pairing_config.delegate.accept()
except Exception as error:
logger.warning(f'exception while accepting: {error}')
accepted = False
if not accepted:
logger.debug('pairing rejected by delegate')
self.send_pairing_failed(SMP_PAIRING_NOT_SUPPORTED_ERROR)
+7 -2
View File
@@ -206,10 +206,11 @@ async def open_usb_transport(spec):
logger.debug('OUT transfer likely already completed')
class UsbPacketSource(asyncio.Protocol, ParserSource):
def __init__(self, context, device, acl_in, events_in):
def __init__(self, context, device, metadata, acl_in, events_in):
super().__init__()
self.context = context
self.device = device
self.metadata = metadata
self.acl_in = acl_in
self.events_in = events_in
self.loop = asyncio.get_running_loop()
@@ -510,6 +511,10 @@ async def open_usb_transport(spec):
f'events_in=0x{events_in:02X}, '
)
device_metadata = {
'vendor_id': found.getVendorID(),
'product_id': found.getProductID(),
}
device = found.open()
# Auto-detach the kernel driver if supported
@@ -535,7 +540,7 @@ async def open_usb_transport(spec):
except usb1.USBError:
logger.warning('failed to set configuration')
source = UsbPacketSource(context, device, acl_in, events_in)
source = UsbPacketSource(context, device, device_metadata, acl_in, events_in)
sink = UsbPacketSink(device, acl_out)
return UsbTransport(context, device, interface, setting, source, sink)
except usb1.USBError as error:
+3
View File
@@ -36,6 +36,9 @@ nav:
- HCI Socket: transports/hci_socket.md
- Android Emulator: transports/android_emulator.md
- File: transports/file.md
- Drivers:
- Overview: drivers/index.md
- Realtek: drivers/realtek.md
- API:
- Guide: api/guide.md
- Examples: api/examples.md
+10
View File
@@ -0,0 +1,10 @@
DRIVERS
=======
Some Bluetooth controllers require a driver to function properly.
This may include, for instance, loading a Firmware image or patch,
loading a configuration.
Drivers included in the module are:
* [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles.
+62
View File
@@ -0,0 +1,62 @@
REALTEK DRIVER
==============
This driver supports loading firmware images and optional config data to
USB dongles with a Realtek chipset.
A number of USB dongles are supported, but likely not all.
When using a USB dongle, the USB product ID and manufacturer ID are used
to find whether a matching set of firmware image and config data
is needed for that specific model. If a match exists, the driver will try
load the firmware image and, if needed, config data.
The driver will look for those files by name, in order, in:
* The directory specified by the environment variable `BUMBLE_RTK_FIRMWARE_DIR`
if set.
* The directory `<package-dir>/drivers/rtk_fw` where `<package-dir>` is the directory
where the `bumble` package is installed.
* The current directory.
Obtaining Firmware Images and Config Data
-----------------------------------------
Firmware images and config data may be obtained from a variety of online
sources.
To facilitate finding a downloading the, the utility program `bumble-rtk-fw-download`
may be used.
```
Usage: bumble-rtk-fw-download [OPTIONS]
Download RTK firmware images and configs.
Options:
--output-dir TEXT Output directory where the files will be
saved [default: .]
--source [linux-kernel|realtek-opensource|linux-from-scratch]
[default: linux-kernel]
--single TEXT Only download a single image set, by its
base name
--force Overwrite files if they already exist
--parse Parse the FW image after saving
--help Show this message and exit.
```
Utility
-------
The `bumble-rtk-util` utility may be used to interact with a Realtek USB dongle
and/or firmware images.
```
Usage: bumble-rtk-util [OPTIONS] COMMAND [ARGS]...
Options:
--help Show this message and exit.
Commands:
drop Drop a firmware image from the USB dongle.
info Get the firmware info from a USB dongle.
load Load a firmware image into the USB dongle.
parse Parse a firmware image.
```
+6 -6
View File
@@ -1,11 +1,11 @@
UDP TRANSPORT
=============
WEBSOCKET CLIENT TRANSPORT
==========================
The UDP transport is a UDP socket, receiving packets on a specified port number, and sending packets to a specified host and port number.
The WebSocket Client transport is WebSocket connection to a WebSocket server over which HCI packets
are sent and received.
## Moniker
The moniker syntax for a UDP transport is: `udp:<local-host>:<local-port>,<remote-host>:<remote-port>`.
The moniker syntax for a WebSocket Client transport is: `ws-client:<ws-url>`
!!! example
`udp:0.0.0.0:9000,127.0.0.1:9001`
UDP transport where packets are received on port `9000` and sent to `127.0.0.1` on port `9001`
`ws-client:ws://localhost:1234/some/path`
+8 -6
View File
@@ -1,11 +1,13 @@
UDP TRANSPORT
=============
WEBSOCKET SERVER TRANSPORT
==========================
The UDP transport is a UDP socket, receiving packets on a specified port number, and sending packets to a specified host and port number.
The WebSocket Server transport is WebSocket server that accepts connections from a WebSocket
client. HCI packets are sent and received over the connection.
## Moniker
The moniker syntax for a UDP transport is: `udp:<local-host>:<local-port>,<remote-host>:<remote-port>`.
The moniker syntax for a WebSocket Server transport is: `ws-server:<host>:<port>`,
where `<host>` may be the address of a local network interface, or `_`to accept connections on all local network interfaces. `<port>` is the TCP port number on which to accept connections.
!!! example
`udp:0.0.0.0:9000,127.0.0.1:9001`
UDP transport where packets are received on port `9000` and sent to `127.0.0.1` on port `9001`
`ws-server:_:9001`
+1 -1
View File
@@ -62,7 +62,7 @@ async def main():
print(
f'>>> {color(advertisement.address, address_color)} '
f'[{color(address_type_string, type_color)}]'
f'{address_qualifier}:{separator}RSSI:{advertisement.rssi}'
f'{address_qualifier}:{separator}RSSI: {advertisement.rssi}'
f'{separator}'
f'{advertisement.data.to_string(separator)}'
)
+7 -4
View File
@@ -24,16 +24,17 @@ url = https://github.com/google/bumble
[options]
python_requires = >=3.8
packages = bumble, bumble.transport, bumble.profiles, bumble.apps, bumble.apps.link_relay, bumble.pandora
packages = bumble, bumble.transport, bumble.drivers, bumble.profiles, bumble.apps, bumble.apps.link_relay, bumble.pandora, bumble.tools
package_dir =
bumble = bumble
bumble.apps = apps
include-package-data = True
bumble.tools = tools
include_package_data = True
install_requires =
aiohttp >= 3.8.4; platform_system!='Emscripten'
aiohttp ~= 3.8; platform_system!='Emscripten'
appdirs >= 1.4
bt-test-interfaces >= 0.0.2
click >= 7.1.2; platform_system!='Emscripten'
click == 8.1.3; platform_system!='Emscripten'
cryptography == 35; platform_system!='Emscripten'
grpcio == 1.51.1; platform_system!='Emscripten'
humanize >= 4.6.0
@@ -64,6 +65,8 @@ console_scripts =
bumble-bench = bumble.apps.bench:main
bumble-speaker = bumble.apps.speaker.speaker:main
bumble-pandora-server = bumble.apps.pandora_server:main
bumble-rtk-util = bumble.tools.rtk_util:main
bumble-rtk-fw-download = bumble.tools.rtk_fw_download:main
[options.package_data]
* = py.typed, *.pyi
View File
+149
View File
@@ -0,0 +1,149 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import pathlib
import urllib.request
import urllib.error
import click
from bumble.colors import color
from bumble.drivers import rtk
from bumble.tools import rtk_util
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
LINUX_KERNEL_GIT_SOURCE = (
"https://git.kernel.org/pub/scm/linux/kernel/git/firmware/linux-firmware.git/plain/rtl_bt",
False,
)
REALTEK_OPENSOURCE_SOURCE = (
"https://github.com/Realtek-OpenSource/android_hardware_realtek/raw/rtk1395/bt/rtkbt/Firmware/BT",
True,
)
LINUX_FROM_SCRATCH_SOURCE = (
"https://anduin.linuxfromscratch.org/sources/linux-firmware/rtl_bt",
False,
)
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def download_file(base_url, name, remove_suffix):
if remove_suffix:
name = name.replace(".bin", "")
url = f"{base_url}/{name}"
with urllib.request.urlopen(url) as file:
data = file.read()
print(f"Downloaded {name}: {len(data)} bytes")
return data
# -----------------------------------------------------------------------------
@click.command
@click.option(
"--output-dir",
default=".",
help="Output directory where the files will be saved",
show_default=True,
)
@click.option(
"--source",
type=click.Choice(["linux-kernel", "realtek-opensource", "linux-from-scratch"]),
default="linux-kernel",
show_default=True,
)
@click.option("--single", help="Only download a single image set, by its base name")
@click.option("--force", is_flag=True, help="Overwrite files if they already exist")
@click.option("--parse", is_flag=True, help="Parse the FW image after saving")
def main(output_dir, source, single, force, parse):
"""Download RTK firmware images and configs."""
# Check that the output dir exists
output_dir = pathlib.Path(output_dir)
if not output_dir.is_dir():
print("Output dir does not exist or is not a directory")
return
base_url, remove_suffix = {
"linux-kernel": LINUX_KERNEL_GIT_SOURCE,
"realtek-opensource": REALTEK_OPENSOURCE_SOURCE,
"linux-from-scratch": LINUX_FROM_SCRATCH_SOURCE,
}[source]
print("Downloading")
print(color("FROM:", "green"), base_url)
print(color("TO:", "green"), output_dir)
if single:
images = [(f"{single}_fw.bin", f"{single}_config.bin", True)]
else:
images = [
(driver_info.fw_name, driver_info.config_name, driver_info.config_needed)
for driver_info in rtk.Driver.DRIVER_INFOS
]
for (fw_name, config_name, config_needed) in images:
print(color("---", "yellow"))
fw_image_out = output_dir / fw_name
if not force and fw_image_out.exists():
print(color(f"{fw_image_out} already exists, skipping", "red"))
continue
if config_name:
config_image_out = output_dir / config_name
if not force and config_image_out.exists():
print(color("f{config_out} already exists, skipping", "red"))
continue
try:
fw_image = download_file(base_url, fw_name, remove_suffix)
except urllib.error.HTTPError as error:
print(f"Failed to download {fw_name}: {error}")
continue
config_image = None
if config_name:
try:
config_image = download_file(base_url, config_name, remove_suffix)
except urllib.error.HTTPError as error:
if config_needed:
print(f"Failed to download {config_name}: {error}")
continue
else:
print(f"No config available as {config_name}")
fw_image_out.write_bytes(fw_image)
if parse and config_name:
print(color("Parsing:", "cyan"), fw_name)
rtk_util.do_parse(fw_image_out)
if config_image:
config_image_out.write_bytes(config_image)
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()
+161
View File
@@ -0,0 +1,161 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import asyncio
import os
import click
from bumble import transport
from bumble.host import Host
from bumble.drivers import rtk
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def do_parse(firmware_path):
with open(firmware_path, 'rb') as firmware_file:
firmware_data = firmware_file.read()
firmware = rtk.Firmware(firmware_data)
print(
f"Firmware: version=0x{firmware.version:08X} "
f"project_id=0x{firmware.project_id:04X}"
)
for patch in firmware.patches:
print(
f" Patch: chip_id=0x{patch[0]:04X}, "
f"{len(patch[1])} bytes, "
f"SVN Version={patch[2]:08X}"
)
# -----------------------------------------------------------------------------
async def do_load(usb_transport, force):
async with await transport.open_transport_or_link(usb_transport) as (
hci_source,
hci_sink,
):
# Create a host to communicate with the device
host = Host(hci_source, hci_sink)
await host.reset(driver_factory=None)
# Get the driver.
driver = await rtk.Driver.for_host(host, force)
if driver is None:
if not force:
print("Firmware already loaded or no supported driver for this device.")
return
await driver.download_firmware()
# -----------------------------------------------------------------------------
async def do_drop(usb_transport):
async with await transport.open_transport_or_link(usb_transport) as (
hci_source,
hci_sink,
):
# Create a host to communicate with the device
host = Host(hci_source, hci_sink)
await host.reset(driver_factory=None)
# Tell the device to reset/drop any loaded patch
await rtk.Driver.drop_firmware(host)
# -----------------------------------------------------------------------------
async def do_info(usb_transport, force):
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
# Create a host to communicate with the device
host = Host(hci_source, hci_sink)
await host.reset(driver_factory=None)
# Check if this is a supported device.
if not force and not rtk.Driver.check(host):
print("USB device not supported by this RTK driver")
return
# Get the driver info.
driver_info = await rtk.Driver.driver_info_for_host(host)
if driver_info:
print(
"Driver:\n"
f" ROM: {driver_info.rom:04X}\n"
f" Firmware: {driver_info.fw_name}\n"
f" Config: {driver_info.config_name}\n"
)
else:
print("Firmware already loaded or no supported driver for this device.")
# -----------------------------------------------------------------------------
@click.group()
def main():
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
@main.command
@click.argument("firmware_path")
def parse(firmware_path):
"""Parse a firmware image."""
do_parse(firmware_path)
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Load even if the USB info doesn't match",
)
def load(usb_transport, force):
"""Load a firmware image into the USB dongle."""
asyncio.run(do_load(usb_transport, force))
@main.command
@click.argument("usb_transport")
def drop(usb_transport):
"""Drop a firmware image from the USB dongle."""
asyncio.run(do_drop(usb_transport))
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Try to get the device info even if the USB info doesn't match",
)
def info(usb_transport, force):
"""Get the firmware info from a USB dongle."""
asyncio.run(do_info(usb_transport, force))
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()