more complete set of HCI types and constants

This commit is contained in:
Gilles Boccon-Gibod
2022-07-15 09:46:49 -07:00
parent b64fa65921
commit ebd0a0c8ca
6 changed files with 1352 additions and 584 deletions
+12 -12
View File
@@ -77,7 +77,7 @@ class Controller:
self.le_features = bytes.fromhex('ff49010000000000') self.le_features = bytes.fromhex('ff49010000000000')
self.le_states = bytes.fromhex('ffff3fffff030000') self.le_states = bytes.fromhex('ffff3fffff030000')
self.avertising_channel_tx_power = 0 self.avertising_channel_tx_power = 0
self.white_list_size = 8 self.filter_accept_list_size = 8
self.resolving_list_size = 8 self.resolving_list_size = 8
self.supported_max_tx_octets = 27 self.supported_max_tx_octets = 27
self.supported_max_tx_time = 10000 # microseconds self.supported_max_tx_time = 10000 # microseconds
@@ -731,27 +731,27 @@ class Controller:
''' '''
return bytes([HCI_SUCCESS]) return bytes([HCI_SUCCESS])
def on_hci_le_read_white_list_size_command(self, command): def on_hci_le_read_filter_accept_list_size_command(self, command):
''' '''
See Bluetooth spec Vol 2, Part E - 7.8.14 LE Read White List Size Command See Bluetooth spec Vol 2, Part E - 7.8.14 LE Read Filter Accept List Size Command
''' '''
return bytes([HCI_SUCCESS, self.white_list_size]) return bytes([HCI_SUCCESS, self.filter_accept_list_size])
def on_hci_le_clear_white_list_command(self, command): def on_hci_le_clear_filter_accept_list_command(self, command):
''' '''
See Bluetooth spec Vol 2, Part E - 7.8.15 LE Clear White List Command See Bluetooth spec Vol 2, Part E - 7.8.15 LE Clear Filter Accept List Command
''' '''
return bytes([HCI_SUCCESS]) return bytes([HCI_SUCCESS])
def on_hci_le_add_device_to_white_list_command(self, command): def on_hci_le_add_device_to_filter_accept_list_command(self, command):
''' '''
See Bluetooth spec Vol 2, Part E - 7.8.16 LE Add Device To White List Command See Bluetooth spec Vol 2, Part E - 7.8.16 LE Add Device To Filter Accept List Command
''' '''
return bytes([HCI_SUCCESS]) return bytes([HCI_SUCCESS])
def on_hci_le_remove_device_from_white_list_command(self, command): def on_hci_le_remove_device_from_filter_accept_list_command(self, command):
''' '''
See Bluetooth spec Vol 2, Part E - 7.8.17 LE Remove Device From White List Command See Bluetooth spec Vol 2, Part E - 7.8.17 LE Remove Device From Filter Accept List Command
''' '''
return bytes([HCI_SUCCESS]) return bytes([HCI_SUCCESS])
@@ -780,9 +780,9 @@ class Controller:
''' '''
return bytes([HCI_SUCCESS]) + struct.pack('Q', random.randint(0, 1 << 64)) return bytes([HCI_SUCCESS]) + struct.pack('Q', random.randint(0, 1 << 64))
def on_hci_le_start_encryption_command(self, command): def on_hci_le_enable_encryption_command(self, command):
''' '''
See Bluetooth spec Vol 2, Part E - 7.8.24 LE Start Encryption Command See Bluetooth spec Vol 2, Part E - 7.8.24 LE Enable Encryption Command
''' '''
# Check the parameters # Check the parameters
+4 -3
View File
@@ -303,6 +303,7 @@ class DeviceConfiguration:
# within a class requires unnecessarily complicated acrobatics) # within a class requires unnecessarily complicated acrobatics)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Decorator that converts the first argument from a connection handle to a connection # Decorator that converts the first argument from a connection handle to a connection
def with_connection_from_handle(function): def with_connection_from_handle(function):
@functools.wraps(function) @functools.wraps(function)
@@ -773,7 +774,7 @@ class Device(CompositeEventEmitter):
try: try:
peer_address = Address(peer_address) peer_address = Address(peer_address)
except ValueError: except ValueError:
# If the address is not parssable, assume it is a name instead # If the address is not parsable, assume it is a name instead
logger.debug('looking for peer by name') logger.debug('looking for peer by name')
peer_address = await self.find_peer_by_name(peer_address, transport) peer_address = await self.find_peer_by_name(peer_address, transport)
@@ -1045,7 +1046,7 @@ class Device(CompositeEventEmitter):
raise InvalidStateError('only centrals can start encryption') raise InvalidStateError('only centrals can start encryption')
result = await self.send_command( result = await self.send_command(
HCI_LE_Start_Encryption_Command( HCI_LE_Enable_Encryption_Command(
connection_handle = connection.handle, connection_handle = connection.handle,
random_number = rand, random_number = rand,
encrypted_diversifier = ediv, encrypted_diversifier = ediv,
@@ -1054,7 +1055,7 @@ class Device(CompositeEventEmitter):
) )
if result.status != HCI_COMMAND_STATUS_PENDING: if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warn(f'HCI_LE_Start_Encryption_Command failed: {HCI_Constant.error_name(result.status)}') logger.warn(f'HCI_LE_Enable_Encryption_Command failed: {HCI_Constant.error_name(result.status)}')
raise HCI_Error(result.status) raise HCI_Error(result.status)
else: else:
result = await self.send_command( result = await self.send_command(
+1235 -541
View File
File diff suppressed because it is too large Load Diff
+76 -21
View File
@@ -81,7 +81,9 @@ class Host(EventEmitter):
self.hc_total_num_acl_data_packets = HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS self.hc_total_num_acl_data_packets = HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS
self.acl_packet_queue = collections.deque() self.acl_packet_queue = collections.deque()
self.acl_packets_in_flight = 0 self.acl_packets_in_flight = 0
self.local_version = None
self.local_supported_commands = bytes(64) self.local_supported_commands = bytes(64)
self.local_le_features = 0
self.command_semaphore = asyncio.Semaphore(1) self.command_semaphore = asyncio.Semaphore(1)
self.long_term_key_provider = None self.long_term_key_provider = None
self.link_key_provider = None self.link_key_provider = None
@@ -97,34 +99,51 @@ class Host(EventEmitter):
await self.send_command(HCI_Reset_Command()) await self.send_command(HCI_Reset_Command())
self.ready = True self.ready = True
response = await self.send_command(HCI_Read_Local_Supported_Commands_Command())
if response.return_parameters.status != HCI_SUCCESS:
raise ProtocolError(response.return_parameters.status, 'hci')
self.local_supported_commands = response.return_parameters.supported_commands
await self.send_command(HCI_Set_Event_Mask_Command(event_mask = bytes.fromhex('FFFFFFFFFFFFFFFF'))) await self.send_command(HCI_Set_Event_Mask_Command(event_mask = bytes.fromhex('FFFFFFFFFFFFFFFF')))
await self.send_command(HCI_LE_Set_Event_Mask_Command(le_event_mask = bytes.fromhex('FFFFF00000000000'))) await self.send_command(HCI_LE_Set_Event_Mask_Command(le_event_mask = bytes.fromhex('FFFFF00000000000')))
await self.send_command(HCI_Read_Local_Version_Information_Command())
await self.send_command(HCI_Write_LE_Host_Support_Command(le_supported_host = 1, simultaneous_le_host = 0))
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command()) response = await self.send_command(HCI_Read_Local_Supported_Commands_Command())
if response.return_parameters.status == HCI_SUCCESS: if response.return_parameters.status == HCI_SUCCESS:
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length self.local_supported_commands = response.return_parameters.supported_commands
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
logger.debug(f'HCI LE ACL flow control: hc_le_acl_data_packet_length={response.return_parameters.hc_le_acl_data_packet_length}, hc_total_num_le_acl_data_packets={response.return_parameters.hc_total_num_le_acl_data_packets}')
else: else:
logger.warn(f'HCI_LE_Read_Buffer_Size_Command failed: {response.return_parameters.status}') logger.warn(f'HCI_Read_Local_Supported_Commands_Command failed: {response.return_parameters.status}')
if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0:
# Read the non-LE-specific values if self.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND):
response = await self.send_command(HCI_Read_Buffer_Size_Command()) await self.send_command(HCI_Write_LE_Host_Support_Command(le_supported_host = 1, simultaneous_le_host = 0))
if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
response = await self.send_command(HCI_Read_Local_Version_Information_Command())
if response.return_parameters.status == HCI_SUCCESS: if response.return_parameters.status == HCI_SUCCESS:
self.hc_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length self.local_version = response.return_parameters
self.hc_le_acl_data_packet_length = self.hc_le_acl_data_packet_length or self.hc_acl_data_packet_length
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
self.hc_total_num_le_acl_data_packets = self.hc_total_num_le_acl_data_packets or self.hc_total_num_acl_data_packets
logger.debug(f'HCI LE ACL flow control: hc_le_acl_data_packet_length={self.hc_le_acl_data_packet_length}, hc_total_num_le_acl_data_packets={self.hc_total_num_le_acl_data_packets}')
else: else:
logger.warn(f'HCI_Read_Buffer_Size_Command failed: {response.return_parameters.status}') logger.warn(f'HCI_Read_Local_Version_Information_Command failed: {response.return_parameters.status}')
if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
logger.debug(f'HCI LE ACL flow control: hc_le_acl_data_packet_length={response.return_parameters.hc_le_acl_data_packet_length}, hc_total_num_le_acl_data_packets={response.return_parameters.hc_total_num_le_acl_data_packets}')
else:
logger.warn(f'HCI_LE_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0:
# Read the non-LE-specific values
response = await self.send_command(HCI_Read_Buffer_Size_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.hc_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_le_acl_data_packet_length = self.hc_le_acl_data_packet_length or self.hc_acl_data_packet_length
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
self.hc_total_num_le_acl_data_packets = self.hc_total_num_le_acl_data_packets or self.hc_total_num_acl_data_packets
logger.debug(f'HCI LE ACL flow control: hc_le_acl_data_packet_length={self.hc_le_acl_data_packet_length}, hc_total_num_le_acl_data_packets={self.hc_total_num_le_acl_data_packets}')
else:
logger.warn(f'HCI_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.local_le_features = struct.unpack('<Q', response.return_parameters.le_features)[0]
else:
logger.warn(f'HCI_LE_Read_Supported_Features_Command failed: {response.return_parameters.status}')
self.reset_done = True self.reset_done = True
@@ -211,6 +230,42 @@ class Host(EventEmitter):
self.send_hci_packet(packet) self.send_hci_packet(packet)
self.acl_packets_in_flight += 1 self.acl_packets_in_flight += 1
def supports_command(self, command):
# Find the support flag position for this command
for (octet, flags) in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS):
for (flag_position, value) in enumerate(flags):
if value == command:
# Check if the flag is set
if octet < len(self.local_supported_commands) and flag_position < 8:
return (self.local_supported_commands[octet] & (1 << flag_position)) != 0
return False
@property
def supported_commands(self):
commands = []
for (octet, flags) in enumerate(self.local_supported_commands):
if octet < len(HCI_SUPPORTED_COMMANDS_FLAGS):
for flag in range(8):
if flags & (1 << flag) != 0:
command = HCI_SUPPORTED_COMMANDS_FLAGS[octet][flag]
if command is not None:
commands.append(command)
return commands
def supports_le_feature(self, feature):
return (self.local_le_features & (1 << feature)) != 0
@property
def supported_le_features(self):
features = []
for bit in range(64):
if self.local_le_features & (1 << bit):
features.append(bit)
return features
# Packet Sink protocol (packets coming from the controller via HCI) # Packet Sink protocol (packets coming from the controller via HCI)
def on_packet(self, packet): def on_packet(self, packet):
hci_packet = HCI_Packet.from_bytes(packet) hci_packet = HCI_Packet.from_bytes(packet)
+1 -1
View File
@@ -852,7 +852,7 @@ class Session:
# distribute the long term and/or other keys over an encrypted connection # distribute the long term and/or other keys over an encrypted connection
asyncio.create_task( asyncio.create_task(
self.manager.device.host.send_command( self.manager.device.host.send_command(
HCI_LE_Start_Encryption_Command( HCI_LE_Enable_Encryption_Command(
connection_handle = self.connection.handle, connection_handle = self.connection.handle,
random_number = bytes(8), random_number = bytes(8),
encrypted_diversifier = 0, encrypted_diversifier = 0,
+24 -6
View File
@@ -294,8 +294,8 @@ def test_HCI_LE_Create_Connection_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Add_Device_To_White_List_Command(): def test_HCI_LE_Add_Device_To_Filter_Accept_List_Command():
command = HCI_LE_Add_Device_To_White_List_Command( command = HCI_LE_Add_Device_To_Filter_Accept_List_Command(
address_type = 1, address_type = 1,
address = Address('00:11:22:33:44:55') address = Address('00:11:22:33:44:55')
) )
@@ -303,8 +303,8 @@ def test_HCI_LE_Add_Device_To_White_List_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Remove_Device_From_White_List_Command(): def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command():
command = HCI_LE_Remove_Device_From_White_List_Command( command = HCI_LE_Remove_Device_From_Filter_Accept_List_Command(
address_type = 1, address_type = 1,
address = Address('00:11:22:33:44:55') address = Address('00:11:22:33:44:55')
) )
@@ -343,6 +343,23 @@ def test_HCI_LE_Set_Default_PHY_Command():
basic_check(command) basic_check(command)
# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
command = HCI_LE_Set_Extended_Scan_Parameters_Command(
own_address_type=Address.RANDOM_DEVICE_ADDRESS,
scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY,
scanning_phys=(1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_1M_PHY | 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_CODED_PHY | 1 << 4),
scan_types=[
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING
],
scan_intervals=[1, 2, 3],
scan_windows=[4, 5, 6]
)
basic_check(command)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_address(): def test_address():
a = Address('C4:F2:17:1A:1D:BB') a = Address('C4:F2:17:1A:1D:BB')
@@ -391,11 +408,12 @@ def run_test_commands():
test_HCI_LE_Set_Scan_Parameters_Command() test_HCI_LE_Set_Scan_Parameters_Command()
test_HCI_LE_Set_Scan_Enable_Command() test_HCI_LE_Set_Scan_Enable_Command()
test_HCI_LE_Create_Connection_Command() test_HCI_LE_Create_Connection_Command()
test_HCI_LE_Add_Device_To_White_List_Command() test_HCI_LE_Add_Device_To_Filter_Accept_List_Command()
test_HCI_LE_Remove_Device_From_White_List_Command() test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command()
test_HCI_LE_Connection_Update_Command() test_HCI_LE_Connection_Update_Command()
test_HCI_LE_Read_Remote_Features_Command() test_HCI_LE_Read_Remote_Features_Command()
test_HCI_LE_Set_Default_PHY_Command() test_HCI_LE_Set_Default_PHY_Command()
test_HCI_LE_Set_Extended_Scan_Parameters_Command()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------