# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- from __future__ import annotations import asyncio import functools import sys import bumble.logging from bumble import core, hci from bumble.device import ChannelSoundingCapabilities, Connection, Device from bumble.transport import open_transport # From https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/system/gd/hci/distance_measurement_manager.cc. CS_TONE_ANTENNA_CONFIG_MAPPING_TABLE = [ [0, 4, 5, 6], [1, 7, 7, 7], [2, 7, 7, 7], [3, 7, 7, 7], ] CS_PREFERRED_PEER_ANTENNA_MAPPING_TABLE = [1, 1, 1, 1, 3, 7, 15, 3] CS_ANTENNA_PERMUTATION_ARRAY = [ [1, 2, 3, 4], [2, 1, 3, 4], [1, 3, 2, 4], [3, 1, 2, 4], [3, 2, 1, 4], [2, 3, 1, 4], [1, 2, 4, 3], [2, 1, 4, 3], [1, 4, 2, 3], [4, 1, 2, 3], [4, 2, 1, 3], [2, 4, 1, 3], [1, 4, 3, 2], [4, 1, 3, 2], [1, 3, 4, 2], [3, 1, 4, 2], [3, 4, 1, 2], [4, 3, 1, 2], [4, 2, 3, 1], [2, 4, 3, 1], [4, 3, 2, 1], [3, 4, 2, 1], [3, 2, 4, 1], [2, 3, 4, 1], ] # ----------------------------------------------------------------------------- async def main() -> None: if len(sys.argv) < 3: print( 'Usage: run_channel_sounding.py ' '[target_address](If missing, run as reflector)' ) print('example: run_channel_sounding.py cs_reflector.json usb:0') print( 'example: run_channel_sounding.py cs_initiator.json usb:0 F0:F1:F2:F3:F4:F5' ) return print('<<< connecting to HCI...') async with await open_transport(sys.argv[2]) as hci_transport: print('<<< connected') device = Device.from_config_file_with_hci( sys.argv[1], hci_transport.source, hci_transport.sink ) await device.power_on() assert (local_cs_capabilities := device.cs_capabilities) if len(sys.argv) == 3: print('<<< Start Advertising') await device.start_advertising( own_address_type=hci.OwnAddressType.RANDOM, auto_restart=True ) def on_cs_capabilities( connection: Connection, capabilities: ChannelSoundingCapabilities ): del capabilities print('<<< Set CS Settings') asyncio.create_task(device.set_default_cs_settings(connection)) device.on( 'connection', lambda connection: connection.on( 'channel_sounding_capabilities', functools.partial(on_cs_capabilities, connection), ), ) else: target_address = hci.Address(sys.argv[3]) print(f'<<< Connecting to {target_address}') connection = await device.connect( target_address, transport=core.PhysicalTransport.LE ) print('<<< ACL Connected') if not (await device.get_long_term_key(connection.handle, b'', 0)): print('<<< No bond, start pairing') await connection.pair() print('<<< Pairing complete') print('<<< Encrypting Connection') await connection.encrypt() print('<<< Getting remote CS Capabilities...') remote_capabilities = await device.get_remote_cs_capabilities(connection) print('<<< Set CS Settings...') await device.set_default_cs_settings(connection) print('<<< Set CS Config...') config = await device.create_cs_config(connection) print('<<< Enable CS Security...') await device.enable_cs_security(connection) tone_antenna_config_selection = CS_TONE_ANTENNA_CONFIG_MAPPING_TABLE[ local_cs_capabilities.num_antennas_supported - 1 ][remote_capabilities.num_antennas_supported - 1] print('<<< Set CS Procedure Parameters...') await device.set_cs_procedure_parameters( connection=connection, config=config, tone_antenna_config_selection=tone_antenna_config_selection, preferred_peer_antenna=CS_PREFERRED_PEER_ANTENNA_MAPPING_TABLE[ tone_antenna_config_selection ], ) print('<<< Enable CS Procedure...') await device.enable_cs_procedure(connection=connection, config=config) await hci_transport.source.terminated # ----------------------------------------------------------------------------- bumble.logging.setup_basic_logging('DEBUG') asyncio.run(main())