Merge pull request #619 from zxzxwu/cs

Channel Sounding
This commit is contained in:
zxzxwu
2025-02-01 03:46:59 +08:00
committed by GitHub
8 changed files with 640 additions and 9 deletions

View File

@@ -120,6 +120,8 @@ DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE = 0x00
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE = 0xEF
DEVICE_MIN_BIG_HANDLE = 0x00
DEVICE_MAX_BIG_HANDLE = 0xEF
DEVICE_MIN_CS_CONFIG_ID = 0x00
DEVICE_MAX_CS_CONFIG_ID = 0x03
DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00'
DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms
@@ -1117,6 +1119,72 @@ class BigSync(EventEmitter):
await terminated.wait()
# -----------------------------------------------------------------------------
@dataclass
class ChannelSoundingCapabilities:
num_config_supported: int
max_consecutive_procedures_supported: int
num_antennas_supported: int
max_antenna_paths_supported: int
roles_supported: int
modes_supported: int
rtt_capability: int
rtt_aa_only_n: int
rtt_sounding_n: int
rtt_random_payload_n: int
nadm_sounding_capability: int
nadm_random_capability: int
cs_sync_phys_supported: int
subfeatures_supported: int
t_ip1_times_supported: int
t_ip2_times_supported: int
t_fcs_times_supported: int
t_pm_times_supported: int
t_sw_time_supported: int
tx_snr_capability: int
# -----------------------------------------------------------------------------
@dataclass
class ChannelSoundingConfig:
config_id: int
main_mode_type: int
sub_mode_type: int
min_main_mode_steps: int
max_main_mode_steps: int
main_mode_repetition: int
mode_0_steps: int
role: int
rtt_type: int
cs_sync_phy: int
channel_map: bytes
channel_map_repetition: int
channel_selection_type: int
ch3c_shape: int
ch3c_jump: int
reserved: int
t_ip1_time: int
t_ip2_time: int
t_fcs_time: int
t_pm_time: int
# -----------------------------------------------------------------------------
@dataclass
class ChannelSoundingProcedure:
config_id: int
state: int
tone_antenna_config_selection: int
selected_tx_power: int
subevent_len: int
subevents_per_event: int
subevent_interval: int
event_interval: int
procedure_interval: int
procedure_count: int
max_procedure_len: int
# -----------------------------------------------------------------------------
class LePhyOptions:
# Coded PHY preference
@@ -1452,6 +1520,8 @@ class Connection(CompositeEventEmitter):
gatt_client: gatt_client.Client
pairing_peer_io_capability: Optional[int]
pairing_peer_authentication_requirements: Optional[int]
cs_configs: dict[int, ChannelSoundingConfig] = {} # Config ID to Configuration
cs_procedures: dict[int, ChannelSoundingProcedure] = {} # Config ID to Procedures
@composite_listener
class Listener:
@@ -1746,6 +1816,7 @@ class DeviceConfiguration:
address_resolution_offload: bool = False
address_generation_offload: bool = False
cis_enabled: bool = False
channel_sounding_enabled: bool = False
identity_address_type: Optional[int] = None
io_capability: int = pairing.PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT
gap_service_enabled: bool = True
@@ -1916,6 +1987,7 @@ class Device(CompositeEventEmitter):
gatt_server: gatt_server.Server
advertising_data: bytes
scan_response_data: bytes
cs_capabilities: ChannelSoundingCapabilities | None = None
connections: Dict[int, Connection]
pending_connections: Dict[hci.Address, Connection]
classic_pending_accepts: Dict[
@@ -2432,6 +2504,41 @@ class Device(CompositeEventEmitter):
check_result=True,
)
if self.config.channel_sounding_enabled:
await self.send_command(
hci.HCI_LE_Set_Host_Feature_Command(
bit_number=hci.LeFeature.CHANNEL_SOUNDING_HOST_SUPPORT,
bit_value=1,
),
check_result=True,
)
result = await self.send_command(
hci.HCI_LE_CS_Read_Local_Supported_Capabilities_Command(),
check_result=True,
)
self.cs_capabilities = ChannelSoundingCapabilities(
num_config_supported=result.return_parameters.num_config_supported,
max_consecutive_procedures_supported=result.return_parameters.max_consecutive_procedures_supported,
num_antennas_supported=result.return_parameters.num_antennas_supported,
max_antenna_paths_supported=result.return_parameters.max_antenna_paths_supported,
roles_supported=result.return_parameters.roles_supported,
modes_supported=result.return_parameters.modes_supported,
rtt_capability=result.return_parameters.rtt_capability,
rtt_aa_only_n=result.return_parameters.rtt_aa_only_n,
rtt_sounding_n=result.return_parameters.rtt_sounding_n,
rtt_random_payload_n=result.return_parameters.rtt_random_payload_n,
nadm_sounding_capability=result.return_parameters.nadm_sounding_capability,
nadm_random_capability=result.return_parameters.nadm_random_capability,
cs_sync_phys_supported=result.return_parameters.cs_sync_phys_supported,
subfeatures_supported=result.return_parameters.subfeatures_supported,
t_ip1_times_supported=result.return_parameters.t_ip1_times_supported,
t_ip2_times_supported=result.return_parameters.t_ip2_times_supported,
t_fcs_times_supported=result.return_parameters.t_fcs_times_supported,
t_pm_times_supported=result.return_parameters.t_pm_times_supported,
t_sw_time_supported=result.return_parameters.t_sw_time_supported,
tx_snr_capability=result.return_parameters.tx_snr_capability,
)
if self.classic_enabled:
await self.send_command(
hci.HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8'))
@@ -4478,6 +4585,213 @@ class Device(CompositeEventEmitter):
)
return await read_feature_future
@experimental('Only for testing.')
async def get_remote_cs_capabilities(
self, connection: Connection
) -> ChannelSoundingCapabilities:
complete_future: asyncio.Future[ChannelSoundingCapabilities] = (
asyncio.get_running_loop().create_future()
)
with closing(EventWatcher()) as watcher:
watcher.once(
connection, 'channel_sounding_capabilities', complete_future.set_result
)
watcher.once(
connection,
'channel_sounding_capabilities_failure',
lambda status: complete_future.set_exception(hci.HCI_Error(status)),
)
await self.send_command(
hci.HCI_LE_CS_Read_Remote_Supported_Capabilities_Command(
connection_handle=connection.handle
),
check_result=True,
)
return await complete_future
@experimental('Only for testing.')
async def set_default_cs_settings(
self,
connection: Connection,
role_enable: int = (
hci.CsRoleMask.INITIATOR | hci.CsRoleMask.REFLECTOR
), # Both role
cs_sync_antenna_selection: int = 0xFF, # No Preference
max_tx_power: int = 0x04, # 4 dB
) -> None:
await self.send_command(
hci.HCI_LE_CS_Set_Default_Settings_Command(
connection_handle=connection.handle,
role_enable=role_enable,
cs_sync_antenna_selection=cs_sync_antenna_selection,
max_tx_power=max_tx_power,
),
check_result=True,
)
@experimental('Only for testing.')
async def create_cs_config(
self,
connection: Connection,
config_id: int | None = None,
create_context: int = 0x01,
main_mode_type: int = 0x02,
sub_mode_type: int = 0xFF,
min_main_mode_steps: int = 0x02,
max_main_mode_steps: int = 0x05,
main_mode_repetition: int = 0x00,
mode_0_steps: int = 0x03,
role: int = hci.CsRole.INITIATOR,
rtt_type: int = hci.RttType.AA_ONLY,
cs_sync_phy: int = hci.CsSyncPhy.LE_1M,
channel_map: bytes = b'\x54\x55\x55\x54\x55\x55\x55\x55\x55\x15',
channel_map_repetition: int = 0x01,
channel_selection_type: int = hci.HCI_LE_CS_Create_Config_Command.ChannelSelectionType.ALGO_3B,
ch3c_shape: int = hci.HCI_LE_CS_Create_Config_Command.Ch3cShape.HAT,
ch3c_jump: int = 0x03,
) -> ChannelSoundingConfig:
complete_future: asyncio.Future[ChannelSoundingConfig] = (
asyncio.get_running_loop().create_future()
)
if config_id is None:
# Allocate an ID.
config_id = next(
(
i
for i in range(DEVICE_MIN_CS_CONFIG_ID, DEVICE_MAX_CS_CONFIG_ID + 1)
if i not in connection.cs_configs
),
None,
)
if config_id is None:
raise OutOfResourcesError("No available config ID on this connection!")
with closing(EventWatcher()) as watcher:
watcher.once(
connection, 'channel_sounding_config', complete_future.set_result
)
watcher.once(
connection,
'channel_sounding_config_failure',
lambda status: complete_future.set_exception(hci.HCI_Error(status)),
)
await self.send_command(
hci.HCI_LE_CS_Create_Config_Command(
connection_handle=connection.handle,
config_id=config_id,
create_context=create_context,
main_mode_type=main_mode_type,
sub_mode_type=sub_mode_type,
min_main_mode_steps=min_main_mode_steps,
max_main_mode_steps=max_main_mode_steps,
main_mode_repetition=main_mode_repetition,
mode_0_steps=mode_0_steps,
role=role,
rtt_type=rtt_type,
cs_sync_phy=cs_sync_phy,
channel_map=channel_map,
channel_map_repetition=channel_map_repetition,
channel_selection_type=channel_selection_type,
ch3c_shape=ch3c_shape,
ch3c_jump=ch3c_jump,
reserved=0x00,
),
check_result=True,
)
return await complete_future
@experimental('Only for testing.')
async def enable_cs_security(self, connection: Connection) -> None:
complete_future: asyncio.Future[None] = (
asyncio.get_running_loop().create_future()
)
with closing(EventWatcher()) as watcher:
def on_event(event: hci.HCI_LE_CS_Security_Enable_Complete_Event) -> None:
if event.connection_handle != connection.handle:
return
if event.status == hci.HCI_SUCCESS:
complete_future.set_result(None)
else:
complete_future.set_exception(hci.HCI_Error(event.status))
watcher.once(self.host, 'cs_security', on_event)
await self.send_command(
hci.HCI_LE_CS_Security_Enable_Command(
connection_handle=connection.handle
),
check_result=True,
)
return await complete_future
@experimental('Only for testing.')
async def set_cs_procedure_parameters(
self,
connection: Connection,
config: ChannelSoundingConfig,
tone_antenna_config_selection=0x00,
preferred_peer_antenna=0x00,
max_procedure_len=0x2710, # 6.25s
min_procedure_interval=0x01,
max_procedure_interval=0xFF,
max_procedure_count=0x01,
min_subevent_len=0x0004E2, # 1250us
max_subevent_len=0x1E8480, # 2s
phy=hci.CsSyncPhy.LE_1M,
tx_power_delta=0x00,
snr_control_initiator=hci.CsSnr.NOT_APPLIED,
snr_control_reflector=hci.CsSnr.NOT_APPLIED,
) -> None:
await self.send_command(
hci.HCI_LE_CS_Set_Procedure_Parameters_Command(
connection_handle=connection.handle,
config_id=config.config_id,
max_procedure_len=max_procedure_len,
min_procedure_interval=min_procedure_interval,
max_procedure_interval=max_procedure_interval,
max_procedure_count=max_procedure_count,
min_subevent_len=min_subevent_len,
max_subevent_len=max_subevent_len,
tone_antenna_config_selection=tone_antenna_config_selection,
phy=phy,
tx_power_delta=tx_power_delta,
preferred_peer_antenna=preferred_peer_antenna,
snr_control_initiator=snr_control_initiator,
snr_control_reflector=snr_control_reflector,
),
check_result=True,
)
@experimental('Only for testing.')
async def enable_cs_procedure(
self,
connection: Connection,
config: ChannelSoundingConfig,
enabled: bool = True,
) -> ChannelSoundingProcedure:
complete_future: asyncio.Future[ChannelSoundingProcedure] = (
asyncio.get_running_loop().create_future()
)
with closing(EventWatcher()) as watcher:
watcher.once(
connection, 'channel_sounding_procedure', complete_future.set_result
)
watcher.once(
connection,
'channel_sounding_procedure_failure',
lambda x: complete_future.set_exception(hci.HCI_Error(x)),
)
await self.send_command(
hci.HCI_LE_CS_Procedure_Enable_Command(
connection_handle=connection.handle,
config_id=config.config_id,
enable=enabled,
),
check_result=True,
)
return await complete_future
@host_event_handler
def on_flush(self):
self.emit('flush')
@@ -5441,6 +5755,106 @@ class Device(CompositeEventEmitter):
)
connection.emit('connection_data_length_change')
@host_event_handler
def on_cs_remote_supported_capabilities(
self, event: hci.HCI_LE_CS_Read_Remote_Supported_Capabilities_Complete_Event
):
if not (connection := self.lookup_connection(event.connection_handle)):
return
if event.status != hci.HCI_SUCCESS:
connection.emit('channel_sounding_capabilities_failure', event.status)
return
capabilities = ChannelSoundingCapabilities(
num_config_supported=event.num_config_supported,
max_consecutive_procedures_supported=event.max_consecutive_procedures_supported,
num_antennas_supported=event.num_antennas_supported,
max_antenna_paths_supported=event.max_antenna_paths_supported,
roles_supported=event.roles_supported,
modes_supported=event.modes_supported,
rtt_capability=event.rtt_capability,
rtt_aa_only_n=event.rtt_aa_only_n,
rtt_sounding_n=event.rtt_sounding_n,
rtt_random_payload_n=event.rtt_random_payload_n,
nadm_sounding_capability=event.nadm_sounding_capability,
nadm_random_capability=event.nadm_random_capability,
cs_sync_phys_supported=event.cs_sync_phys_supported,
subfeatures_supported=event.subfeatures_supported,
t_ip1_times_supported=event.t_ip1_times_supported,
t_ip2_times_supported=event.t_ip2_times_supported,
t_fcs_times_supported=event.t_fcs_times_supported,
t_pm_times_supported=event.t_pm_times_supported,
t_sw_time_supported=event.t_sw_time_supported,
tx_snr_capability=event.tx_snr_capability,
)
connection.emit('channel_sounding_capabilities', capabilities)
@host_event_handler
def on_cs_config(self, event: hci.HCI_LE_CS_Config_Complete_Event):
if not (connection := self.lookup_connection(event.connection_handle)):
return
if event.status != hci.HCI_SUCCESS:
connection.emit('channel_sounding_config_failure', event.status)
return
if event.action == hci.HCI_LE_CS_Config_Complete_Event.Action.CREATED:
config = ChannelSoundingConfig(
config_id=event.config_id,
main_mode_type=event.main_mode_type,
sub_mode_type=event.sub_mode_type,
min_main_mode_steps=event.min_main_mode_steps,
max_main_mode_steps=event.max_main_mode_steps,
main_mode_repetition=event.main_mode_repetition,
mode_0_steps=event.mode_0_steps,
role=event.role,
rtt_type=event.rtt_type,
cs_sync_phy=event.cs_sync_phy,
channel_map=event.channel_map,
channel_map_repetition=event.channel_map_repetition,
channel_selection_type=event.channel_selection_type,
ch3c_shape=event.ch3c_shape,
ch3c_jump=event.ch3c_jump,
reserved=event.reserved,
t_ip1_time=event.t_ip1_time,
t_ip2_time=event.t_ip2_time,
t_fcs_time=event.t_fcs_time,
t_pm_time=event.t_pm_time,
)
connection.cs_configs[event.config_id] = config
connection.emit('channel_sounding_config', config)
elif event.action == hci.HCI_LE_CS_Config_Complete_Event.Action.REMOVED:
try:
config = connection.cs_configs.pop(event.config_id)
connection.emit('channel_sounding_config_removed', config.config_id)
except KeyError:
logger.error('Removing unknown config %d', event.config_id)
@host_event_handler
def on_cs_procedure(self, event: hci.HCI_LE_CS_Procedure_Enable_Complete_Event):
if not (connection := self.lookup_connection(event.connection_handle)):
return
if event.status != hci.HCI_SUCCESS:
connection.emit('channel_sounding_procedure_failure', event.status)
return
procedure = ChannelSoundingProcedure(
config_id=event.config_id,
state=event.state,
tone_antenna_config_selection=event.tone_antenna_config_selection,
selected_tx_power=event.selected_tx_power,
subevent_len=event.subevent_len,
subevents_per_event=event.subevents_per_event,
subevent_interval=event.subevent_interval,
event_interval=event.event_interval,
procedure_interval=event.procedure_interval,
procedure_count=event.procedure_count,
max_procedure_len=event.max_procedure_len,
)
connection.cs_procedures[procedure.config_id] = procedure
connection.emit('channel_sounding_procedure', procedure)
# [Classic only]
@host_event_handler
@with_connection_from_address
@@ -5498,14 +5912,14 @@ class Device(CompositeEventEmitter):
if att_pdu.op_code & 1:
if connection.gatt_client is None:
logger.warning(
color('no GATT client for connection 0x{connection_handle:04X}')
'No GATT client for connection 0x%04X', connection.handle
)
return
connection.gatt_client.on_gatt_pdu(att_pdu)
else:
if connection.gatt_server is None:
logger.warning(
color('no GATT server for connection 0x{connection_handle:04X}')
'No GATT server for connection 0x%04X', connection.handle
)
return
connection.gatt_server.on_gatt_pdu(connection, att_pdu)