Merge pull request #382 from google/gbg/extended-advertising-v2

extended advertising v2
This commit is contained in:
Gilles Boccon-Gibod
2024-02-02 20:43:28 -08:00
committed by GitHub
18 changed files with 1199 additions and 722 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1068,7 +1068,7 @@ class Client:
logger.warning('!!! unexpected response, there is no pending request') logger.warning('!!! unexpected response, there is no pending request')
return return
# Sanity check: the response should match the pending request unless it is # The response should match the pending request unless it is
# an error response # an error response
if att_pdu.op_code != ATT_ERROR_RESPONSE: if att_pdu.op_code != ATT_ERROR_RESPONSE:
expected_response_name = self.pending_request.name.replace( expected_response_name = self.pending_request.name.replace(

View File

@@ -328,7 +328,7 @@ class Server(EventEmitter):
f'handle=0x{characteristic.handle:04X}: {value.hex()}' f'handle=0x{characteristic.handle:04X}: {value.hex()}'
) )
# Sanity check # Check parameters
if len(value) != 2: if len(value) != 2:
logger.warning('CCCD value not 2 bytes long') logger.warning('CCCD value not 2 bytes long')
return return

View File

@@ -23,7 +23,7 @@ import functools
import logging import logging
import secrets import secrets
import struct import struct
from typing import Any, Dict, Callable, Optional, Type, Union, List from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union
from bumble import crypto from bumble import crypto
from .colors import color from .colors import color
@@ -223,41 +223,47 @@ HCI_VENDOR_EVENT = 0xFF
# HCI Subevent Codes # HCI Subevent Codes
HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01 HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01
HCI_LE_ADVERTISING_REPORT_EVENT = 0x02 HCI_LE_ADVERTISING_REPORT_EVENT = 0x02
HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT = 0x03 HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT = 0x03
HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT = 0x04 HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT = 0x04
HCI_LE_LONG_TERM_KEY_REQUEST_EVENT = 0x05 HCI_LE_LONG_TERM_KEY_REQUEST_EVENT = 0x05
HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT = 0x06 HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT = 0x06
HCI_LE_DATA_LENGTH_CHANGE_EVENT = 0x07 HCI_LE_DATA_LENGTH_CHANGE_EVENT = 0x07
HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT = 0x08 HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT = 0x08
HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT = 0x09 HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT = 0x09
HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT = 0x0A HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT = 0x0A
HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT = 0x0B HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT = 0x0B
HCI_LE_PHY_UPDATE_COMPLETE_EVENT = 0x0C HCI_LE_PHY_UPDATE_COMPLETE_EVENT = 0x0C
HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT = 0x0D HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT = 0x0D
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT = 0x0E HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT = 0x0E
HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT = 0x0F HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT = 0x0F
HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT = 0x10 HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT = 0x10
HCI_LE_SCAN_TIMEOUT_EVENT = 0x11 HCI_LE_SCAN_TIMEOUT_EVENT = 0x11
HCI_LE_ADVERTISING_SET_TERMINATED_EVENT = 0x12 HCI_LE_ADVERTISING_SET_TERMINATED_EVENT = 0x12
HCI_LE_SCAN_REQUEST_RECEIVED_EVENT = 0x13 HCI_LE_SCAN_REQUEST_RECEIVED_EVENT = 0x13
HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT = 0x14 HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT = 0x14
HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT = 0X15 HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT = 0X15
HCI_LE_CONNECTION_IQ_REPORT_EVENT = 0X16 HCI_LE_CONNECTION_IQ_REPORT_EVENT = 0X16
HCI_LE_CTE_REQUEST_FAILED_EVENT = 0X17 HCI_LE_CTE_REQUEST_FAILED_EVENT = 0X17
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT = 0X18 HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT = 0X18
HCI_LE_CIS_ESTABLISHED_EVENT = 0X19 HCI_LE_CIS_ESTABLISHED_EVENT = 0X19
HCI_LE_CIS_REQUEST_EVENT = 0X1A HCI_LE_CIS_REQUEST_EVENT = 0X1A
HCI_LE_CREATE_BIG_COMPLETE_EVENT = 0X1B HCI_LE_CREATE_BIG_COMPLETE_EVENT = 0X1B
HCI_LE_TERMINATE_BIG_COMPLETE_EVENT = 0X1C HCI_LE_TERMINATE_BIG_COMPLETE_EVENT = 0X1C
HCI_LE_BIG_SYNC_ESTABLISHED_EVENT = 0X1D HCI_LE_BIG_SYNC_ESTABLISHED_EVENT = 0X1D
HCI_LE_BIG_SYNC_LOST_EVENT = 0X1E HCI_LE_BIG_SYNC_LOST_EVENT = 0X1E
HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT = 0X1F HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT = 0X1F
HCI_LE_PATH_LOSS_THRESHOLD_EVENT = 0X20 HCI_LE_PATH_LOSS_THRESHOLD_EVENT = 0X20
HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21 HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21
HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22 HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22
HCI_LE_SUBRATE_CHANGE_EVENT = 0X23 HCI_LE_SUBRATE_CHANGE_EVENT = 0X23
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_V2_EVENT = 0X24
HCI_LE_PERIODIC_ADVERTISING_REPORT_V2_EVENT = 0X25
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_V2_EVENT = 0X26
HCI_LE_PERIODIC_ADVERTISING_SUBEVENT_DATA_REQUEST_EVENT = 0X27
HCI_LE_PERIODIC_ADVERTISING_RESPONSE_REPORT_EVENT = 0X28
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT = 0X29
# HCI Command # HCI Command
@@ -650,47 +656,6 @@ HCI_ERROR_NAMES[HCI_SUCCESS] = 'HCI_SUCCESS'
# Command Status codes # Command Status codes
HCI_COMMAND_STATUS_PENDING = 0 HCI_COMMAND_STATUS_PENDING = 0
# LE Event Masks
HCI_LE_CONNECTION_COMPLETE_EVENT_MASK = (1 << 0)
HCI_LE_ADVERTISING_REPORT_EVENT_MASK = (1 << 1)
HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT_MASK = (1 << 2)
HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT_MASK = (1 << 3)
HCI_LE_LONG_TERM_KEY_REQUEST_EVENT_MASK = (1 << 4)
HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT_MASK = (1 << 5)
HCI_LE_DATA_LENGTH_CHANGE_EVENT_MASK = (1 << 6)
HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT_MASK = (1 << 7)
HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT_MASK = (1 << 8)
HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT_MASK = (1 << 9)
HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT_MASK = (1 << 10)
HCI_LE_PHY_UPDATE_COMPLETE_EVENT_MASK = (1 << 11)
HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT_MASK = (1 << 12)
HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT_MASK = (1 << 13)
HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT_MASK = (1 << 14)
HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT_MASK = (1 << 15)
HCI_LE_EXTENDED_SCAN_TIMEOUT_EVENT_MASK = (1 << 16)
HCI_LE_EXTENDED_ADVERTISING_SET_TERMINATED_EVENT_MASK = (1 << 17)
HCI_LE_SCAN_REQUEST_RECEIVED_EVENT_MASK = (1 << 18)
HCI_LE_CHANNEL_SELECTION_ALGORITHM_EVENT_MASK = (1 << 19)
HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT_MASK = (1 << 20)
HCI_LE_CONNECTION_IQ_REPORT_EVENT_MASK = (1 << 21)
HCI_LE_CTE_REQUEST_FAILED_EVENT_MASK = (1 << 22)
HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT_MASK = (1 << 23)
HCI_LE_CIS_ESTABLISHED_EVENT_MASK = (1 << 24)
HCI_LE_CIS_REQUEST_EVENT_MASK = (1 << 25)
HCI_LE_CREATE_BIG_COMPLETE_EVENT_MASK = (1 << 26)
HCI_LE_TERMINATE_BIG_COMPLETE_EVENT_MASK = (1 << 27)
HCI_LE_BIG_SYNC_ESTABLISHED_EVENT_MASK = (1 << 28)
HCI_LE_BIG_SYNC_LOST_EVENT_MASK = (1 << 29)
HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT_MASK = (1 << 30)
HCI_LE_PATH_LOSS_THRESHOLD_EVENT_MASK = (1 << 31)
HCI_LE_TRANSMIT_POWER_REPORTING_EVENT_MASK = (1 << 32)
HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT_MASK = (1 << 33)
HCI_LE_SUBRATE_CHANGE_EVENT_MASK = (1 << 34)
HCI_LE_EVENT_MASK_NAMES = {
mask: mask_name for (mask_name, mask) in globals().items()
if mask_name.startswith('HCI_LE_') and mask_name.endswith('_EVENT_MASK')
}
# ACL # ACL
HCI_ACL_PB_FIRST_NON_FLUSHABLE = 0 HCI_ACL_PB_FIRST_NON_FLUSHABLE = 0
@@ -732,15 +697,15 @@ HCI_LE_PHY_TYPE_TO_BIT = {
class Phy(enum.IntEnum): class Phy(enum.IntEnum):
LE_1M = 0x01 LE_1M = HCI_LE_1M_PHY
LE_2M = 0x02 LE_2M = HCI_LE_2M_PHY
LE_CODED = 0x03 LE_CODED = HCI_LE_CODED_PHY
class PhyBit(enum.IntFlag): class PhyBit(enum.IntFlag):
LE_1M = 0b00000001 LE_1M = 1 << HCI_LE_1M_PHY_BIT
LE_2M = 0b00000010 LE_2M = 1 << HCI_LE_2M_PHY_BIT
LE_CODED = 0b00000100 LE_CODED = 1 << HCI_LE_CODED_PHY_BIT
# Connection Parameters # Connection Parameters
@@ -2910,6 +2875,20 @@ class HCI_Set_Event_Mask_Command(HCI_Command):
See Bluetooth spec @ 7.3.1 Set Event Mask Command See Bluetooth spec @ 7.3.1 Set Event Mask Command
''' '''
@staticmethod
def mask(event_codes: Iterable[int]) -> bytes:
'''
Compute the event mask value for a list of events.
'''
# NOTE: this implementation takes advantage of the fact that as of version 5.4
# of the core specification, the bit number for each event code is equal to one
# less than the event code.
# If future versions of the specification deviate from that, a different
# implementation would be needed.
return sum((1 << event_code - 1) for event_code in event_codes).to_bytes(
8, 'little'
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command() @HCI_Command.command()
@@ -3433,6 +3412,20 @@ class HCI_LE_Set_Event_Mask_Command(HCI_Command):
See Bluetooth spec @ 7.8.1 LE Set Event Mask Command See Bluetooth spec @ 7.8.1 LE Set Event Mask Command
''' '''
@staticmethod
def mask(event_codes: Iterable[int]) -> bytes:
'''
Compute the event mask value for a list of events.
'''
# NOTE: this implementation takes advantage of the fact that as of version 5.4
# of the core specification, the bit number for each event code is equal to one
# less than the event code.
# If future versions of the specification deviate from that, a different
# implementation would be needed.
return sum((1 << event_code - 1) for event_code in event_codes).to_bytes(
8, 'little'
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command( @HCI_Command.command(
@@ -4040,13 +4033,16 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
('advertising_sid', 1), ('advertising_sid', 1),
('scan_request_notification_enable', 1), ('scan_request_notification_enable', 1),
], ],
return_parameters_fields=[('status', STATUS_SPEC), ('selected_tx__power', 1)], return_parameters_fields=[('status', STATUS_SPEC), ('selected_tx_power', 1)],
) )
class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command): class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
''' '''
See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command
''' '''
TX_POWER_NO_PREFERENCE = 0x7F
SHOULD_NOT_FRAGMENT = 0x01
class AdvertisingProperties(enum.IntFlag): class AdvertisingProperties(enum.IntFlag):
CONNECTABLE_ADVERTISING = 1 << 0 CONNECTABLE_ADVERTISING = 1 << 0
SCANNABLE_ADVERTISING = 1 << 1 SCANNABLE_ADVERTISING = 1 << 1
@@ -4291,7 +4287,7 @@ class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command):
('scanning_filter_policy:', self.scanning_filter_policy), ('scanning_filter_policy:', self.scanning_filter_policy),
('scanning_phys: ', ','.join(scanning_phys_strs)), ('scanning_phys: ', ','.join(scanning_phys_strs)),
] ]
for (i, scanning_phy_str) in enumerate(scanning_phys_strs): for i, scanning_phy_str in enumerate(scanning_phys_strs):
fields.append( fields.append(
( (
f'{scanning_phy_str}.scan_type: ', f'{scanning_phy_str}.scan_type: ',
@@ -4434,7 +4430,7 @@ class HCI_LE_Extended_Create_Connection_Command(HCI_Command):
('peer_address: ', str(self.peer_address)), ('peer_address: ', str(self.peer_address)),
('initiating_phys: ', ','.join(initiating_phys_strs)), ('initiating_phys: ', ','.join(initiating_phys_strs)),
] ]
for (i, initiating_phys_str) in enumerate(initiating_phys_strs): for i, initiating_phys_str in enumerate(initiating_phys_strs):
fields.append( fields.append(
( (
f'{initiating_phys_str}.scan_interval: ', f'{initiating_phys_str}.scan_interval: ',
@@ -5321,7 +5317,7 @@ HCI_LE_Meta_Event.subevent_classes[
('status', 1), ('status', 1),
('advertising_handle', 1), ('advertising_handle', 1),
('connection_handle', 2), ('connection_handle', 2),
('number_completed_extended_advertising_events', 1), ('num_completed_extended_advertising_events', 1),
] ]
) )
class HCI_LE_Advertising_Set_Terminated_Event(HCI_LE_Meta_Event): class HCI_LE_Advertising_Set_Terminated_Event(HCI_LE_Meta_Event):
@@ -6262,7 +6258,7 @@ class HCI_IsoDataPacket(HCI_Packet):
if ts_flag: if ts_flag:
if not should_include_sdu_info: if not should_include_sdu_info:
logger.warn(f'Timestamp included when pb_flag={bin(pb_flag)}') logger.warning(f'Timestamp included when pb_flag={bin(pb_flag)}')
time_stamp, *_ = struct.unpack_from('<I', packet, pos) time_stamp, *_ = struct.unpack_from('<I', packet, pos)
pos += 4 pos += 4
@@ -6375,7 +6371,7 @@ class HCI_AclDataPacketAssembler:
self.current_data = None self.current_data = None
self.l2cap_pdu_length = 0 self.l2cap_pdu_length = 0
else: else:
# Sanity check # Compliance check
if len(self.current_data) > self.l2cap_pdu_length + 4: if len(self.current_data) > self.l2cap_pdu_length + 4:
logger.warning('!!! ACL data exceeds L2CAP PDU') logger.warning('!!! ACL data exceeds L2CAP PDU')
self.current_data = None self.current_data = None

View File

@@ -28,59 +28,15 @@ from bumble.colors import color
from bumble.l2cap import L2CAP_PDU from bumble.l2cap import L2CAP_PDU
from bumble.snoop import Snooper from bumble.snoop import Snooper
from bumble import drivers from bumble import drivers
from bumble import hci
from .hci import ( from bumble.core import (
Address,
HCI_ACL_DATA_PACKET,
HCI_COMMAND_PACKET,
HCI_EVENT_PACKET,
HCI_ISO_DATA_PACKET,
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_READ_BUFFER_SIZE_COMMAND,
HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND,
HCI_RESET_COMMAND,
HCI_SUCCESS,
HCI_SUPPORTED_COMMANDS_FLAGS,
HCI_SYNCHRONOUS_DATA_PACKET,
HCI_VERSION_BLUETOOTH_CORE_4_0,
HCI_AclDataPacket,
HCI_AclDataPacketAssembler,
HCI_Command,
HCI_Command_Complete_Event,
HCI_Constant,
HCI_Error,
HCI_Event,
HCI_IsoDataPacket,
HCI_LE_Long_Term_Key_Request_Negative_Reply_Command,
HCI_LE_Long_Term_Key_Request_Reply_Command,
HCI_LE_Read_Buffer_Size_Command,
HCI_LE_Read_Local_Supported_Features_Command,
HCI_LE_Read_Suggested_Default_Data_Length_Command,
HCI_LE_Remote_Connection_Parameter_Request_Reply_Command,
HCI_LE_Set_Event_Mask_Command,
HCI_LE_Write_Suggested_Default_Data_Length_Command,
HCI_Link_Key_Request_Negative_Reply_Command,
HCI_Link_Key_Request_Reply_Command,
HCI_Packet,
HCI_Read_Buffer_Size_Command,
HCI_Read_Local_Supported_Commands_Command,
HCI_Read_Local_Version_Information_Command,
HCI_Reset_Command,
HCI_Set_Event_Mask_Command,
HCI_SynchronousDataPacket,
LeFeatureMask,
)
from .core import (
BT_BR_EDR_TRANSPORT, BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT, BT_LE_TRANSPORT,
ConnectionPHY, ConnectionPHY,
ConnectionParameters, ConnectionParameters,
) )
from .utils import AbortableEventEmitter from bumble.utils import AbortableEventEmitter
from .transport.common import TransportLostError from bumble.transport.common import TransportLostError
if TYPE_CHECKING: if TYPE_CHECKING:
from .transport.common import TransportSink, TransportSource from .transport.common import TransportSink, TransportSource
@@ -100,15 +56,15 @@ class AclPacketQueue:
self, self,
max_packet_size: int, max_packet_size: int,
max_in_flight: int, max_in_flight: int,
send: Callable[[HCI_Packet], None], send: Callable[[hci.HCI_Packet], None],
) -> None: ) -> None:
self.max_packet_size = max_packet_size self.max_packet_size = max_packet_size
self.max_in_flight = max_in_flight self.max_in_flight = max_in_flight
self.in_flight = 0 self.in_flight = 0
self.send = send self.send = send
self.packets: Deque[HCI_AclDataPacket] = collections.deque() self.packets: Deque[hci.HCI_AclDataPacket] = collections.deque()
def enqueue(self, packet: HCI_AclDataPacket) -> None: def enqueue(self, packet: hci.HCI_AclDataPacket) -> None:
self.packets.appendleft(packet) self.packets.appendleft(packet)
self.check_queue() self.check_queue()
@@ -140,11 +96,13 @@ class AclPacketQueue:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Connection: class Connection:
def __init__(self, host: Host, handle: int, peer_address: Address, transport: int): def __init__(
self, host: Host, handle: int, peer_address: hci.Address, transport: int
):
self.host = host self.host = host
self.handle = handle self.handle = handle
self.peer_address = peer_address self.peer_address = peer_address
self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu) self.assembler = hci.HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.transport = transport self.transport = transport
acl_packet_queue: Optional[AclPacketQueue] = ( acl_packet_queue: Optional[AclPacketQueue] = (
host.le_acl_packet_queue host.le_acl_packet_queue
@@ -154,7 +112,7 @@ class Connection:
assert acl_packet_queue assert acl_packet_queue
self.acl_packet_queue = acl_packet_queue self.acl_packet_queue = acl_packet_queue
def on_hci_acl_data_packet(self, packet: HCI_AclDataPacket) -> None: def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None:
self.assembler.feed_packet(packet) self.assembler.feed_packet(packet)
def on_acl_pdu(self, pdu: bytes) -> None: def on_acl_pdu(self, pdu: bytes) -> None:
@@ -165,14 +123,14 @@ class Connection:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@dataclasses.dataclass @dataclasses.dataclass
class ScoLink: class ScoLink:
peer_address: Address peer_address: hci.Address
handle: int handle: int
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@dataclasses.dataclass @dataclasses.dataclass
class CisLink: class CisLink:
peer_address: Address peer_address: hci.Address
handle: int handle: int
@@ -188,7 +146,7 @@ class Host(AbortableEventEmitter):
long_term_key_provider: Optional[ long_term_key_provider: Optional[
Callable[[int, bytes, int], Awaitable[Optional[bytes]]] Callable[[int, bytes, int], Awaitable[Optional[bytes]]]
] ]
link_key_provider: Optional[Callable[[Address], Awaitable[Optional[bytes]]]] link_key_provider: Optional[Callable[[hci.Address], Awaitable[Optional[bytes]]]]
def __init__( def __init__(
self, self,
@@ -204,6 +162,8 @@ class Host(AbortableEventEmitter):
self.sco_links = {} # SCO links, by connection handle self.sco_links = {} # SCO links, by connection handle
self.pending_command = None self.pending_command = None
self.pending_response = None self.pending_response = None
self.number_of_supported_advertising_sets = 0
self.maximum_advertising_data_length = 31
self.local_version = None self.local_version = None
self.local_supported_commands = bytes(64) self.local_supported_commands = bytes(64)
self.local_le_features = 0 self.local_le_features = 0
@@ -223,7 +183,7 @@ class Host(AbortableEventEmitter):
def find_connection_by_bd_addr( def find_connection_by_bd_addr(
self, self,
bd_addr: Address, bd_addr: hci.Address,
transport: Optional[int] = None, transport: Optional[int] = None,
check_address_type: bool = False, check_address_type: bool = False,
) -> Optional[Connection]: ) -> Optional[Connection]:
@@ -265,49 +225,139 @@ class Host(AbortableEventEmitter):
# Send a reset command unless a driver has already done so. # Send a reset command unless a driver has already done so.
if reset_needed: if reset_needed:
await self.send_command(HCI_Reset_Command(), check_result=True) await self.send_command(hci.HCI_Reset_Command(), check_result=True)
self.ready = True self.ready = True
response = await self.send_command( response = await self.send_command(
HCI_Read_Local_Supported_Commands_Command(), check_result=True hci.HCI_Read_Local_Supported_Commands_Command(), check_result=True
) )
self.local_supported_commands = response.return_parameters.supported_commands self.local_supported_commands = response.return_parameters.supported_commands
if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND): if self.supports_command(hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response = await self.send_command( response = await self.send_command(
HCI_LE_Read_Local_Supported_Features_Command(), check_result=True hci.HCI_LE_Read_Local_Supported_Features_Command(), check_result=True
) )
self.local_le_features = struct.unpack( self.local_le_features = struct.unpack(
'<Q', response.return_parameters.le_features '<Q', response.return_parameters.le_features
)[0] )[0]
if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND): if self.supports_command(hci.HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
response = await self.send_command( response = await self.send_command(
HCI_Read_Local_Version_Information_Command(), check_result=True hci.HCI_Read_Local_Version_Information_Command(), check_result=True
) )
self.local_version = response.return_parameters self.local_version = response.return_parameters
await self.send_command( await self.send_command(
HCI_Set_Event_Mask_Command(event_mask=bytes.fromhex('FFFFFFFFFFFFFF3F')) hci.HCI_Set_Event_Mask_Command(
event_mask=hci.HCI_Set_Event_Mask_Command.mask(
[
hci.HCI_INQUIRY_COMPLETE_EVENT,
hci.HCI_INQUIRY_RESULT_EVENT,
hci.HCI_CONNECTION_COMPLETE_EVENT,
hci.HCI_CONNECTION_REQUEST_EVENT,
hci.HCI_DISCONNECTION_COMPLETE_EVENT,
hci.HCI_AUTHENTICATION_COMPLETE_EVENT,
hci.HCI_REMOTE_NAME_REQUEST_COMPLETE_EVENT,
hci.HCI_ENCRYPTION_CHANGE_EVENT,
hci.HCI_CHANGE_CONNECTION_LINK_KEY_COMPLETE_EVENT,
hci.HCI_LINK_KEY_TYPE_CHANGED_EVENT,
hci.HCI_READ_REMOTE_SUPPORTED_FEATURES_COMPLETE_EVENT,
hci.HCI_READ_REMOTE_VERSION_INFORMATION_COMPLETE_EVENT,
hci.HCI_QOS_SETUP_COMPLETE_EVENT,
hci.HCI_HARDWARE_ERROR_EVENT,
hci.HCI_FLUSH_OCCURRED_EVENT,
hci.HCI_ROLE_CHANGE_EVENT,
hci.HCI_MODE_CHANGE_EVENT,
hci.HCI_RETURN_LINK_KEYS_EVENT,
hci.HCI_PIN_CODE_REQUEST_EVENT,
hci.HCI_LINK_KEY_REQUEST_EVENT,
hci.HCI_LINK_KEY_NOTIFICATION_EVENT,
hci.HCI_LOOPBACK_COMMAND_EVENT,
hci.HCI_DATA_BUFFER_OVERFLOW_EVENT,
hci.HCI_MAX_SLOTS_CHANGE_EVENT,
hci.HCI_READ_CLOCK_OFFSET_COMPLETE_EVENT,
hci.HCI_CONNECTION_PACKET_TYPE_CHANGED_EVENT,
hci.HCI_QOS_VIOLATION_EVENT,
hci.HCI_PAGE_SCAN_REPETITION_MODE_CHANGE_EVENT,
hci.HCI_FLOW_SPECIFICATION_COMPLETE_EVENT,
hci.HCI_INQUIRY_RESULT_WITH_RSSI_EVENT,
hci.HCI_READ_REMOTE_EXTENDED_FEATURES_COMPLETE_EVENT,
hci.HCI_SYNCHRONOUS_CONNECTION_COMPLETE_EVENT,
hci.HCI_SYNCHRONOUS_CONNECTION_CHANGED_EVENT,
hci.HCI_SNIFF_SUBRATING_EVENT,
hci.HCI_EXTENDED_INQUIRY_RESULT_EVENT,
hci.HCI_ENCRYPTION_KEY_REFRESH_COMPLETE_EVENT,
hci.HCI_IO_CAPABILITY_REQUEST_EVENT,
hci.HCI_IO_CAPABILITY_RESPONSE_EVENT,
hci.HCI_USER_CONFIRMATION_REQUEST_EVENT,
hci.HCI_USER_PASSKEY_REQUEST_EVENT,
hci.HCI_REMOTE_OOB_DATA_REQUEST_EVENT,
hci.HCI_SIMPLE_PAIRING_COMPLETE_EVENT,
hci.HCI_LINK_SUPERVISION_TIMEOUT_CHANGED_EVENT,
hci.HCI_ENHANCED_FLUSH_COMPLETE_EVENT,
hci.HCI_USER_PASSKEY_NOTIFICATION_EVENT,
hci.HCI_KEYPRESS_NOTIFICATION_EVENT,
hci.HCI_REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION_EVENT,
hci.HCI_LE_META_EVENT,
]
)
)
) )
if ( if (
self.local_version is not None self.local_version is not None
and self.local_version.hci_version <= HCI_VERSION_BLUETOOTH_CORE_4_0 and self.local_version.hci_version <= hci.HCI_VERSION_BLUETOOTH_CORE_4_0
): ):
# Some older controllers don't like event masks with bits they don't # Some older controllers don't like event masks with bits they don't
# understand # understand
le_event_mask = bytes.fromhex('1F00000000000000') le_event_mask = bytes.fromhex('1F00000000000000')
else: else:
le_event_mask = bytes.fromhex('FFFFFFFF00000000') le_event_mask = hci.HCI_LE_Set_Event_Mask_Command.mask(
[
hci.HCI_LE_CONNECTION_COMPLETE_EVENT,
hci.HCI_LE_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT,
hci.HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT,
hci.HCI_LE_LONG_TERM_KEY_REQUEST_EVENT,
hci.HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT,
hci.HCI_LE_DATA_LENGTH_CHANGE_EVENT,
hci.HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT,
hci.HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT,
hci.HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT,
hci.HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PHY_UPDATE_COMPLETE_EVENT,
hci.HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT,
hci.HCI_LE_SCAN_TIMEOUT_EVENT,
hci.HCI_LE_ADVERTISING_SET_TERMINATED_EVENT,
hci.HCI_LE_SCAN_REQUEST_RECEIVED_EVENT,
hci.HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT,
hci.HCI_LE_CONNECTION_IQ_REPORT_EVENT,
hci.HCI_LE_CTE_REQUEST_FAILED_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT,
hci.HCI_LE_CIS_ESTABLISHED_EVENT,
hci.HCI_LE_CIS_REQUEST_EVENT,
hci.HCI_LE_CREATE_BIG_COMPLETE_EVENT,
hci.HCI_LE_TERMINATE_BIG_COMPLETE_EVENT,
hci.HCI_LE_BIG_SYNC_ESTABLISHED_EVENT,
hci.HCI_LE_BIG_SYNC_LOST_EVENT,
hci.HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT,
hci.HCI_LE_PATH_LOSS_THRESHOLD_EVENT,
hci.HCI_LE_TRANSMIT_POWER_REPORTING_EVENT,
hci.HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_SUBRATE_CHANGE_EVENT,
]
)
await self.send_command( await self.send_command(
HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask) hci.HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
) )
if self.supports_command(HCI_READ_BUFFER_SIZE_COMMAND): if self.supports_command(hci.HCI_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command( response = await self.send_command(
HCI_Read_Buffer_Size_Command(), check_result=True hci.HCI_Read_Buffer_Size_Command(), check_result=True
) )
hc_acl_data_packet_length = ( hc_acl_data_packet_length = (
response.return_parameters.hc_acl_data_packet_length response.return_parameters.hc_acl_data_packet_length
@@ -330,9 +380,9 @@ class Host(AbortableEventEmitter):
hc_le_acl_data_packet_length = 0 hc_le_acl_data_packet_length = 0
hc_total_num_le_acl_data_packets = 0 hc_total_num_le_acl_data_packets = 0
if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND): if self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command( response = await self.send_command(
HCI_LE_Read_Buffer_Size_Command(), check_result=True hci.HCI_LE_Read_Buffer_Size_Command(), check_result=True
) )
hc_le_acl_data_packet_length = ( hc_le_acl_data_packet_length = (
response.return_parameters.hc_le_acl_data_packet_length response.return_parameters.hc_le_acl_data_packet_length
@@ -359,10 +409,12 @@ class Host(AbortableEventEmitter):
) )
if self.supports_command( if self.supports_command(
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND hci.HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND
) and self.supports_command(HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND): ) and self.supports_command(
hci.HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND
):
response = await self.send_command( response = await self.send_command(
HCI_LE_Read_Suggested_Default_Data_Length_Command() hci.HCI_LE_Read_Suggested_Default_Data_Length_Command()
) )
suggested_max_tx_octets = response.return_parameters.suggested_max_tx_octets suggested_max_tx_octets = response.return_parameters.suggested_max_tx_octets
suggested_max_tx_time = response.return_parameters.suggested_max_tx_time suggested_max_tx_time = response.return_parameters.suggested_max_tx_time
@@ -371,12 +423,34 @@ class Host(AbortableEventEmitter):
or suggested_max_tx_time != self.suggested_max_tx_time or suggested_max_tx_time != self.suggested_max_tx_time
): ):
await self.send_command( await self.send_command(
HCI_LE_Write_Suggested_Default_Data_Length_Command( hci.HCI_LE_Write_Suggested_Default_Data_Length_Command(
suggested_max_tx_octets=self.suggested_max_tx_octets, suggested_max_tx_octets=self.suggested_max_tx_octets,
suggested_max_tx_time=self.suggested_max_tx_time, suggested_max_tx_time=self.suggested_max_tx_time,
) )
) )
if self.supports_command(
hci.HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND
):
response = await self.send_command(
hci.HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command(),
check_result=True,
)
self.number_of_supported_advertising_sets = (
response.return_parameters.num_supported_advertising_sets
)
if self.supports_command(
hci.HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND
):
response = await self.send_command(
hci.HCI_LE_Read_Maximum_Advertising_Data_Length_Command(),
check_result=True,
)
self.maximum_advertising_data_length = (
response.return_parameters.max_advertising_data_length
)
@property @property
def controller(self) -> Optional[TransportSink]: def controller(self) -> Optional[TransportSink]:
return self.hci_sink return self.hci_sink
@@ -394,7 +468,7 @@ class Host(AbortableEventEmitter):
source.set_packet_sink(self) source.set_packet_sink(self)
self.hci_metadata = getattr(source, 'metadata', self.hci_metadata) self.hci_metadata = getattr(source, 'metadata', self.hci_metadata)
def send_hci_packet(self, packet: HCI_Packet) -> None: def send_hci_packet(self, packet: hci.HCI_Packet) -> None:
logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {packet}') logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {packet}')
if self.snooper: if self.snooper:
self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER) self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER)
@@ -425,11 +499,12 @@ class Host(AbortableEventEmitter):
else: else:
status = response.return_parameters.status status = response.return_parameters.status
if status != HCI_SUCCESS: if status != hci.HCI_SUCCESS:
logger.warning( logger.warning(
f'{command.name} failed ({HCI_Constant.error_name(status)})' f'{command.name} failed '
f'({hci.HCI_Constant.error_name(status)})'
) )
raise HCI_Error(status) raise hci.HCI_Error(status)
return response return response
except Exception as error: except Exception as error:
@@ -442,8 +517,8 @@ class Host(AbortableEventEmitter):
self.pending_response = None self.pending_response = None
# Use this method to send a command from a task # Use this method to send a command from a task
def send_command_sync(self, command: HCI_Command) -> None: def send_command_sync(self, command: hci.HCI_Command) -> None:
async def send_command(command: HCI_Command) -> None: async def send_command(command: hci.HCI_Command) -> None:
await self.send_command(command) await self.send_command(command)
asyncio.create_task(send_command(command)) asyncio.create_task(send_command(command))
@@ -468,7 +543,7 @@ class Host(AbortableEventEmitter):
pb_flag = 0 pb_flag = 0
while bytes_remaining: while bytes_remaining:
data_total_length = min(bytes_remaining, packet_queue.max_packet_size) data_total_length = min(bytes_remaining, packet_queue.max_packet_size)
acl_packet = HCI_AclDataPacket( acl_packet = hci.HCI_AclDataPacket(
connection_handle=connection_handle, connection_handle=connection_handle,
pb_flag=pb_flag, pb_flag=pb_flag,
bc_flag=0, bc_flag=0,
@@ -483,7 +558,7 @@ class Host(AbortableEventEmitter):
def supports_command(self, command): def supports_command(self, command):
# Find the support flag position for this command # Find the support flag position for this command
for octet, flags in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS): for octet, flags in enumerate(hci.HCI_SUPPORTED_COMMANDS_FLAGS):
for flag_position, value in enumerate(flags): for flag_position, value in enumerate(flags):
if value == command: if value == command:
# Check if the flag is set # Check if the flag is set
@@ -498,16 +573,16 @@ class Host(AbortableEventEmitter):
def supported_commands(self): def supported_commands(self):
commands = [] commands = []
for octet, flags in enumerate(self.local_supported_commands): for octet, flags in enumerate(self.local_supported_commands):
if octet < len(HCI_SUPPORTED_COMMANDS_FLAGS): if octet < len(hci.HCI_SUPPORTED_COMMANDS_FLAGS):
for flag in range(8): for flag in range(8):
if flags & (1 << flag) != 0: if flags & (1 << flag) != 0:
command = HCI_SUPPORTED_COMMANDS_FLAGS[octet][flag] command = hci.HCI_SUPPORTED_COMMANDS_FLAGS[octet][flag]
if command is not None: if command is not None:
commands.append(command) commands.append(command)
return commands return commands
def supports_le_features(self, feature: LeFeatureMask) -> bool: def supports_le_features(self, feature: hci.LeFeatureMask) -> bool:
return (self.local_le_features & feature) == feature return (self.local_le_features & feature) == feature
@property @property
@@ -518,10 +593,10 @@ class Host(AbortableEventEmitter):
# Packet Sink protocol (packets coming from the controller via HCI) # Packet Sink protocol (packets coming from the controller via HCI)
def on_packet(self, packet: bytes) -> None: def on_packet(self, packet: bytes) -> None:
hci_packet = HCI_Packet.from_bytes(packet) hci_packet = hci.HCI_Packet.from_bytes(packet)
if self.ready or ( if self.ready or (
isinstance(hci_packet, HCI_Command_Complete_Event) isinstance(hci_packet, hci.HCI_Command_Complete_Event)
and hci_packet.command_opcode == HCI_RESET_COMMAND and hci_packet.command_opcode == hci.HCI_RESET_COMMAND
): ):
self.on_hci_packet(hci_packet) self.on_hci_packet(hci_packet)
else: else:
@@ -534,44 +609,44 @@ class Host(AbortableEventEmitter):
self.emit('flush') self.emit('flush')
def on_hci_packet(self, packet: HCI_Packet) -> None: def on_hci_packet(self, packet: hci.HCI_Packet) -> None:
logger.debug(f'{color("### CONTROLLER -> HOST", "green")}: {packet}') logger.debug(f'{color("### CONTROLLER -> HOST", "green")}: {packet}')
if self.snooper: if self.snooper:
self.snooper.snoop(bytes(packet), Snooper.Direction.CONTROLLER_TO_HOST) self.snooper.snoop(bytes(packet), Snooper.Direction.CONTROLLER_TO_HOST)
# If the packet is a command, invoke the handler for this packet # If the packet is a command, invoke the handler for this packet
if packet.hci_packet_type == HCI_COMMAND_PACKET: if packet.hci_packet_type == hci.HCI_COMMAND_PACKET:
self.on_hci_command_packet(cast(HCI_Command, packet)) self.on_hci_command_packet(cast(hci.HCI_Command, packet))
elif packet.hci_packet_type == HCI_EVENT_PACKET: elif packet.hci_packet_type == hci.HCI_EVENT_PACKET:
self.on_hci_event_packet(cast(HCI_Event, packet)) self.on_hci_event_packet(cast(hci.HCI_Event, packet))
elif packet.hci_packet_type == HCI_ACL_DATA_PACKET: elif packet.hci_packet_type == hci.HCI_ACL_DATA_PACKET:
self.on_hci_acl_data_packet(cast(HCI_AclDataPacket, packet)) self.on_hci_acl_data_packet(cast(hci.HCI_AclDataPacket, packet))
elif packet.hci_packet_type == HCI_SYNCHRONOUS_DATA_PACKET: elif packet.hci_packet_type == hci.HCI_SYNCHRONOUS_DATA_PACKET:
self.on_hci_sco_data_packet(cast(HCI_SynchronousDataPacket, packet)) self.on_hci_sco_data_packet(cast(hci.HCI_SynchronousDataPacket, packet))
elif packet.hci_packet_type == HCI_ISO_DATA_PACKET: elif packet.hci_packet_type == hci.HCI_ISO_DATA_PACKET:
self.on_hci_iso_data_packet(cast(HCI_IsoDataPacket, packet)) self.on_hci_iso_data_packet(cast(hci.HCI_IsoDataPacket, packet))
else: else:
logger.warning(f'!!! unknown packet type {packet.hci_packet_type}') logger.warning(f'!!! unknown packet type {packet.hci_packet_type}')
def on_hci_command_packet(self, command: HCI_Command) -> None: def on_hci_command_packet(self, command: hci.HCI_Command) -> None:
logger.warning(f'!!! unexpected command packet: {command}') logger.warning(f'!!! unexpected command packet: {command}')
def on_hci_event_packet(self, event: HCI_Event) -> None: def on_hci_event_packet(self, event: hci.HCI_Event) -> None:
handler_name = f'on_{event.name.lower()}' handler_name = f'on_{event.name.lower()}'
handler = getattr(self, handler_name, self.on_hci_event) handler = getattr(self, handler_name, self.on_hci_event)
handler(event) handler(event)
def on_hci_acl_data_packet(self, packet: HCI_AclDataPacket) -> None: def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None:
# Look for the connection to which this data belongs # Look for the connection to which this data belongs
if connection := self.connections.get(packet.connection_handle): if connection := self.connections.get(packet.connection_handle):
connection.on_hci_acl_data_packet(packet) connection.on_hci_acl_data_packet(packet)
def on_hci_sco_data_packet(self, packet: HCI_SynchronousDataPacket) -> None: def on_hci_sco_data_packet(self, packet: hci.HCI_SynchronousDataPacket) -> None:
# Experimental # Experimental
self.emit('sco_packet', packet.connection_handle, packet) self.emit('sco_packet', packet.connection_handle, packet)
def on_hci_iso_data_packet(self, packet: HCI_IsoDataPacket) -> None: def on_hci_iso_data_packet(self, packet: hci.HCI_IsoDataPacket) -> None:
# Experimental # Experimental
self.emit('iso_packet', packet.connection_handle, packet) self.emit('iso_packet', packet.connection_handle, packet)
@@ -635,11 +710,11 @@ class Host(AbortableEventEmitter):
def on_hci_le_connection_complete_event(self, event): def on_hci_le_connection_complete_event(self, event):
# Check if this is a cancellation # Check if this is a cancellation
if event.status == HCI_SUCCESS: if event.status == hci.HCI_SUCCESS:
# Create/update the connection # Create/update the connection
logger.debug( logger.debug(
f'### LE CONNECTION: [0x{event.connection_handle:04X}] ' f'### LE CONNECTION: [0x{event.connection_handle:04X}] '
f'{event.peer_address} as {HCI_Constant.role_name(event.role)}' f'{event.peer_address} as {hci.HCI_Constant.role_name(event.role)}'
) )
connection = self.connections.get(event.connection_handle) connection = self.connections.get(event.connection_handle)
@@ -679,7 +754,7 @@ class Host(AbortableEventEmitter):
self.on_hci_le_connection_complete_event(event) self.on_hci_le_connection_complete_event(event)
def on_hci_connection_complete_event(self, event): def on_hci_connection_complete_event(self, event):
if event.status == HCI_SUCCESS: if event.status == hci.HCI_SUCCESS:
# Create/update the connection # Create/update the connection
logger.debug( logger.debug(
f'### BR/EDR CONNECTION: [0x{event.connection_handle:04X}] ' f'### BR/EDR CONNECTION: [0x{event.connection_handle:04X}] '
@@ -726,7 +801,7 @@ class Host(AbortableEventEmitter):
logger.warning('!!! DISCONNECTION COMPLETE: unknown handle') logger.warning('!!! DISCONNECTION COMPLETE: unknown handle')
return return
if event.status == HCI_SUCCESS: if event.status == hci.HCI_SUCCESS:
logger.debug( logger.debug(
f'### DISCONNECTION: [0x{handle:04X}] ' f'### DISCONNECTION: [0x{handle:04X}] '
f'{connection.peer_address} ' f'{connection.peer_address} '
@@ -735,7 +810,9 @@ class Host(AbortableEventEmitter):
# Notify the listeners # Notify the listeners
self.emit('disconnection', handle, event.reason) self.emit('disconnection', handle, event.reason)
(
# Remove the handle reference
_ = (
self.connections.pop(handle, 0) self.connections.pop(handle, 0)
or self.cis_links.pop(handle, 0) or self.cis_links.pop(handle, 0)
or self.sco_links.pop(handle, 0) or self.sco_links.pop(handle, 0)
@@ -752,7 +829,7 @@ class Host(AbortableEventEmitter):
return return
# Notify the client # Notify the client
if event.status == HCI_SUCCESS: if event.status == hci.HCI_SUCCESS:
connection_parameters = ConnectionParameters( connection_parameters = ConnectionParameters(
event.connection_interval, event.connection_interval,
event.peripheral_latency, event.peripheral_latency,
@@ -772,7 +849,7 @@ class Host(AbortableEventEmitter):
return return
# Notify the client # Notify the client
if event.status == HCI_SUCCESS: if event.status == hci.HCI_SUCCESS:
connection_phy = ConnectionPHY(event.tx_phy, event.rx_phy) connection_phy = ConnectionPHY(event.tx_phy, event.rx_phy)
self.emit('connection_phy_update', connection.handle, connection_phy) self.emit('connection_phy_update', connection.handle, connection_phy)
else: else:
@@ -791,6 +868,7 @@ class Host(AbortableEventEmitter):
event.status, event.status,
event.advertising_handle, event.advertising_handle,
event.connection_handle, event.connection_handle,
event.num_completed_extended_advertising_events,
) )
def on_hci_le_cis_request_event(self, event): def on_hci_le_cis_request_event(self, event):
@@ -804,10 +882,10 @@ class Host(AbortableEventEmitter):
def on_hci_le_cis_established_event(self, event): def on_hci_le_cis_established_event(self, event):
# The remaining parameters are unused for now. # The remaining parameters are unused for now.
if event.status == HCI_SUCCESS: if event.status == hci.HCI_SUCCESS:
self.cis_links[event.connection_handle] = CisLink( self.cis_links[event.connection_handle] = CisLink(
handle=event.connection_handle, handle=event.connection_handle,
peer_address=Address.ANY, peer_address=hci.Address.ANY,
) )
self.emit('cis_establishment', event.connection_handle) self.emit('cis_establishment', event.connection_handle)
else: else:
@@ -823,7 +901,7 @@ class Host(AbortableEventEmitter):
# For now, just accept everything # For now, just accept everything
# TODO: delegate the decision # TODO: delegate the decision
self.send_command_sync( self.send_command_sync(
HCI_LE_Remote_Connection_Parameter_Request_Reply_Command( hci.HCI_LE_Remote_Connection_Parameter_Request_Reply_Command(
connection_handle=event.connection_handle, connection_handle=event.connection_handle,
interval_min=event.interval_min, interval_min=event.interval_min,
interval_max=event.interval_max, interval_max=event.interval_max,
@@ -854,12 +932,12 @@ class Host(AbortableEventEmitter):
), ),
) )
if long_term_key: if long_term_key:
response = HCI_LE_Long_Term_Key_Request_Reply_Command( response = hci.HCI_LE_Long_Term_Key_Request_Reply_Command(
connection_handle=event.connection_handle, connection_handle=event.connection_handle,
long_term_key=long_term_key, long_term_key=long_term_key,
) )
else: else:
response = HCI_LE_Long_Term_Key_Request_Negative_Reply_Command( response = hci.HCI_LE_Long_Term_Key_Request_Negative_Reply_Command(
connection_handle=event.connection_handle connection_handle=event.connection_handle
) )
@@ -868,7 +946,7 @@ class Host(AbortableEventEmitter):
asyncio.create_task(send_long_term_key()) asyncio.create_task(send_long_term_key())
def on_hci_synchronous_connection_complete_event(self, event): def on_hci_synchronous_connection_complete_event(self, event):
if event.status == HCI_SUCCESS: if event.status == hci.HCI_SUCCESS:
# Create/update the connection # Create/update the connection
logger.debug( logger.debug(
f'### SCO CONNECTION: [0x{event.connection_handle:04X}] ' f'### SCO CONNECTION: [0x{event.connection_handle:04X}] '
@@ -897,16 +975,16 @@ class Host(AbortableEventEmitter):
pass pass
def on_hci_role_change_event(self, event): def on_hci_role_change_event(self, event):
if event.status == HCI_SUCCESS: if event.status == hci.HCI_SUCCESS:
logger.debug( logger.debug(
f'role change for {event.bd_addr}: ' f'role change for {event.bd_addr}: '
f'{HCI_Constant.role_name(event.new_role)}' f'{hci.HCI_Constant.role_name(event.new_role)}'
) )
self.emit('role_change', event.bd_addr, event.new_role) self.emit('role_change', event.bd_addr, event.new_role)
else: else:
logger.debug( logger.debug(
f'role change for {event.bd_addr} failed: ' f'role change for {event.bd_addr} failed: '
f'{HCI_Constant.error_name(event.status)}' f'{hci.HCI_Constant.error_name(event.status)}'
) )
self.emit('role_change_failure', event.bd_addr, event.status) self.emit('role_change_failure', event.bd_addr, event.status)
@@ -922,7 +1000,7 @@ class Host(AbortableEventEmitter):
def on_hci_authentication_complete_event(self, event): def on_hci_authentication_complete_event(self, event):
# Notify the client # Notify the client
if event.status == HCI_SUCCESS: if event.status == hci.HCI_SUCCESS:
self.emit('connection_authentication', event.connection_handle) self.emit('connection_authentication', event.connection_handle)
else: else:
self.emit( self.emit(
@@ -933,7 +1011,7 @@ class Host(AbortableEventEmitter):
def on_hci_encryption_change_event(self, event): def on_hci_encryption_change_event(self, event):
# Notify the client # Notify the client
if event.status == HCI_SUCCESS: if event.status == hci.HCI_SUCCESS:
self.emit( self.emit(
'connection_encryption_change', 'connection_encryption_change',
event.connection_handle, event.connection_handle,
@@ -946,7 +1024,7 @@ class Host(AbortableEventEmitter):
def on_hci_encryption_key_refresh_complete_event(self, event): def on_hci_encryption_key_refresh_complete_event(self, event):
# Notify the client # Notify the client
if event.status == HCI_SUCCESS: if event.status == hci.HCI_SUCCESS:
self.emit('connection_encryption_key_refresh', event.connection_handle) self.emit('connection_encryption_key_refresh', event.connection_handle)
else: else:
self.emit( self.emit(
@@ -967,16 +1045,16 @@ class Host(AbortableEventEmitter):
def on_hci_link_key_notification_event(self, event): def on_hci_link_key_notification_event(self, event):
logger.debug( logger.debug(
f'link key for {event.bd_addr}: {event.link_key.hex()}, ' f'link key for {event.bd_addr}: {event.link_key.hex()}, '
f'type={HCI_Constant.link_key_type_name(event.key_type)}' f'type={hci.HCI_Constant.link_key_type_name(event.key_type)}'
) )
self.emit('link_key', event.bd_addr, event.link_key, event.key_type) self.emit('link_key', event.bd_addr, event.link_key, event.key_type)
def on_hci_simple_pairing_complete_event(self, event): def on_hci_simple_pairing_complete_event(self, event):
logger.debug( logger.debug(
f'simple pairing complete for {event.bd_addr}: ' f'simple pairing complete for {event.bd_addr}: '
f'status={HCI_Constant.status_name(event.status)}' f'status={hci.HCI_Constant.status_name(event.status)}'
) )
if event.status == HCI_SUCCESS: if event.status == hci.HCI_SUCCESS:
self.emit('classic_pairing', event.bd_addr) self.emit('classic_pairing', event.bd_addr)
else: else:
self.emit('classic_pairing_failure', event.bd_addr, event.status) self.emit('classic_pairing_failure', event.bd_addr, event.status)
@@ -996,11 +1074,11 @@ class Host(AbortableEventEmitter):
self.link_key_provider(event.bd_addr), self.link_key_provider(event.bd_addr),
) )
if link_key: if link_key:
response = HCI_Link_Key_Request_Reply_Command( response = hci.HCI_Link_Key_Request_Reply_Command(
bd_addr=event.bd_addr, link_key=link_key bd_addr=event.bd_addr, link_key=link_key
) )
else: else:
response = HCI_Link_Key_Request_Negative_Reply_Command( response = hci.HCI_Link_Key_Request_Negative_Reply_Command(
bd_addr=event.bd_addr bd_addr=event.bd_addr
) )
@@ -1057,7 +1135,7 @@ class Host(AbortableEventEmitter):
) )
def on_hci_remote_name_request_complete_event(self, event): def on_hci_remote_name_request_complete_event(self, event):
if event.status != HCI_SUCCESS: if event.status != hci.HCI_SUCCESS:
self.emit('remote_name_failure', event.bd_addr, event.status) self.emit('remote_name_failure', event.bd_addr, event.status)
else: else:
utf8_name = event.remote_name utf8_name = event.remote_name
@@ -1075,7 +1153,7 @@ class Host(AbortableEventEmitter):
) )
def on_hci_le_read_remote_features_complete_event(self, event): def on_hci_le_read_remote_features_complete_event(self, event):
if event.status != HCI_SUCCESS: if event.status != hci.HCI_SUCCESS:
self.emit( self.emit(
'le_remote_features_failure', event.connection_handle, event.status 'le_remote_features_failure', event.connection_handle, event.status
) )

View File

@@ -208,7 +208,7 @@ class L2CAP_PDU:
@staticmethod @staticmethod
def from_bytes(data: bytes) -> L2CAP_PDU: def from_bytes(data: bytes) -> L2CAP_PDU:
# Sanity check # Check parameters
if len(data) < 4: if len(data) < 4:
raise ValueError('not enough data for L2CAP header') raise ValueError('not enough data for L2CAP header')

View File

@@ -226,13 +226,13 @@ class CompositeEventEmitter(AbortableEventEmitter):
if self._listener: if self._listener:
# Call the deregistration methods for each base class that has them # Call the deregistration methods for each base class that has them
for cls in self._listener.__class__.mro(): for cls in self._listener.__class__.mro():
if hasattr(cls, '_bumble_register_composite'): if '_bumble_register_composite' in cls.__dict__:
cls._bumble_deregister_composite(listener, self) cls._bumble_deregister_composite(self._listener, self)
self._listener = listener self._listener = listener
if listener: if listener:
# Call the registration methods for each base class that has them # Call the registration methods for each base class that has them
for cls in listener.__class__.mro(): for cls in listener.__class__.mro():
if hasattr(cls, '_bumble_deregister_composite'): if '_bumble_deregister_composite' in cls.__dict__:
cls._bumble_register_composite(listener, self) cls._bumble_register_composite(listener, self)

View File

@@ -19,9 +19,11 @@ import asyncio
import logging import logging
import sys import sys
import os import os
import struct
from bumble.core import AdvertisingData
from bumble.device import AdvertisingType, Device from bumble.device import AdvertisingType, Device
from bumble.hci import Address from bumble.hci import Address
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
@@ -52,6 +54,16 @@ async def main():
print('<<< connected') print('<<< connected')
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
if advertising_type.is_scannable:
device.scan_response_data = bytes(
AdvertisingData(
[
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x0340)),
]
)
)
await device.power_on() await device.power_on()
await device.start_advertising(advertising_type=advertising_type, target=target) await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_source.wait_for_termination() await hci_source.wait_for_termination()

View File

@@ -22,10 +22,11 @@ import os
from bumble.device import ( from bumble.device import (
Device, Device,
Connection, Connection,
AdvertisingParameters,
AdvertisingEventProperties,
) )
from bumble.hci import ( from bumble.hci import (
OwnAddressType, OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
) )
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
@@ -61,12 +62,7 @@ async def main() -> None:
devices[1].cis_enabled = True devices[1].cis_enabled = True
await asyncio.gather(*[device.power_on() for device in devices]) await asyncio.gather(*[device.power_on() for device in devices])
await devices[0].start_extended_advertising( advertising_set = await devices[0].create_advertising_set()
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.PUBLIC,
)
connection = await devices[1].connect( connection = await devices[1].connect(
devices[0].public_address, own_address_type=OwnAddressType.PUBLIC devices[0].public_address, own_address_type=OwnAddressType.PUBLIC

View File

@@ -98,13 +98,7 @@ async def main() -> None:
) )
+ csis.get_advertising_data() + csis.get_advertising_data()
) )
await device.start_extended_advertising( await device.create_advertising_set(advertising_data=advertising_data)
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.RANDOM,
advertising_data=advertising_data,
)
await asyncio.gather( await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports] *[hci_transport.source.terminated for hci_transport in hci_transports]

View File

@@ -19,8 +19,13 @@ import asyncio
import logging import logging
import sys import sys
import os import os
from bumble.device import AdvertisingType, Device from bumble.device import (
from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command AdvertisingParameters,
AdvertisingEventProperties,
AdvertisingType,
Device,
)
from bumble.hci import Address
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
@@ -35,20 +40,16 @@ async def main() -> None:
return return
if len(sys.argv) >= 4: if len(sys.argv) >= 4:
advertising_properties = ( advertising_properties = AdvertisingEventProperties.from_advertising_type(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties( AdvertisingType(int(sys.argv[3]))
int(sys.argv[3])
)
) )
else: else:
advertising_properties = ( advertising_properties = AdvertisingEventProperties()
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
)
if len(sys.argv) >= 5: if len(sys.argv) >= 5:
target = Address(sys.argv[4]) peer_address = Address(sys.argv[4])
else: else:
target = Address.ANY peer_address = Address.ANY
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport: async with await open_transport_or_link(sys.argv[2]) as hci_transport:
@@ -58,8 +59,11 @@ async def main() -> None:
sys.argv[1], hci_transport.source, hci_transport.sink sys.argv[1], hci_transport.source, hci_transport.sink
) )
await device.power_on() await device.power_on()
await device.start_extended_advertising( await device.create_advertising_set(
advertising_properties=advertising_properties, target=target advertising_parameters=AdvertisingParameters(
advertising_event_properties=advertising_properties,
peer_address=peer_address,
)
) )
await hci_transport.source.terminated await hci_transport.source.terminated

View File

@@ -0,0 +1,99 @@
# Copyright 2021-2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import AdvertisingParameters, AdvertisingEventProperties, Device
from bumble.hci import Address
from bumble.core import AdvertisingData
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_extended_advertiser_2.py <config-file> <transport-spec>')
print('example: run_extended_advertiser_2.py device1.json usb:0')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
if not device.supports_le_extended_advertising:
print("Device does not support extended advertising")
return
print("Max advertising sets:", device.host.number_of_supported_advertising_sets)
print(
"Max advertising data length:", device.host.maximum_advertising_data_length
)
if device.host.number_of_supported_advertising_sets >= 1:
advertising_data1 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 1".encode("utf-8"))]
)
set1 = await device.create_advertising_set(
advertising_data=bytes(advertising_data1),
)
print("Selected TX power 1:", set1.selected_tx_power)
advertising_data2 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 2".encode("utf-8"))]
)
if device.host.number_of_supported_advertising_sets >= 2:
set2 = await device.create_advertising_set(
random_address=Address("F0:F0:F0:F0:F0:F1"),
advertising_parameters=AdvertisingParameters(),
advertising_data=bytes(advertising_data2),
auto_start=False,
auto_restart=True,
)
print("Selected TX power 2:", set2.selected_tx_power)
await set2.start()
if device.host.number_of_supported_advertising_sets >= 3:
scan_response_data3 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 3".encode("utf-8"))]
)
set3 = await device.create_advertising_set(
random_address=Address("F0:F0:F0:F0:F0:F2"),
advertising_parameters=AdvertisingParameters(
advertising_event_properties=AdvertisingEventProperties(
is_connectable=False, is_scannable=True
)
),
scan_response_data=bytes(scan_response_data3),
)
print("Selected TX power 3:", set2.selected_tx_power)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -22,13 +22,12 @@ import os
import struct import struct
import secrets import secrets
from bumble.core import AdvertisingData from bumble.core import AdvertisingData
from bumble.device import Device, CisLink from bumble.device import Device, CisLink, AdvertisingParameters
from bumble.hci import ( from bumble.hci import (
CodecID, CodecID,
CodingFormat, CodingFormat,
OwnAddressType, OwnAddressType,
HCI_IsoDataPacket, HCI_IsoDataPacket,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
) )
from bumble.profiles.bap import ( from bumble.profiles.bap import (
CodecSpecificCapabilities, CodecSpecificCapabilities,
@@ -179,11 +178,7 @@ async def main() -> None:
device.once('cis_establishment', on_cis) device.once('cis_establishment', on_cis)
await device.start_extended_advertising( advertising_set = await device.create_advertising_set(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.RANDOM,
advertising_data=advertising_data, advertising_data=advertising_data,
) )

View File

@@ -99,7 +99,7 @@ development =
types-protobuf >= 4.21.0 types-protobuf >= 4.21.0
avatar = avatar =
pandora-avatar == 0.0.5 pandora-avatar == 0.0.5
rootcanal == 1.6.0 ; python_version>='3.10' rootcanal == 1.7.0 ; python_version>='3.10'
documentation = documentation =
mkdocs >= 1.4.0 mkdocs >= 1.4.0
mkdocs-material >= 8.5.6 mkdocs-material >= 8.5.6

View File

@@ -28,7 +28,7 @@ from bumble.core import (
BT_PERIPHERAL_ROLE, BT_PERIPHERAL_ROLE,
ConnectionParameters, ConnectionParameters,
) )
from bumble.device import Connection, Device from bumble.device import AdvertisingParameters, Connection, Device
from bumble.host import AclPacketQueue, Host from bumble.host import AclPacketQueue, Host
from bumble.hci import ( from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND, HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
@@ -50,7 +50,8 @@ from bumble.gatt import (
GATT_APPEARANCE_CHARACTERISTIC, GATT_APPEARANCE_CHARACTERISTIC,
) )
from .test_utils import TwoDevices from .test_utils import TwoDevices, async_barrier
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Logging # Logging
@@ -254,12 +255,12 @@ async def test_legacy_advertising():
device = Device(host=mock.AsyncMock(Host)) device = Device(host=mock.AsyncMock(Host))
# Start advertising # Start advertising
advertiser = await device.start_legacy_advertising() await device.start_advertising()
assert device.legacy_advertiser assert device.is_advertising
# Stop advertising # Stop advertising
await advertiser.stop() await device.stop_advertising()
assert not device.legacy_advertiser assert not device.is_advertising
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -273,7 +274,7 @@ async def test_legacy_advertising_connection(own_address_type):
peer_address = Address('F0:F1:F2:F3:F4:F5') peer_address = Address('F0:F1:F2:F3:F4:F5')
# Start advertising # Start advertising
advertiser = await device.start_legacy_advertising() await device.start_advertising()
device.on_connection( device.on_connection(
0x0001, 0x0001,
BT_LE_TRANSPORT, BT_LE_TRANSPORT,
@@ -301,7 +302,7 @@ async def test_legacy_advertising_connection(own_address_type):
async def test_legacy_advertising_disconnection(auto_restart): async def test_legacy_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host)) device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5') peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_legacy_advertising(auto_restart=auto_restart) await device.start_advertising(auto_restart=auto_restart)
device.on_connection( device.on_connection(
0x0001, 0x0001,
BT_LE_TRANSPORT, BT_LE_TRANSPORT,
@@ -310,20 +311,18 @@ async def test_legacy_advertising_disconnection(auto_restart):
ConnectionParameters(0, 0, 0), ConnectionParameters(0, 0, 0),
) )
device.start_legacy_advertising = mock.AsyncMock() device.on_advertising_set_termination(
HCI_SUCCESS, device.legacy_advertising_set.advertising_handle, 0x0001, 0
)
device.on_disconnection(0x0001, 0) device.on_disconnection(0x0001, 0)
await async_barrier()
await async_barrier()
if auto_restart: if auto_restart:
device.start_legacy_advertising.assert_called_with( assert device.is_advertising
advertising_type=advertiser.advertising_type,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
else: else:
device.start_legacy_advertising.assert_not_called() assert not device.is_advertising
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -332,12 +331,13 @@ async def test_extended_advertising():
device = Device(host=mock.AsyncMock(Host)) device = Device(host=mock.AsyncMock(Host))
# Start advertising # Start advertising
advertiser = await device.start_extended_advertising() advertising_set = await device.create_advertising_set()
assert device.extended_advertisers assert device.extended_advertising_sets
assert advertising_set.enabled
# Stop advertising # Stop advertising
await advertiser.stop() await advertising_set.stop()
assert not device.extended_advertisers assert not advertising_set.enabled
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -349,8 +349,8 @@ async def test_extended_advertising():
async def test_extended_advertising_connection(own_address_type): async def test_extended_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host)) device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5') peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising( advertising_set = await device.create_advertising_set(
own_address_type=own_address_type advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
) )
device.on_connection( device.on_connection(
0x0001, 0x0001,
@@ -361,8 +361,9 @@ async def test_extended_advertising_connection(own_address_type):
) )
device.on_advertising_set_termination( device.on_advertising_set_termination(
HCI_SUCCESS, HCI_SUCCESS,
advertiser.handle, advertising_set.advertising_handle,
0x0001, 0x0001,
0,
) )
if own_address_type == OwnAddressType.PUBLIC: if own_address_type == OwnAddressType.PUBLIC:
@@ -375,45 +376,6 @@ async def test_extended_advertising_connection(own_address_type):
await asyncio.sleep(0.0001) await asyncio.sleep(0.0001)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'auto_restart,',
(True, False),
)
@pytest.mark.asyncio
async def test_extended_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
0x0001,
)
device.start_extended_advertising = mock.AsyncMock()
device.on_disconnection(0x0001, 0)
if auto_restart:
device.start_extended_advertising.assert_called_with(
advertising_properties=advertiser.advertising_properties,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
else:
device.start_extended_advertising.assert_not_called()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_remote_le_features(): async def test_get_remote_le_features():

View File

@@ -50,6 +50,7 @@ from bumble.att import (
ATT_Error_Response, ATT_Error_Response,
ATT_Read_By_Group_Type_Request, ATT_Read_By_Group_Type_Request,
) )
from .test_utils import async_barrier
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -456,13 +457,6 @@ class LinkedDevices:
self.paired = [None, None, None] self.paired = [None, None, None]
# -----------------------------------------------------------------------------
async def async_barrier():
ready = asyncio.get_running_loop().create_future()
asyncio.get_running_loop().call_soon(ready.set_result, None)
await ready
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_read_write(): async def test_read_write():

View File

@@ -23,6 +23,8 @@ from bumble.hci import (
HCI_LE_READ_BUFFER_SIZE_COMMAND, HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_RESET_COMMAND, HCI_RESET_COMMAND,
HCI_SUCCESS, HCI_SUCCESS,
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
Address, Address,
CodingFormat, CodingFormat,
CodecID, CodecID,
@@ -274,8 +276,14 @@ def test_HCI_Set_Event_Mask_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Set_Event_Mask_Command(): def test_HCI_LE_Set_Event_Mask_Command():
command = HCI_LE_Set_Event_Mask_Command( command = HCI_LE_Set_Event_Mask_Command(
le_event_mask=bytes.fromhex('0011223344556677') le_event_mask=HCI_LE_Set_Event_Mask_Command.mask(
[
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
]
)
) )
assert command.le_event_mask == bytes.fromhex('0100000000010000')
basic_check(command) basic_check(command)

View File

@@ -12,6 +12,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
from typing import List, Optional from typing import List, Optional
from bumble.controller import Controller from bumble.controller import Controller
@@ -22,6 +26,7 @@ from bumble.transport import AsyncPipeSink
from bumble.hci import Address from bumble.hci import Address
# -----------------------------------------------------------------------------
class TwoDevices: class TwoDevices:
connections: List[Optional[Connection]] connections: List[Optional[Connection]]
@@ -75,3 +80,10 @@ class TwoDevices:
def __getitem__(self, index: int) -> Device: def __getitem__(self, index: int) -> Device:
return self.devices[index] return self.devices[index]
# -----------------------------------------------------------------------------
async def async_barrier():
ready = asyncio.get_running_loop().create_future()
asyncio.get_running_loop().call_soon(ready.set_result, None)
await ready