# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct

from bumble.hci import (
    HCI_DISCONNECT_COMMAND,
    HCI_LE_1M_PHY_BIT,
    HCI_LE_CODED_PHY_BIT,
    HCI_LE_READ_BUFFER_SIZE_COMMAND,
    HCI_RESET_COMMAND,
    HCI_VENDOR_EVENT,
    HCI_SUCCESS,
    HCI_LE_CONNECTION_COMPLETE_EVENT,
    HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
    Address,
    CodingFormat,
    CodecID,
    HCI_Command,
    HCI_Command_Complete_Event,
    HCI_Command_Status_Event,
    HCI_CustomPacket,
    HCI_Disconnect_Command,
    HCI_Event,
    HCI_IsoDataPacket,
    HCI_LE_Add_Device_To_Filter_Accept_List_Command,
    HCI_LE_Advertising_Report_Event,
    HCI_LE_Channel_Selection_Algorithm_Event,
    HCI_LE_Connection_Complete_Event,
    HCI_LE_Connection_Update_Command,
    HCI_LE_Connection_Update_Complete_Event,
    HCI_LE_Create_Connection_Command,
    HCI_LE_Extended_Create_Connection_Command,
    HCI_LE_Read_Buffer_Size_Command,
    HCI_LE_Read_Remote_Features_Command,
    HCI_LE_Read_Remote_Features_Complete_Event,
    HCI_LE_Remove_Device_From_Filter_Accept_List_Command,
    HCI_LE_Set_Advertising_Data_Command,
    HCI_LE_Set_Advertising_Parameters_Command,
    HCI_LE_Set_Default_PHY_Command,
    HCI_LE_Set_Event_Mask_Command,
    HCI_LE_Set_Extended_Advertising_Enable_Command,
    HCI_LE_Set_Extended_Scan_Parameters_Command,
    HCI_LE_Set_Random_Address_Command,
    HCI_LE_Set_Scan_Enable_Command,
    HCI_LE_Set_Scan_Parameters_Command,
    HCI_LE_Setup_ISO_Data_Path_Command,
    HCI_Number_Of_Completed_Packets_Event,
    HCI_Packet,
    HCI_PIN_Code_Request_Reply_Command,
    HCI_Read_Local_Supported_Codecs_Command,
    HCI_Read_Local_Supported_Codecs_V2_Command,
    HCI_Read_Local_Supported_Commands_Command,
    HCI_Read_Local_Supported_Features_Command,
    HCI_Read_Local_Version_Information_Command,
    HCI_Reset_Command,
    HCI_Set_Event_Mask_Command,
    HCI_Vendor_Event,
)

# -----------------------------------------------------------------------------
# pylint: disable=invalid-name


def basic_check(x):
    """Round-trip `x` through its byte form and check nothing is lost.

    `x` is serialized with `bytes()`, parsed back with
    `HCI_Packet.from_bytes`, and the parsed object must match the original
    both as a string and as bytes. The hex dump and the string form of `x`
    are printed along the way.
    """
    serialized = bytes(x)
    print(serialized.hex())

    reparsed = HCI_Packet.from_bytes(serialized)
    original_text, reparsed_text = str(x), str(reparsed)
    print(original_text)

    reserialized = bytes(reparsed)
    assert original_text == reparsed_text
    assert serialized == reserialized


# -----------------------------------------------------------------------------
def test_HCI_Event():
    """Round-trip a generic event with and without a parameter payload."""
    constructor_args = (
        (0xF9,),
        (0xF8, bytes.fromhex('AABBCC')),
    )
    for args in constructor_args:
        basic_check(HCI_Event(*args))


# -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Complete_Event():
    """Round-trip an LE Connection Complete event with every field set."""
    fields = {
        'status': HCI_SUCCESS,
        'connection_handle': 1,
        'role': 1,
        'peer_address_type': 1,
        'peer_address': Address('00:11:22:33:44:55'),
        'connection_interval': 3,
        'peripheral_latency': 4,
        'supervision_timeout': 5,
        'central_clock_accuracy': 6,
    }
    basic_check(HCI_LE_Connection_Complete_Event(**fields))


# -----------------------------------------------------------------------------
def test_HCI_LE_Advertising_Report_Event():
    """Round-trip an advertising report event holding a single ADV_IND report."""
    report_class = HCI_LE_Advertising_Report_Event.Report
    advertising_payload = bytes.fromhex(
        '0201061106ba5689a6fabfa2bd01467d6e00fbabad08160a181604659b03'
    )
    single_report = report_class(
        report_class.FIELDS,
        event_type=HCI_LE_Advertising_Report_Event.ADV_IND,
        address_type=Address.PUBLIC_DEVICE_ADDRESS,
        address=Address('00:11:22:33:44:55/P'),
        data=advertising_payload,
        rssi=100,
    )
    basic_check(HCI_LE_Advertising_Report_Event([single_report]))


#
# -----------------------------------------------------------------------------
def test_HCI_LE_Read_Remote_Features_Complete_Event():
    """Round-trip an LE Read Remote Features Complete event."""
    feature_bytes = bytes.fromhex('0011223344556677')
    basic_check(
        HCI_LE_Read_Remote_Features_Complete_Event(
            status=HCI_SUCCESS,
            connection_handle=0x007,
            le_features=feature_bytes,
        )
    )


# -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Update_Complete_Event():
    """Round-trip an LE Connection Update Complete event."""
    fields = {
        'status': HCI_SUCCESS,
        'connection_handle': 0x007,
        'connection_interval': 10,
        'peripheral_latency': 3,
        'supervision_timeout': 5,
    }
    basic_check(HCI_LE_Connection_Update_Complete_Event(**fields))


# -----------------------------------------------------------------------------
def test_HCI_LE_Channel_Selection_Algorithm_Event():
    """Round-trip an LE Channel Selection Algorithm event."""
    basic_check(
        HCI_LE_Channel_Selection_Algorithm_Event(
            connection_handle=7,
            channel_selection_algorithm=1,
        )
    )


# -----------------------------------------------------------------------------
def test_HCI_Command_Complete_Event():
    """Round-trip Command Complete events with several return-parameter forms.

    Covers return parameters given as an object built by
    `create_return_parameters`, as a multi-byte array, as a 1-byte array, and
    as a plain integer.
    """
    # Return parameters built by the command's own helper
    buffer_size_parameters = HCI_LE_Read_Buffer_Size_Command.create_return_parameters(
        status=0,
        hc_le_acl_data_packet_length=1234,
        hc_total_num_le_acl_data_packets=56,
    )
    structured_event = HCI_Command_Complete_Event(
        num_hci_command_packets=34,
        command_opcode=HCI_LE_READ_BUFFER_SIZE_COMMAND,
        return_parameters=buffer_size_parameters,
    )
    basic_check(structured_event)

    # Return parameters given as an arbitrary byte array
    raw_bytes_event = HCI_Command_Complete_Event(
        num_hci_command_packets=1,
        command_opcode=HCI_RESET_COMMAND,
        return_parameters=bytes([1, 2, 3, 4]),
    )
    basic_check(raw_bytes_event)

    # A 1-byte array must compare equal to the integer 7 after re-parsing
    single_byte_event = HCI_Command_Complete_Event(
        num_hci_command_packets=1,
        command_opcode=HCI_RESET_COMMAND,
        return_parameters=bytes([7]),
    )
    basic_check(single_byte_event)
    reparsed_event = HCI_Packet.from_bytes(bytes(single_byte_event))
    assert reparsed_event.return_parameters == 7

    # A plain integer status is checked on the constructed event itself
    integer_status_event = HCI_Command_Complete_Event(
        num_hci_command_packets=1,
        command_opcode=HCI_RESET_COMMAND,
        return_parameters=9,
    )
    basic_check(integer_status_event)
    assert integer_status_event.return_parameters == 9


#
----------------------------------------------------------------------------- def test_HCI_Command_Status_Event(): event = HCI_Command_Status_Event( status=0, num_hci_command_packets=37, command_opcode=HCI_DISCONNECT_COMMAND ) basic_check(event) # ----------------------------------------------------------------------------- def test_HCI_Number_Of_Completed_Packets_Event(): event = HCI_Number_Of_Completed_Packets_Event([(1, 2), (3, 4)]) basic_check(event) # ----------------------------------------------------------------------------- def test_HCI_Vendor_Event(): data = bytes.fromhex('01020304') event = HCI_Vendor_Event(data=data) event_bytes = bytes(event) parsed = HCI_Packet.from_bytes(event_bytes) assert isinstance(parsed, HCI_Vendor_Event) assert parsed.data == data class HCI_Custom_Event(HCI_Event): def __init__(self, blabla): super().__init__(HCI_VENDOR_EVENT, parameters=struct.pack("