format (+3 squashed commits)

Squashed commits:
[60e610f] wip
[eeab73d] wip
[3cdd5b8] basic first pass
This commit is contained in:
Gilles Boccon-Gibod
2023-12-18 09:49:57 -08:00
parent 071fc2723a
commit d3273ffa8c
11 changed files with 1118 additions and 577 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -23,7 +23,7 @@ import functools
import logging import logging
import secrets import secrets
import struct import struct
from typing import Any, Dict, Callable, Optional, Type, Union, List from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union
from bumble import crypto from bumble import crypto
from .colors import color from .colors import color
@@ -223,41 +223,47 @@ HCI_VENDOR_EVENT = 0xFF
# HCI Subevent Codes # HCI Subevent Codes
HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01 HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01
HCI_LE_ADVERTISING_REPORT_EVENT = 0x02 HCI_LE_ADVERTISING_REPORT_EVENT = 0x02
HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT = 0x03 HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT = 0x03
HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT = 0x04 HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT = 0x04
HCI_LE_LONG_TERM_KEY_REQUEST_EVENT = 0x05 HCI_LE_LONG_TERM_KEY_REQUEST_EVENT = 0x05
HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT = 0x06 HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT = 0x06
HCI_LE_DATA_LENGTH_CHANGE_EVENT = 0x07 HCI_LE_DATA_LENGTH_CHANGE_EVENT = 0x07
HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT = 0x08 HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT = 0x08
HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT = 0x09 HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT = 0x09
HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT = 0x0A HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT = 0x0A
HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT = 0x0B HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT = 0x0B
HCI_LE_PHY_UPDATE_COMPLETE_EVENT = 0x0C HCI_LE_PHY_UPDATE_COMPLETE_EVENT = 0x0C
HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT = 0x0D HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT = 0x0D
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT = 0x0E HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT = 0x0E
HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT = 0x0F HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT = 0x0F
HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT = 0x10 HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT = 0x10
HCI_LE_SCAN_TIMEOUT_EVENT = 0x11 HCI_LE_SCAN_TIMEOUT_EVENT = 0x11
HCI_LE_ADVERTISING_SET_TERMINATED_EVENT = 0x12 HCI_LE_ADVERTISING_SET_TERMINATED_EVENT = 0x12
HCI_LE_SCAN_REQUEST_RECEIVED_EVENT = 0x13 HCI_LE_SCAN_REQUEST_RECEIVED_EVENT = 0x13
HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT = 0x14 HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT = 0x14
HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT = 0X15 HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT = 0X15
HCI_LE_CONNECTION_IQ_REPORT_EVENT = 0X16 HCI_LE_CONNECTION_IQ_REPORT_EVENT = 0X16
HCI_LE_CTE_REQUEST_FAILED_EVENT = 0X17 HCI_LE_CTE_REQUEST_FAILED_EVENT = 0X17
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT = 0X18 HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT = 0X18
HCI_LE_CIS_ESTABLISHED_EVENT = 0X19 HCI_LE_CIS_ESTABLISHED_EVENT = 0X19
HCI_LE_CIS_REQUEST_EVENT = 0X1A HCI_LE_CIS_REQUEST_EVENT = 0X1A
HCI_LE_CREATE_BIG_COMPLETE_EVENT = 0X1B HCI_LE_CREATE_BIG_COMPLETE_EVENT = 0X1B
HCI_LE_TERMINATE_BIG_COMPLETE_EVENT = 0X1C HCI_LE_TERMINATE_BIG_COMPLETE_EVENT = 0X1C
HCI_LE_BIG_SYNC_ESTABLISHED_EVENT = 0X1D HCI_LE_BIG_SYNC_ESTABLISHED_EVENT = 0X1D
HCI_LE_BIG_SYNC_LOST_EVENT = 0X1E HCI_LE_BIG_SYNC_LOST_EVENT = 0X1E
HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT = 0X1F HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT = 0X1F
HCI_LE_PATH_LOSS_THRESHOLD_EVENT = 0X20 HCI_LE_PATH_LOSS_THRESHOLD_EVENT = 0X20
HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21 HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21
HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22 HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22
HCI_LE_SUBRATE_CHANGE_EVENT = 0X23 HCI_LE_SUBRATE_CHANGE_EVENT = 0X23
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_V2_EVENT = 0X24
HCI_LE_PERIODIC_ADVERTISING_REPORT_V2_EVENT = 0X25
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_V2_EVENT = 0X26
HCI_LE_PERIODIC_ADVERTISING_SUBEVENT_DATA_REQUEST_EVENT = 0X27
HCI_LE_PERIODIC_ADVERTISING_RESPONSE_REPORT_EVENT = 0X28
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT = 0X29
# HCI Command # HCI Command
@@ -650,47 +656,6 @@ HCI_ERROR_NAMES[HCI_SUCCESS] = 'HCI_SUCCESS'
# Command Status codes # Command Status codes
HCI_COMMAND_STATUS_PENDING = 0 HCI_COMMAND_STATUS_PENDING = 0
# LE Event Masks
HCI_LE_CONNECTION_COMPLETE_EVENT_MASK = (1 << 0)
HCI_LE_ADVERTISING_REPORT_EVENT_MASK = (1 << 1)
HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT_MASK = (1 << 2)
HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT_MASK = (1 << 3)
HCI_LE_LONG_TERM_KEY_REQUEST_EVENT_MASK = (1 << 4)
HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT_MASK = (1 << 5)
HCI_LE_DATA_LENGTH_CHANGE_EVENT_MASK = (1 << 6)
HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT_MASK = (1 << 7)
HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT_MASK = (1 << 8)
HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT_MASK = (1 << 9)
HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT_MASK = (1 << 10)
HCI_LE_PHY_UPDATE_COMPLETE_EVENT_MASK = (1 << 11)
HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT_MASK = (1 << 12)
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT_MASK = (1 << 13)
HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT_MASK = (1 << 14)
HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT_MASK = (1 << 15)
HCI_LE_EXTENDED_SCAN_TIMEOUT_EVENT_MASK = (1 << 16)
HCI_LE_EXTENDED_ADVERTISING_SET_TERMINATED_EVENT_MASK = (1 << 17)
HCI_LE_SCAN_REQUEST_RECEIVED_EVENT_MASK = (1 << 18)
HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT_MASK = (1 << 19)
HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT_MASK = (1 << 20)
HCI_LE_CONNECTION_IQ_REPORT_EVENT_MASK = (1 << 21)
HCI_LE_CTE_REQUEST_FAILED_EVENT_MASK = (1 << 22)
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT_MASK = (1 << 23)
HCI_LE_CIS_ESTABLISHED_EVENT_MASK = (1 << 24)
HCI_LE_CIS_REQUEST_EVENT_MASK = (1 << 25)
HCI_LE_CREATE_BIG_COMPLETE_EVENT_MASK = (1 << 26)
HCI_LE_TERMINATE_BIG_COMPLETE_EVENT_MASK = (1 << 27)
HCI_LE_BIG_SYNC_ESTABLISHED_EVENT_MASK = (1 << 28)
HCI_LE_BIG_SYNC_LOST_EVENT_MASK = (1 << 29)
HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT_MASK = (1 << 30)
HCI_LE_PATH_LOSS_THRESHOLD_EVENT_MASK = (1 << 31)
HCI_LE_TRANSMIT_POWER_REPORTING_EVENT_MASK = (1 << 32)
HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT_MASK = (1 << 33)
HCI_LE_SUBRATE_CHANGE_EVENT_MASK = (1 << 34)
HCI_LE_EVENT_MASK_NAMES = {
mask: mask_name for (mask_name, mask) in globals().items()
if mask_name.startswith('HCI_LE_') and mask_name.endswith('_EVENT_MASK')
}
# ACL # ACL
HCI_ACL_PB_FIRST_NON_FLUSHABLE = 0 HCI_ACL_PB_FIRST_NON_FLUSHABLE = 0
@@ -732,15 +697,15 @@ HCI_LE_PHY_TYPE_TO_BIT = {
class Phy(enum.IntEnum): class Phy(enum.IntEnum):
LE_1M = 0x01 LE_1M = HCI_LE_1M_PHY
LE_2M = 0x02 LE_2M = HCI_LE_2M_PHY
LE_CODED = 0x03 LE_CODED = HCI_LE_CODED_PHY
class PhyBit(enum.IntFlag): class PhyBit(enum.IntFlag):
LE_1M = 0b00000001 LE_1M = 1 << HCI_LE_1M_PHY_BIT
LE_2M = 0b00000010 LE_2M = 1 << HCI_LE_2M_PHY_BIT
LE_CODED = 0b00000100 LE_CODED = 1 << HCI_LE_CODED_PHY_BIT
# Connection Parameters # Connection Parameters
@@ -2910,6 +2875,20 @@ class HCI_Set_Event_Mask_Command(HCI_Command):
See Bluetooth spec @ 7.3.1 Set Event Mask Command See Bluetooth spec @ 7.3.1 Set Event Mask Command
''' '''
@staticmethod
def mask(event_codes: Iterable[int]) -> bytes:
'''
Compute the event mask value for a list of events.
'''
# NOTE: this implementation takes advantage of the fact that as of version 5.4
# of the core specification, the bit number for each event code is equal to one
# less than the event code.
# If future versions of the specification deviate from that, a different
# implementation would be needed.
return sum((1 << event_code - 1) for event_code in event_codes).to_bytes(
8, 'little'
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command() @HCI_Command.command()
@@ -3433,6 +3412,20 @@ class HCI_LE_Set_Event_Mask_Command(HCI_Command):
See Bluetooth spec @ 7.8.1 LE Set Event Mask Command See Bluetooth spec @ 7.8.1 LE Set Event Mask Command
''' '''
@staticmethod
def mask(event_codes: Iterable[int]) -> bytes:
'''
Compute the event mask value for a list of events.
'''
# NOTE: this implementation takes advantage of the fact that as of version 5.4
# of the core specification, the bit number for each event code is equal to one
# less than the event code.
# If future versions of the specification deviate from that, a different
# implementation would be needed.
return sum((1 << event_code - 1) for event_code in event_codes).to_bytes(
8, 'little'
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command( @HCI_Command.command(
@@ -4040,13 +4033,16 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
('advertising_sid', 1), ('advertising_sid', 1),
('scan_request_notification_enable', 1), ('scan_request_notification_enable', 1),
], ],
return_parameters_fields=[('status', STATUS_SPEC), ('selected_tx__power', 1)], return_parameters_fields=[('status', STATUS_SPEC), ('selected_tx_power', 1)],
) )
class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command): class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
''' '''
See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command
''' '''
TX_POWER_NO_PREFERENCE = 0x7F
SHOULD_NOT_FRAGMENT = 0x01
class AdvertisingProperties(enum.IntFlag): class AdvertisingProperties(enum.IntFlag):
CONNECTABLE_ADVERTISING = 1 << 0 CONNECTABLE_ADVERTISING = 1 << 0
SCANNABLE_ADVERTISING = 1 << 1 SCANNABLE_ADVERTISING = 1 << 1
@@ -4291,7 +4287,7 @@ class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command):
('scanning_filter_policy:', self.scanning_filter_policy), ('scanning_filter_policy:', self.scanning_filter_policy),
('scanning_phys: ', ','.join(scanning_phys_strs)), ('scanning_phys: ', ','.join(scanning_phys_strs)),
] ]
for (i, scanning_phy_str) in enumerate(scanning_phys_strs): for i, scanning_phy_str in enumerate(scanning_phys_strs):
fields.append( fields.append(
( (
f'{scanning_phy_str}.scan_type: ', f'{scanning_phy_str}.scan_type: ',
@@ -4434,7 +4430,7 @@ class HCI_LE_Extended_Create_Connection_Command(HCI_Command):
('peer_address: ', str(self.peer_address)), ('peer_address: ', str(self.peer_address)),
('initiating_phys: ', ','.join(initiating_phys_strs)), ('initiating_phys: ', ','.join(initiating_phys_strs)),
] ]
for (i, initiating_phys_str) in enumerate(initiating_phys_strs): for i, initiating_phys_str in enumerate(initiating_phys_strs):
fields.append( fields.append(
( (
f'{initiating_phys_str}.scan_interval: ', f'{initiating_phys_str}.scan_interval: ',
@@ -5321,7 +5317,7 @@ HCI_LE_Meta_Event.subevent_classes[
('status', 1), ('status', 1),
('advertising_handle', 1), ('advertising_handle', 1),
('connection_handle', 2), ('connection_handle', 2),
('number_completed_extended_advertising_events', 1), ('num_completed_extended_advertising_events', 1),
] ]
) )
class HCI_LE_Advertising_Set_Terminated_Event(HCI_LE_Meta_Event): class HCI_LE_Advertising_Set_Terminated_Event(HCI_LE_Meta_Event):
@@ -6262,7 +6258,7 @@ class HCI_IsoDataPacket(HCI_Packet):
if ts_flag: if ts_flag:
if not should_include_sdu_info: if not should_include_sdu_info:
logger.warn(f'Timestamp included when pb_flag={bin(pb_flag)}') logger.warning(f'Timestamp included when pb_flag={bin(pb_flag)}')
time_stamp, *_ = struct.unpack_from('<I', packet, pos) time_stamp, *_ = struct.unpack_from('<I', packet, pos)
pos += 4 pos += 4

View File

@@ -39,6 +39,8 @@ from .hci import (
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND, HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND, HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND, HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND,
HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND,
HCI_READ_BUFFER_SIZE_COMMAND, HCI_READ_BUFFER_SIZE_COMMAND,
HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND, HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND,
HCI_RESET_COMMAND, HCI_RESET_COMMAND,
@@ -46,6 +48,88 @@ from .hci import (
HCI_SUPPORTED_COMMANDS_FLAGS, HCI_SUPPORTED_COMMANDS_FLAGS,
HCI_SYNCHRONOUS_DATA_PACKET, HCI_SYNCHRONOUS_DATA_PACKET,
HCI_VERSION_BLUETOOTH_CORE_4_0, HCI_VERSION_BLUETOOTH_CORE_4_0,
HCI_INQUIRY_COMPLETE_EVENT,
HCI_INQUIRY_RESULT_EVENT,
HCI_CONNECTION_COMPLETE_EVENT,
HCI_CONNECTION_REQUEST_EVENT,
HCI_DISCONNECTION_COMPLETE_EVENT,
HCI_AUTHENTICATION_COMPLETE_EVENT,
HCI_REMOTE_NAME_REQUEST_COMPLETE_EVENT,
HCI_ENCRYPTION_CHANGE_EVENT,
HCI_CHANGE_CONNECTION_LINK_KEY_COMPLETE_EVENT,
HCI_LINK_KEY_TYPE_CHANGED_EVENT,
HCI_READ_REMOTE_SUPPORTED_FEATURES_COMPLETE_EVENT,
HCI_READ_REMOTE_VERSION_INFORMATION_COMPLETE_EVENT,
HCI_QOS_SETUP_COMPLETE_EVENT,
HCI_HARDWARE_ERROR_EVENT,
HCI_FLUSH_OCCURRED_EVENT,
HCI_ROLE_CHANGE_EVENT,
HCI_MODE_CHANGE_EVENT,
HCI_RETURN_LINK_KEYS_EVENT,
HCI_PIN_CODE_REQUEST_EVENT,
HCI_LINK_KEY_REQUEST_EVENT,
HCI_LINK_KEY_NOTIFICATION_EVENT,
HCI_LOOPBACK_COMMAND_EVENT,
HCI_DATA_BUFFER_OVERFLOW_EVENT,
HCI_MAX_SLOTS_CHANGE_EVENT,
HCI_READ_CLOCK_OFFSET_COMPLETE_EVENT,
HCI_CONNECTION_PACKET_TYPE_CHANGED_EVENT,
HCI_QOS_VIOLATION_EVENT,
HCI_PAGE_SCAN_REPETITION_MODE_CHANGE_EVENT,
HCI_FLOW_SPECIFICATION_COMPLETE_EVENT,
HCI_INQUIRY_RESULT_WITH_RSSI_EVENT,
HCI_READ_REMOTE_EXTENDED_FEATURES_COMPLETE_EVENT,
HCI_SYNCHRONOUS_CONNECTION_COMPLETE_EVENT,
HCI_SYNCHRONOUS_CONNECTION_CHANGED_EVENT,
HCI_SNIFF_SUBRATING_EVENT,
HCI_EXTENDED_INQUIRY_RESULT_EVENT,
HCI_ENCRYPTION_KEY_REFRESH_COMPLETE_EVENT,
HCI_IO_CAPABILITY_REQUEST_EVENT,
HCI_IO_CAPABILITY_RESPONSE_EVENT,
HCI_USER_CONFIRMATION_REQUEST_EVENT,
HCI_USER_PASSKEY_REQUEST_EVENT,
HCI_REMOTE_OOB_DATA_REQUEST_EVENT,
HCI_SIMPLE_PAIRING_COMPLETE_EVENT,
HCI_LINK_SUPERVISION_TIMEOUT_CHANGED_EVENT,
HCI_ENHANCED_FLUSH_COMPLETE_EVENT,
HCI_USER_PASSKEY_NOTIFICATION_EVENT,
HCI_KEYPRESS_NOTIFICATION_EVENT,
HCI_REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION_EVENT,
HCI_LE_META_EVENT,
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ADVERTISING_REPORT_EVENT,
HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT,
HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT,
HCI_LE_LONG_TERM_KEY_REQUEST_EVENT,
HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT,
HCI_LE_DATA_LENGTH_CHANGE_EVENT,
HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT,
HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT,
HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT,
HCI_LE_PHY_UPDATE_COMPLETE_EVENT,
HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT,
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT,
HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT,
HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT,
HCI_LE_SCAN_TIMEOUT_EVENT,
HCI_LE_ADVERTISING_SET_TERMINATED_EVENT,
HCI_LE_SCAN_REQUEST_RECEIVED_EVENT,
HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT,
HCI_LE_CONNECTION_IQ_REPORT_EVENT,
HCI_LE_CTE_REQUEST_FAILED_EVENT,
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT,
HCI_LE_CIS_ESTABLISHED_EVENT,
HCI_LE_CIS_REQUEST_EVENT,
HCI_LE_CREATE_BIG_COMPLETE_EVENT,
HCI_LE_TERMINATE_BIG_COMPLETE_EVENT,
HCI_LE_BIG_SYNC_ESTABLISHED_EVENT,
HCI_LE_BIG_SYNC_LOST_EVENT,
HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT,
HCI_LE_PATH_LOSS_THRESHOLD_EVENT,
HCI_LE_TRANSMIT_POWER_REPORTING_EVENT,
HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT,
HCI_LE_SUBRATE_CHANGE_EVENT,
HCI_AclDataPacket, HCI_AclDataPacket,
HCI_AclDataPacketAssembler, HCI_AclDataPacketAssembler,
HCI_Command, HCI_Command,
@@ -56,12 +140,14 @@ from .hci import (
HCI_IsoDataPacket, HCI_IsoDataPacket,
HCI_LE_Long_Term_Key_Request_Negative_Reply_Command, HCI_LE_Long_Term_Key_Request_Negative_Reply_Command,
HCI_LE_Long_Term_Key_Request_Reply_Command, HCI_LE_Long_Term_Key_Request_Reply_Command,
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command,
HCI_LE_Read_Buffer_Size_Command, HCI_LE_Read_Buffer_Size_Command,
HCI_LE_Read_Local_Supported_Features_Command, HCI_LE_Read_Local_Supported_Features_Command,
HCI_LE_Read_Suggested_Default_Data_Length_Command, HCI_LE_Read_Suggested_Default_Data_Length_Command,
HCI_LE_Remote_Connection_Parameter_Request_Reply_Command, HCI_LE_Remote_Connection_Parameter_Request_Reply_Command,
HCI_LE_Set_Event_Mask_Command, HCI_LE_Set_Event_Mask_Command,
HCI_LE_Write_Suggested_Default_Data_Length_Command, HCI_LE_Write_Suggested_Default_Data_Length_Command,
HCI_LE_Read_Maximum_Advertising_Data_Length_Command,
HCI_Link_Key_Request_Negative_Reply_Command, HCI_Link_Key_Request_Negative_Reply_Command,
HCI_Link_Key_Request_Reply_Command, HCI_Link_Key_Request_Reply_Command,
HCI_Packet, HCI_Packet,
@@ -204,6 +290,8 @@ class Host(AbortableEventEmitter):
self.sco_links = {} # SCO links, by connection handle self.sco_links = {} # SCO links, by connection handle
self.pending_command = None self.pending_command = None
self.pending_response = None self.pending_response = None
self.number_of_supported_advertising_sets = 0
self.maximum_advertising_data_length = 31
self.local_version = None self.local_version = None
self.local_supported_commands = bytes(64) self.local_supported_commands = bytes(64)
self.local_le_features = 0 self.local_le_features = 0
@@ -288,7 +376,60 @@ class Host(AbortableEventEmitter):
self.local_version = response.return_parameters self.local_version = response.return_parameters
await self.send_command( await self.send_command(
HCI_Set_Event_Mask_Command(event_mask=bytes.fromhex('FFFFFFFFFFFFFF3F')) HCI_Set_Event_Mask_Command(
event_mask=HCI_Set_Event_Mask_Command.mask(
[
HCI_INQUIRY_COMPLETE_EVENT,
HCI_INQUIRY_RESULT_EVENT,
HCI_CONNECTION_COMPLETE_EVENT,
HCI_CONNECTION_REQUEST_EVENT,
HCI_DISCONNECTION_COMPLETE_EVENT,
HCI_AUTHENTICATION_COMPLETE_EVENT,
HCI_REMOTE_NAME_REQUEST_COMPLETE_EVENT,
HCI_ENCRYPTION_CHANGE_EVENT,
HCI_CHANGE_CONNECTION_LINK_KEY_COMPLETE_EVENT,
HCI_LINK_KEY_TYPE_CHANGED_EVENT,
HCI_READ_REMOTE_SUPPORTED_FEATURES_COMPLETE_EVENT,
HCI_READ_REMOTE_VERSION_INFORMATION_COMPLETE_EVENT,
HCI_QOS_SETUP_COMPLETE_EVENT,
HCI_HARDWARE_ERROR_EVENT,
HCI_FLUSH_OCCURRED_EVENT,
HCI_ROLE_CHANGE_EVENT,
HCI_MODE_CHANGE_EVENT,
HCI_RETURN_LINK_KEYS_EVENT,
HCI_PIN_CODE_REQUEST_EVENT,
HCI_LINK_KEY_REQUEST_EVENT,
HCI_LINK_KEY_NOTIFICATION_EVENT,
HCI_LOOPBACK_COMMAND_EVENT,
HCI_DATA_BUFFER_OVERFLOW_EVENT,
HCI_MAX_SLOTS_CHANGE_EVENT,
HCI_READ_CLOCK_OFFSET_COMPLETE_EVENT,
HCI_CONNECTION_PACKET_TYPE_CHANGED_EVENT,
HCI_QOS_VIOLATION_EVENT,
HCI_PAGE_SCAN_REPETITION_MODE_CHANGE_EVENT,
HCI_FLOW_SPECIFICATION_COMPLETE_EVENT,
HCI_INQUIRY_RESULT_WITH_RSSI_EVENT,
HCI_READ_REMOTE_EXTENDED_FEATURES_COMPLETE_EVENT,
HCI_SYNCHRONOUS_CONNECTION_COMPLETE_EVENT,
HCI_SYNCHRONOUS_CONNECTION_CHANGED_EVENT,
HCI_SNIFF_SUBRATING_EVENT,
HCI_EXTENDED_INQUIRY_RESULT_EVENT,
HCI_ENCRYPTION_KEY_REFRESH_COMPLETE_EVENT,
HCI_IO_CAPABILITY_REQUEST_EVENT,
HCI_IO_CAPABILITY_RESPONSE_EVENT,
HCI_USER_CONFIRMATION_REQUEST_EVENT,
HCI_USER_PASSKEY_REQUEST_EVENT,
HCI_REMOTE_OOB_DATA_REQUEST_EVENT,
HCI_SIMPLE_PAIRING_COMPLETE_EVENT,
HCI_LINK_SUPERVISION_TIMEOUT_CHANGED_EVENT,
HCI_ENHANCED_FLUSH_COMPLETE_EVENT,
HCI_USER_PASSKEY_NOTIFICATION_EVENT,
HCI_KEYPRESS_NOTIFICATION_EVENT,
HCI_REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION_EVENT,
HCI_LE_META_EVENT,
]
)
)
) )
if ( if (
@@ -299,7 +440,44 @@ class Host(AbortableEventEmitter):
# understand # understand
le_event_mask = bytes.fromhex('1F00000000000000') le_event_mask = bytes.fromhex('1F00000000000000')
else: else:
le_event_mask = bytes.fromhex('FFFFFFFF00000000') le_event_mask = HCI_LE_Set_Event_Mask_Command.mask(
[
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ADVERTISING_REPORT_EVENT,
HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT,
HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT,
HCI_LE_LONG_TERM_KEY_REQUEST_EVENT,
HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT,
HCI_LE_DATA_LENGTH_CHANGE_EVENT,
HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT,
HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT,
HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT,
HCI_LE_PHY_UPDATE_COMPLETE_EVENT,
HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT,
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT,
HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT,
HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT,
HCI_LE_SCAN_TIMEOUT_EVENT,
HCI_LE_ADVERTISING_SET_TERMINATED_EVENT,
HCI_LE_SCAN_REQUEST_RECEIVED_EVENT,
HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT,
HCI_LE_CONNECTION_IQ_REPORT_EVENT,
HCI_LE_CTE_REQUEST_FAILED_EVENT,
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT,
HCI_LE_CIS_ESTABLISHED_EVENT,
HCI_LE_CIS_REQUEST_EVENT,
HCI_LE_CREATE_BIG_COMPLETE_EVENT,
HCI_LE_TERMINATE_BIG_COMPLETE_EVENT,
HCI_LE_BIG_SYNC_ESTABLISHED_EVENT,
HCI_LE_BIG_SYNC_LOST_EVENT,
HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT,
HCI_LE_PATH_LOSS_THRESHOLD_EVENT,
HCI_LE_TRANSMIT_POWER_REPORTING_EVENT,
HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT,
HCI_LE_SUBRATE_CHANGE_EVENT,
]
)
await self.send_command( await self.send_command(
HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask) HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
@@ -377,6 +555,25 @@ class Host(AbortableEventEmitter):
) )
) )
if self.supports_command(
HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND
):
response = await self.send_command(
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command(),
check_result=True,
)
self.number_of_supported_advertising_sets = (
response.return_parameters.num_supported_advertising_sets
)
if self.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
response = await self.send_command(
HCI_LE_Read_Maximum_Advertising_Data_Length_Command(), check_result=True
)
self.maximum_advertising_data_length = (
response.return_parameters.max_advertising_data_length
)
@property @property
def controller(self) -> Optional[TransportSink]: def controller(self) -> Optional[TransportSink]:
return self.hci_sink return self.hci_sink
@@ -791,6 +988,7 @@ class Host(AbortableEventEmitter):
event.status, event.status,
event.advertising_handle, event.advertising_handle,
event.connection_handle, event.connection_handle,
event.num_completed_extended_advertising_events,
) )
def on_hci_le_cis_request_event(self, event): def on_hci_le_cis_request_event(self, event):

View File

@@ -226,13 +226,13 @@ class CompositeEventEmitter(AbortableEventEmitter):
if self._listener: if self._listener:
# Call the deregistration methods for each base class that has them # Call the deregistration methods for each base class that has them
for cls in self._listener.__class__.mro(): for cls in self._listener.__class__.mro():
if hasattr(cls, '_bumble_register_composite'): if '_bumble_register_composite' in cls.__dict__:
cls._bumble_deregister_composite(listener, self) cls._bumble_deregister_composite(self._listener, self)
self._listener = listener self._listener = listener
if listener: if listener:
# Call the registration methods for each base class that has them # Call the registration methods for each base class that has them
for cls in listener.__class__.mro(): for cls in listener.__class__.mro():
if hasattr(cls, '_bumble_deregister_composite'): if '_bumble_deregister_composite' in cls.__dict__:
cls._bumble_register_composite(listener, self) cls._bumble_register_composite(listener, self)

View File

@@ -19,9 +19,11 @@ import asyncio
import logging import logging
import sys import sys
import os import os
import struct
from bumble.core import AdvertisingData
from bumble.device import AdvertisingType, Device from bumble.device import AdvertisingType, Device
from bumble.hci import Address from bumble.hci import Address
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
@@ -52,6 +54,16 @@ async def main():
print('<<< connected') print('<<< connected')
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
if advertising_type.is_scannable:
device.scan_response_data = bytes(
AdvertisingData(
[
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x0340)),
]
)
)
await device.power_on() await device.power_on()
await device.start_advertising(advertising_type=advertising_type, target=target) await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_source.wait_for_termination() await hci_source.wait_for_termination()

View File

@@ -22,10 +22,11 @@ import os
from bumble.device import ( from bumble.device import (
Device, Device,
Connection, Connection,
AdvertisingParameters,
AdvertisingEventProperties,
) )
from bumble.hci import ( from bumble.hci import (
OwnAddressType, OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
) )
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
@@ -61,12 +62,8 @@ async def main() -> None:
devices[1].cis_enabled = True devices[1].cis_enabled = True
await asyncio.gather(*[device.power_on() for device in devices]) await asyncio.gather(*[device.power_on() for device in devices])
await devices[0].start_extended_advertising( advertising_set = await devices[0].create_advertising_set()
advertising_properties=( await advertising_set.start()
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.PUBLIC,
)
connection = await devices[1].connect( connection = await devices[1].connect(
devices[0].public_address, own_address_type=OwnAddressType.PUBLIC devices[0].public_address, own_address_type=OwnAddressType.PUBLIC

View File

@@ -19,8 +19,13 @@ import asyncio
import logging import logging
import sys import sys
import os import os
from bumble.device import AdvertisingType, Device from bumble.device import (
from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command AdvertisingParameters,
AdvertisingEventProperties,
AdvertisingType,
Device,
)
from bumble.hci import Address
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
@@ -35,20 +40,16 @@ async def main() -> None:
return return
if len(sys.argv) >= 4: if len(sys.argv) >= 4:
advertising_properties = ( advertising_properties = AdvertisingEventProperties.from_advertising_type(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties( AdvertisingType(int(sys.argv[3]))
int(sys.argv[3])
)
) )
else: else:
advertising_properties = ( advertising_properties = AdvertisingEventProperties()
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
)
if len(sys.argv) >= 5: if len(sys.argv) >= 5:
target = Address(sys.argv[4]) peer_address = Address(sys.argv[4])
else: else:
target = Address.ANY peer_address = Address.ANY
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport: async with await open_transport_or_link(sys.argv[2]) as hci_transport:
@@ -58,9 +59,14 @@ async def main() -> None:
sys.argv[1], hci_transport.source, hci_transport.sink sys.argv[1], hci_transport.source, hci_transport.sink
) )
await device.power_on() await device.power_on()
await device.start_extended_advertising( await (
advertising_properties=advertising_properties, target=target await device.create_advertising_set(
) advertising_parameters=AdvertisingParameters(
advertising_event_properties=advertising_properties,
peer_address=peer_address,
)
)
).start()
await hci_transport.source.terminated await hci_transport.source.terminated

View File

@@ -0,0 +1,100 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import AdvertisingParameters, AdvertisingEventProperties, Device
from bumble.hci import Address
from bumble.core import AdvertisingData
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_extended_advertiser_2.py <config-file> <transport-spec>')
print('example: run_extended_advertiser_2.py device1.json usb:0')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
if not device.supports_le_extended_advertising:
print("Device does not support extended adverising")
return
print("Max advertising sets:", device.host.number_of_supported_advertising_sets)
print(
"Max advertising data length:", device.host.maximum_advertising_data_length
)
if device.host.number_of_supported_advertising_sets >= 1:
advertising_data1 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 1".encode("utf-8"))]
)
set1 = await device.create_advertising_set(
advertising_data=bytes(advertising_data1),
)
print("Selected TX power 1:", set1.selected_tx_power)
await set1.start()
advertising_data2 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 2".encode("utf-8"))]
)
if device.host.number_of_supported_advertising_sets >= 2:
set2 = await device.create_advertising_set(
random_address=Address("F0:F0:F0:F0:F0:F1"),
advertising_parameters=AdvertisingParameters(),
advertising_data=bytes(advertising_data2),
auto_restart=True,
)
print("Selected TX power 2:", set2.selected_tx_power)
await set2.start()
if device.host.number_of_supported_advertising_sets >= 3:
scan_response_data3 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 3".encode("utf-8"))]
)
set3 = await device.create_advertising_set(
random_address=Address("F0:F0:F0:F0:F0:F2"),
advertising_parameters=AdvertisingParameters(
advertising_event_properties=AdvertisingEventProperties(
is_connectable=False, is_scannable=True
)
),
scan_response_data=bytes(scan_response_data3),
)
print("Selected TX power 3:", set2.selected_tx_power)
await set3.start()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -22,13 +22,12 @@ import os
import struct import struct
import secrets import secrets
from bumble.core import AdvertisingData from bumble.core import AdvertisingData
from bumble.device import Device, CisLink from bumble.device import Device, CisLink, AdvertisingParameters
from bumble.hci import ( from bumble.hci import (
CodecID, CodecID,
CodingFormat, CodingFormat,
OwnAddressType, OwnAddressType,
HCI_IsoDataPacket, HCI_IsoDataPacket,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
) )
from bumble.profiles.bap import ( from bumble.profiles.bap import (
CodecSpecificCapabilities, CodecSpecificCapabilities,
@@ -179,13 +178,10 @@ async def main() -> None:
device.once('cis_establishment', on_cis) device.once('cis_establishment', on_cis)
await device.start_extended_advertising( advertising_set = await device.create_advertising_set(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.RANDOM,
advertising_data=advertising_data, advertising_data=advertising_data,
) )
await advertising_set.start()
await hci_transport.source.terminated await hci_transport.source.terminated

View File

@@ -28,7 +28,7 @@ from bumble.core import (
BT_PERIPHERAL_ROLE, BT_PERIPHERAL_ROLE,
ConnectionParameters, ConnectionParameters,
) )
from bumble.device import Connection, Device from bumble.device import AdvertisingParameters, Connection, Device
from bumble.host import AclPacketQueue, Host from bumble.host import AclPacketQueue, Host
from bumble.hci import ( from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND, HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
@@ -254,12 +254,12 @@ async def test_legacy_advertising():
device = Device(host=mock.AsyncMock(Host)) device = Device(host=mock.AsyncMock(Host))
# Start advertising # Start advertising
advertiser = await device.start_legacy_advertising() await device.start_advertising()
assert device.legacy_advertiser assert device.is_advertising
# Stop advertising # Stop advertising
await advertiser.stop() await device.stop_advertising()
assert not device.legacy_advertiser assert not device.is_advertising
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -273,7 +273,7 @@ async def test_legacy_advertising_connection(own_address_type):
peer_address = Address('F0:F1:F2:F3:F4:F5') peer_address = Address('F0:F1:F2:F3:F4:F5')
# Start advertising # Start advertising
advertiser = await device.start_legacy_advertising() await device.start_advertising()
device.on_connection( device.on_connection(
0x0001, 0x0001,
BT_LE_TRANSPORT, BT_LE_TRANSPORT,
@@ -301,7 +301,7 @@ async def test_legacy_advertising_connection(own_address_type):
async def test_legacy_advertising_disconnection(auto_restart): async def test_legacy_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host)) device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5') peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_legacy_advertising(auto_restart=auto_restart) await device.start_advertising(auto_restart=auto_restart)
device.on_connection( device.on_connection(
0x0001, 0x0001,
BT_LE_TRANSPORT, BT_LE_TRANSPORT,
@@ -310,20 +310,14 @@ async def test_legacy_advertising_disconnection(auto_restart):
ConnectionParameters(0, 0, 0), ConnectionParameters(0, 0, 0),
) )
device.start_legacy_advertising = mock.AsyncMock() device.start_advertising = mock.AsyncMock()
device.on_disconnection(0x0001, 0) device.on_disconnection(0x0001, 0)
if auto_restart: if auto_restart:
device.start_legacy_advertising.assert_called_with( assert device.is_advertising
advertising_type=advertiser.advertising_type,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
else: else:
device.start_legacy_advertising.assert_not_called() not device.is_advertising
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -332,12 +326,14 @@ async def test_extended_advertising():
device = Device(host=mock.AsyncMock(Host)) device = Device(host=mock.AsyncMock(Host))
# Start advertising # Start advertising
advertiser = await device.start_extended_advertising() advertising_set = await device.create_advertising_set()
assert device.extended_advertisers await advertising_set.start()
assert device.extended_advertising_sets
assert advertising_set.enabled
# Stop advertising # Stop advertising
await advertiser.stop() await advertising_set.stop()
assert not device.extended_advertisers assert not advertising_set.enabled
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -349,9 +345,10 @@ async def test_extended_advertising():
async def test_extended_advertising_connection(own_address_type): async def test_extended_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host)) device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5') peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising( advertising_set = await device.create_advertising_set(
own_address_type=own_address_type advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
) )
await advertising_set.start()
device.on_connection( device.on_connection(
0x0001, 0x0001,
BT_LE_TRANSPORT, BT_LE_TRANSPORT,
@@ -361,8 +358,9 @@ async def test_extended_advertising_connection(own_address_type):
) )
device.on_advertising_set_termination( device.on_advertising_set_termination(
HCI_SUCCESS, HCI_SUCCESS,
advertiser.handle, advertising_set.advertising_handle,
0x0001, 0x0001,
0,
) )
if own_address_type == OwnAddressType.PUBLIC: if own_address_type == OwnAddressType.PUBLIC:
@@ -375,45 +373,6 @@ async def test_extended_advertising_connection(own_address_type):
await asyncio.sleep(0.0001) await asyncio.sleep(0.0001)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'auto_restart,',
(True, False),
)
@pytest.mark.asyncio
async def test_extended_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
0x0001,
)
device.start_extended_advertising = mock.AsyncMock()
device.on_disconnection(0x0001, 0)
if auto_restart:
device.start_extended_advertising.assert_called_with(
advertising_properties=advertiser.advertising_properties,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
else:
device.start_extended_advertising.assert_not_called()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_remote_le_features(): async def test_get_remote_le_features():

View File

@@ -23,6 +23,8 @@ from bumble.hci import (
HCI_LE_READ_BUFFER_SIZE_COMMAND, HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_RESET_COMMAND, HCI_RESET_COMMAND,
HCI_SUCCESS, HCI_SUCCESS,
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
Address, Address,
CodingFormat, CodingFormat,
CodecID, CodecID,
@@ -274,8 +276,14 @@ def test_HCI_Set_Event_Mask_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Set_Event_Mask_Command(): def test_HCI_LE_Set_Event_Mask_Command():
command = HCI_LE_Set_Event_Mask_Command( command = HCI_LE_Set_Event_Mask_Command(
le_event_mask=bytes.fromhex('0011223344556677') le_event_mask=HCI_LE_Set_Event_Mask_Command.mask(
[
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
]
)
) )
assert command.le_event_mask == bytes.fromhex('0100000000010000')
basic_check(command) basic_check(command)