This commit is contained in:
Gilles Boccon-Gibod
2022-10-03 16:50:42 -07:00
parent c316e8805f
commit d10dda7e10
10 changed files with 936 additions and 435 deletions

View File

@@ -19,6 +19,7 @@ import json
import asyncio
import logging
from contextlib import asynccontextmanager, AsyncExitStack
from dataclasses import dataclass
from .hci import *
from .host import Host
@@ -41,20 +42,30 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00'
DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms
DEVICE_DEFAULT_ADVERTISING_DATA = ''
DEVICE_DEFAULT_NAME = 'Bumble'
DEVICE_DEFAULT_INQUIRY_LENGTH = 8 # 10.24 seconds
DEVICE_DEFAULT_CLASS_OF_DEVICE = 0
DEVICE_DEFAULT_SCAN_RESPONSE_DATA = b''
DEVICE_DEFAULT_DATA_LENGTH = (27, 328, 27, 328)
DEVICE_DEFAULT_SCAN_INTERVAL = 60 # ms
DEVICE_DEFAULT_SCAN_WINDOW = 60 # ms
DEVICE_MIN_SCAN_INTERVAL = 25
DEVICE_MAX_SCAN_INTERVAL = 10240
DEVICE_MIN_SCAN_WINDOW = 25
DEVICE_MAX_SCAN_WINDOW = 10240
DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00'
DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms
DEVICE_DEFAULT_ADVERTISING_DATA = ''
DEVICE_DEFAULT_NAME = 'Bumble'
DEVICE_DEFAULT_INQUIRY_LENGTH = 8 # 10.24 seconds
DEVICE_DEFAULT_CLASS_OF_DEVICE = 0
DEVICE_DEFAULT_SCAN_RESPONSE_DATA = b''
DEVICE_DEFAULT_DATA_LENGTH = (27, 328, 27, 328)
DEVICE_DEFAULT_SCAN_INTERVAL = 60 # ms
DEVICE_DEFAULT_SCAN_WINDOW = 60 # ms
DEVICE_DEFAULT_CONNECT_TIMEOUT = None # No timeout
DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL = 60 # ms
DEVICE_DEFAULT_CONNECT_SCAN_WINDOW = 60 # ms
DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN = 15 # ms
DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX = 30 # ms
DEVICE_DEFAULT_CONNECTION_MAX_LATENCY = 0
DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT = 720 # ms
DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH = 0 # ms
DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH = 0 # ms
DEVICE_MIN_SCAN_INTERVAL = 25
DEVICE_MAX_SCAN_INTERVAL = 10240
DEVICE_MIN_SCAN_WINDOW = 25
DEVICE_MAX_SCAN_WINDOW = 10240
# -----------------------------------------------------------------------------
# Classes
@@ -83,6 +94,8 @@ class Advertisement:
is_directed = False,
is_scannable = False,
is_scan_response = False,
is_complete = True,
is_truncated = False,
primary_phy = 0,
secondary_phy = 0,
tx_power = HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE,
@@ -97,6 +110,8 @@ class Advertisement:
self.is_directed = is_directed
self.is_scannable = is_scannable
self.is_scan_response = is_scan_response
self.is_complete = is_complete
self.is_truncated = is_truncated
self.primary_phy = primary_phy
self.secondary_phy = secondary_phy
self.tx_power = tx_power
@@ -139,6 +154,8 @@ class ExtendedAdvertisement(Advertisement):
is_directed = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.DIRECTED_ADVERTISING) != 0,
is_scannable = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCANNABLE_ADVERTISING) != 0,
is_scan_response = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCAN_RESPONSE) != 0,
is_complete = (report.event_type >> 5 & 3) == HCI_LE_Extended_Advertising_Report_Event.DATA_COMPLETE,
is_truncated = (report.event_type >> 5 & 3) == HCI_LE_Extended_Advertising_Report_Event.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME,
primary_phy = report.primary_phy,
secondary_phy = report.secondary_phy,
tx_power = report.tx_power,
@@ -150,46 +167,36 @@ class ExtendedAdvertisement(Advertisement):
# -----------------------------------------------------------------------------
class AdvertisementDataAccumulator:
def __init__(self, passive=False):
self.passive = passive
self.last_event_type = None
self.advertisement = None
self.data = b''
self.passive = passive
self.last_advertisement = None
self.last_data = b''
def update(self, report):
if isinstance(report, HCI_LE_Advertising_Report_Event.Report):
if report.event_type == HCI_LE_Advertising_Report_Event.SCAN_RSP:
if self.last_event_type in {
HCI_LE_Advertising_Report_Event.ADV_IND,
HCI_LE_Advertising_Report_Event.ADV_SCAN_IND
}:
# This is the response to a scannable advertisement
self.advertisement = Advertisement.from_advertising_report(report)
self.advertisement.data = AdvertisingData.from_bytes(self.data + report.data)
self.advertisement.is_connectable = (self.last_event_type == HCI_LE_Advertising_Report_Event.ADV_IND)
self.advertisement.is_scannable = True
else:
# Unexpected scan response
self.advertisement = None
advertisement = Advertisement.from_advertising_report(report)
result = None
# Reset the data
self.data = b''
else:
self.data = report.data
if advertisement.is_scan_response:
if self.last_advertisement is not None and not self.last_advertisement.is_scan_response:
# This is the response to a scannable advertisement
result = Advertisement.from_advertising_report(report)
result.is_connectable = self.last_advertisement.is_connectable
result.is_scannable = True
result.data = AdvertisingData.from_bytes(self.last_data + report.data)
self.last_data = b''
else:
if (
self.passive or
(not advertisement.is_scannable) or
(self.last_advertisement is not None and not self.last_advertisement.is_scan_response)
):
# Don't wait for a scan response
result = Advertisement.from_advertising_report(report)
if self.passive or report.event_type in {
HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND,
HCI_LE_Advertising_Report_Event.ADV_NONCONN_IND
} or self.last_event_type not in {
None,
HCI_LE_Advertising_Report_Event.SCAN_RSP
}:
# Don't wait for a scan response
self.advertisement = Advertisement.from_advertising_report(report)
else:
# Wait for a scan response
self.advertisement = None
self.last_data = report.data
self.last_event_type = report.event_type
self.last_advertisement = advertisement
return result
# -----------------------------------------------------------------------------
@@ -206,7 +213,9 @@ class Peer:
return self.gatt_client.services
async def request_mtu(self, mtu):
return await self.gatt_client.request_mtu(mtu)
mtu = await self.gatt_client.request_mtu(mtu)
self.connection.att_mtu = mtu
self.connection.emit('connection_att_mtu_update')
async def discover_service(self, uuid):
return await self.gatt_client.discover_service(uuid)
@@ -275,11 +284,23 @@ class Peer:
async def __aexit__(self, exc_type, exc_value, traceback):
pass
def __str__(self):
return f'{self.connection.peer_address} as {self.connection.role_name}'
# -----------------------------------------------------------------------------
@dataclass
class ConnectionParametersPreferences:
connection_interval_min: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN
connection_interval_max: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX
max_latency: int = DEVICE_DEFAULT_CONNECTION_MAX_LATENCY
supervision_timeout: int = DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT
min_ce_length: int = DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH
max_ce_length: int = DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH
DEVICE_DEFAULT_CONNECTION_PARAMETER_PREFERENCES = ConnectionParametersPreferences()
# -----------------------------------------------------------------------------
class Connection(CompositeEventEmitter):
@composite_listener
@@ -308,7 +329,17 @@ class Connection(CompositeEventEmitter):
def on_connection_encryption_key_refresh(self):
pass
def __init__(self, device, handle, transport, peer_address, peer_resolvable_address, role, parameters):
def __init__(
self,
device,
handle,
transport,
peer_address,
peer_resolvable_address,
role,
parameters,
phy
):
super().__init__()
self.device = device
self.handle = handle
@@ -320,7 +351,7 @@ class Connection(CompositeEventEmitter):
self.parameters = parameters
self.encryption = 0
self.authenticated = False
self.phy = ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY)
self.phy = phy
self.att_mtu = ATT_DEFAULT_MTU
self.data_length = DEVICE_DEFAULT_DATA_LENGTH
self.gatt_client = None # Per-connection client
@@ -373,19 +404,28 @@ class Connection(CompositeEventEmitter):
async def update_parameters(
self,
conn_interval_min,
conn_interval_max,
conn_latency,
connection_interval_min,
connection_interval_max,
max_latency,
supervision_timeout
):
return await self.device.update_connection_parameters(
self,
conn_interval_min,
conn_interval_max,
conn_latency,
connection_interval_min,
connection_interval_max,
max_latency,
supervision_timeout
)
async def set_phy(self, tx_phys=None, rx_phys=None, phy_options=None):
return await self.device.set_connection_phy(self, tx_phys, rx_phys, phy_options)
async def get_rssi(self):
return await self.device.get_connection_rssi(self)
async def get_phy(self):
return await self.device.get_connection_phy(self)
# [Classic only]
async def request_remote_name(self):
return await self.device.request_remote_name(self)
@@ -549,25 +589,26 @@ class Device(CompositeEventEmitter):
def __init__(self, name = None, address = None, config = None, host = None, generic_access_service = True):
super().__init__()
self._host = None
self.powered_on = False
self.advertising = False
self.auto_restart_advertising = False
self.command_timeout = 10 # seconds
self.gatt_server = gatt_server.Server(self)
self.sdp_server = sdp.Server(self)
self.l2cap_channel_manager = l2cap.ChannelManager(
[l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS])
self.advertisement_data = {}
self.scanning = False
self.scanning_is_passive = False
self.discovering = False
self.connecting = False
self.disconnecting = False
self.connections = {} # Connections, by connection handle
self.classic_enabled = False
self.inquiry_response = None
self.address_resolver = None
self._host = None
self.powered_on = False
self.advertising = False
self.auto_restart_advertising = False
self.command_timeout = 10 # seconds
self.gatt_server = gatt_server.Server(self)
self.sdp_server = sdp.Server(self)
self.l2cap_channel_manager = l2cap.ChannelManager(
[l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS]
)
self.advertisement_accumulators = {} # Accumulators, by address
self.scanning = False
self.scanning_is_passive = False
self.discovering = False
self.connecting = False
self.disconnecting = False
self.connections = {} # Connections, by connection handle
self.classic_enabled = False
self.inquiry_response = None
self.address_resolver = None
# Use the initial config or a default
self.public_address = Address('00:00:00:00:00:00')
@@ -676,9 +717,12 @@ class Device(CompositeEventEmitter):
def send_l2cap_pdu(self, connection_handle, cid, pdu):
self.host.send_l2cap_pdu(connection_handle, cid, pdu)
async def send_command(self, command):
async def send_command(self, command, check_result=False):
try:
return await asyncio.wait_for(self.host.send_command(command), self.command_timeout)
return await asyncio.wait_for(
self.host.send_command(command, check_result),
self.command_timeout
)
except asyncio.TimeoutError:
logger.warning('!!! Command timed out')
@@ -701,10 +745,10 @@ class Device(CompositeEventEmitter):
# Set the controller address
await self.send_command(HCI_LE_Set_Random_Address_Command(
random_address = self.random_address
))
), check_result=True)
# Load the address resolving list
if self.keystore:
if self.keystore and self.host.supports_command(HCI_LE_CLEAR_RESOLVING_LIST_COMMAND):
await self.send_command(HCI_LE_Clear_Resolving_List_Command())
resolving_keys = await self.keystore.get_resolving_keys()
@@ -754,6 +798,19 @@ class Device(CompositeEventEmitter):
def supports_le_feature(self, feature):
return self.host.supports_le_feature(feature)
def supports_le_phy(self, phy):
if phy == HCI_LE_1M_PHY:
return True
feature_map = {
HCI_LE_2M_PHY: HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE,
HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE
}
if phy not in feature_map:
raise ValueError('invalid PHY')
return self.host.supports_le_feature(feature_map[phy])
async def start_advertising(self, auto_restart=False):
self.auto_restart_advertising = auto_restart
@@ -764,12 +821,12 @@ class Device(CompositeEventEmitter):
# Set/update the advertising data
await self.send_command(HCI_LE_Set_Advertising_Data_Command(
advertising_data = self.advertising_data
))
), check_result=True)
# Set/update the scan response data
await self.send_command(HCI_LE_Set_Scan_Response_Data_Command(
scan_response_data = self.scan_response_data
))
), check_result=True)
# Set the advertising parameters
await self.send_command(HCI_LE_Set_Advertising_Parameters_Command(
@@ -782,12 +839,12 @@ class Device(CompositeEventEmitter):
peer_address = Address('00:00:00:00:00:00'),
advertising_channel_map = 7,
advertising_filter_policy = 0
))
), check_result=True)
# Enable advertising
await self.send_command(HCI_LE_Set_Advertising_Enable_Command(
advertising_enable = 1
))
), check_result=True)
self.advertising = True
@@ -796,7 +853,7 @@ class Device(CompositeEventEmitter):
if self.advertising:
await self.send_command(HCI_LE_Set_Advertising_Enable_Command(
advertising_enable = 0
))
), check_result=True)
self.advertising = False
@@ -810,7 +867,8 @@ class Device(CompositeEventEmitter):
scan_interval=DEVICE_DEFAULT_SCAN_INTERVAL, # Scan interval in ms
scan_window=DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms
own_address_type=Address.RANDOM_DEVICE_ADDRESS,
filter_duplicates=False
filter_duplicates=False,
scanning_phys=(HCI_LE_1M_PHY, HCI_LE_CODED_PHY)
):
# Check that the arguments are legal
if scan_interval < scan_window:
@@ -820,25 +878,36 @@ class Device(CompositeEventEmitter):
if scan_window < DEVICE_MIN_SCAN_WINDOW or scan_window > DEVICE_MAX_SCAN_WINDOW:
raise ValueError('scan_interval out of range')
# Reset the accumulators
self.advertisement_accumulator = {}
# Enable scanning
if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE):
# Set the scanning parameters
scan_type = HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING if active else HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING
scanning_filter_policy = HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY # TODO: support other types
scanning_phys = 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_1M_PHY
scanning_phy_count = 1
if self.supports_le_feature(HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE):
scanning_phys |= 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_CODED_PHY
scanning_phy_count = 0
scanning_phys_bits = 0
if HCI_LE_1M_PHY in scanning_phys:
scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT
scanning_phy_count += 1
if HCI_LE_CODED_PHY in scanning_phys:
if self.supports_le_feature(HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE):
scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT
scanning_phy_count += 1
if scanning_phy_count == 0:
raise ValueError('at least one scanning PHY must be enabled')
await self.send_command(HCI_LE_Set_Extended_Scan_Parameters_Command(
own_address_type = own_address_type,
scanning_filter_policy = scanning_filter_policy,
scanning_phys = scanning_phys,
scanning_phys = scanning_phys_bits,
scan_types = [scan_type] * scanning_phy_count,
scan_intervals = [int(scan_window / 0.625)] * scanning_phy_count,
scan_windows = [int(scan_window / 0.625)] * scanning_phy_count
))
), check_result=True)
# Enable scanning
await self.send_command(HCI_LE_Set_Extended_Scan_Enable_Command(
@@ -846,7 +915,7 @@ class Device(CompositeEventEmitter):
filter_duplicates = 1 if filter_duplicates else 0,
duration = 0, # TODO allow other values
period = 0 # TODO allow other values
))
), check_result=True)
else:
# Set the scanning parameters
scan_type = HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING if active else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING
@@ -856,22 +925,32 @@ class Device(CompositeEventEmitter):
le_scan_window = int(scan_window / 0.625),
own_address_type = own_address_type,
scanning_filter_policy = HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY
))
), check_result=True)
# Enable scanning
await self.send_command(HCI_LE_Set_Scan_Enable_Command(
le_scan_enable = 1,
filter_duplicates = 1 if filter_duplicates else 0
))
), check_result=True)
self.scanning_is_passive = not active
self.scanning = True
async def stop_scanning(self):
await self.send_command(HCI_LE_Set_Scan_Enable_Command(
le_scan_enable = 0,
filter_duplicates = 0
))
# Disable scanning
if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE):
await self.send_command(HCI_LE_Set_Extended_Scan_Enable_Command(
enable = 0,
filter_duplicates = 0,
duration = 0,
period = 0
), check_result=True)
else:
await self.send_command(HCI_LE_Set_Scan_Enable_Command(
le_scan_enable = 0,
filter_duplicates = 0
), check_result=True)
self.scanning = False
@property
@@ -880,21 +959,22 @@ class Device(CompositeEventEmitter):
@host_event_handler
def on_advertising_report(self, report):
if not (accumulator := self.advertisement_data.get(report.address)):
if not (accumulator := self.advertisement_accumulators.get(report.address)):
accumulator = AdvertisementDataAccumulator(passive=self.scanning_is_passive)
self.advertisement_data[report.address] = accumulator
accumulator.update(report)
if accumulator.advertisement is not None:
self.emit('advertisement', accumulator.advertisement)
self.advertisement_accumulators[report.address] = accumulator
if advertisement := accumulator.update(report):
self.emit('advertisement', advertisement)
async def start_discovery(self):
await self.host.send_command(HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE))
await self.send_command(HCI_Write_Inquiry_Mode_Command(
inquiry_mode=HCI_EXTENDED_INQUIRY_MODE
), check_result=True)
response = await self.send_command(HCI_Inquiry_Command(
lap = HCI_GENERAL_INQUIRY_LAP,
inquiry_length = DEVICE_DEFAULT_INQUIRY_LENGTH,
num_responses = 0 # Unlimited number of responses.
))
), check_result=True)
if response.status != HCI_Command_Status_Event.PENDING:
self.discovering = False
raise HCI_StatusError(response)
@@ -902,7 +982,7 @@ class Device(CompositeEventEmitter):
self.discovering = True
async def stop_discovery(self):
await self.send_command(HCI_Inquiry_Cancel_Command())
await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True)
self.discovering = False
@host_event_handler
@@ -939,11 +1019,12 @@ class Device(CompositeEventEmitter):
)
# Update the controller
await self.host.send_command(
await self.send_command(
HCI_Write_Extended_Inquiry_Response_Command(
fec_required = 0,
extended_inquiry_response = self.inquiry_response
)
),
check_result=True
)
await self.set_scan_enable(
inquiry_scan_enabled = self.discoverable,
@@ -958,12 +1039,26 @@ class Device(CompositeEventEmitter):
page_scan_enabled = self.connectable
)
async def connect(self, peer_address, transport=BT_LE_TRANSPORT):
async def connect(
self,
peer_address,
transport=BT_LE_TRANSPORT,
connection_parameters_preferences=None,
timeout=DEVICE_DEFAULT_CONNECT_TIMEOUT
):
'''
Request a connection to a peer.
This method cannot be called if there is already a pending connection.
connection_parameters_preferences: (BLE only, ignored for BR/EDR)
* None: use all PHYs with default parameters
* map: each entry has a PHY as key and a ConnectionParametersPreferences object as value
'''
# Check parameters
if transport not in {BT_LE_TRANSPORT, BT_BR_EDR_TRANSPORT}:
raise ValueError('invalid transport')
# Adjust the transport automatically if we need to
if transport == BT_LE_TRANSPORT and not self.le_enabled:
transport = BT_BR_EDR_TRANSPORT
@@ -980,49 +1075,117 @@ class Device(CompositeEventEmitter):
except ValueError:
# If the address is not parsable, assume it is a name instead
logger.debug('looking for peer by name')
peer_address = await self.find_peer_by_name(peer_address, transport)
peer_address = await self.find_peer_by_name(peer_address, transport) # TODO: timeout
# Create a future so that we can wait for the connection's result
pending_connection = asyncio.get_running_loop().create_future()
self.on('connection', pending_connection.set_result)
self.on('connection_failure', pending_connection.set_exception)
# Tell the controller to connect
if transport == BT_LE_TRANSPORT:
# TODO: use real values, not fixed ones
result = await self.send_command(HCI_LE_Create_Connection_Command(
le_scan_interval = 96,
le_scan_window = 96,
initiator_filter_policy = 0,
peer_address_type = peer_address.address_type,
peer_address = peer_address,
own_address_type = Address.RANDOM_DEVICE_ADDRESS,
conn_interval_min = 12,
conn_interval_max = 24,
conn_latency = 0,
supervision_timeout = 72,
minimum_ce_length = 0,
maximum_ce_length = 0
))
else:
# TODO: use real values, not fixed ones
result = await self.send_command(HCI_Create_Connection_Command(
bd_addr = peer_address,
packet_type = 0xCC18, # FIXME: change
page_scan_repetition_mode = HCI_R2_PAGE_SCAN_REPETITION_MODE,
clock_offset = 0x0000,
allow_role_switch = 0x01,
reserved = 0
))
try:
# Tell the controller to connect
if transport == BT_LE_TRANSPORT:
if connection_parameters_preferences is None:
if connection_parameters_preferences is None:
connection_parameters_preferences = {
HCI_LE_1M_PHY: DEVICE_DEFAULT_CONNECTION_PARAMETER_PREFERENCES,
HCI_LE_2M_PHY: DEVICE_DEFAULT_CONNECTION_PARAMETER_PREFERENCES,
HCI_LE_CODED_PHY: DEVICE_DEFAULT_CONNECTION_PARAMETER_PREFERENCES
}
if self.host.supports_command(HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND):
# Only keep supported PHYs
phys = sorted(list(set(filter(self.supports_le_phy, connection_parameters_preferences.keys()))))
if not phys:
raise ValueError('least one supported PHY needed')
phy_count = len(phys)
initiating_phys = phy_list_to_bits(phys)
connection_interval_mins = [
int(connection_parameters_preferences[phy].connection_interval_min / 1.25) for phy in phys
]
connection_interval_maxs = [
int(connection_parameters_preferences[phy].connection_interval_max / 1.25) for phy in phys
]
max_latencies = [
connection_parameters_preferences[phy].max_latency for phy in phys
]
supervision_timeouts = [
int(connection_parameters_preferences[phy].supervision_timeout / 10) for phy in phys
]
min_ce_lengths = [
int(connection_parameters_preferences[phy].min_ce_length / 0.625) for phy in phys
]
max_ce_lengths = [
int(connection_parameters_preferences[phy].max_ce_length / 0.625) for phy in phys
]
result = await self.send_command(HCI_LE_Extended_Create_Connection_Command(
initiator_filter_policy = 0,
own_address_type = Address.RANDOM_DEVICE_ADDRESS,
peer_address_type = peer_address.address_type,
peer_address = peer_address,
initiating_phys = initiating_phys,
scan_intervals = (int(DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625),) * phy_count,
scan_windows = (int(DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625),) * phy_count,
connection_interval_mins = connection_interval_mins,
connection_interval_maxs = connection_interval_maxs,
max_latencies = max_latencies,
supervision_timeouts = supervision_timeouts,
min_ce_lengths = min_ce_lengths,
max_ce_lengths = max_ce_lengths
))
else:
if HCI_LE_1M_PHY not in connection_parameters_preferences:
raise ValueError('1M PHY preferences required')
prefs = connection_parameters_preferences[HCI_LE_1M_PHY]
result = await self.send_command(HCI_LE_Create_Connection_Command(
le_scan_interval = int(DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625),
le_scan_window = int(DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625),
initiator_filter_policy = 0,
peer_address_type = peer_address.address_type,
peer_address = peer_address,
own_address_type = Address.RANDOM_DEVICE_ADDRESS,
connection_interval_min = int(prefs.connection_interval_min / 1.25),
connection_interval_max = int(prefs.connection_interval_max / 1.25),
max_latency = prefs.max_latency,
supervision_timeout = int(prefs.supervision_timeout / 10),
min_ce_length = int(prefs.min_ce_length / 0.625),
max_ce_length = int(prefs.max_ce_length / 0.625),
))
else:
# TODO: allow passing other settings
result = await self.send_command(HCI_Create_Connection_Command(
bd_addr = peer_address,
packet_type = 0xCC18, # FIXME: change
page_scan_repetition_mode = HCI_R2_PAGE_SCAN_REPETITION_MODE,
clock_offset = 0x0000,
allow_role_switch = 0x01,
reserved = 0
))
if result.status != HCI_Command_Status_Event.PENDING:
raise HCI_StatusError(result)
# Wait for the connection process to complete
self.connecting = True
return await pending_connection
if timeout is None:
return await pending_connection
else:
try:
return await asyncio.wait_for(asyncio.shield(pending_connection), timeout)
except asyncio.TimeoutError:
if transport == BT_LE_TRANSPORT:
await self.send_command(HCI_LE_Create_Connection_Cancel_Command())
else:
await self.send_command(HCI_Create_Connection_Cancel_Command(bd_addr=peer_address))
try:
return await pending_connection
except ConnectionError:
raise TimeoutError()
finally:
self.remove_listener('connection', pending_connection.set_result)
self.remove_listener('connection_failure', pending_connection.set_exception)
@@ -1047,7 +1210,7 @@ class Device(CompositeEventEmitter):
async def cancel_connection(self):
if not self.is_connecting:
return
await self.send_command(HCI_LE_Create_Connection_Cancel_Command())
await self.send_command(HCI_LE_Create_Connection_Cancel_Command(), check_result=True)
async def disconnect(self, connection, reason):
# Create a future so that we can wait for the disconnection's result
@@ -1056,7 +1219,9 @@ class Device(CompositeEventEmitter):
connection.on('disconnection_failure', pending_disconnection.set_exception)
# Request a disconnection
result = await self.send_command(HCI_Disconnect_Command(connection_handle = connection.handle, reason = reason))
result = await self.send_command(HCI_Disconnect_Command(
connection_handle = connection.handle, reason = reason
))
try:
if result.status != HCI_Command_Status_Event.PENDING:
@@ -1073,26 +1238,66 @@ class Device(CompositeEventEmitter):
async def update_connection_parameters(
self,
connection,
conn_interval_min,
conn_interval_max,
conn_latency,
connection_interval_min,
connection_interval_max,
max_latency,
supervision_timeout,
minimum_ce_length = 0,
maximum_ce_length = 0
min_ce_length = 0,
max_ce_length = 0
):
'''
NOTE: the name of the parameters may look odd, but it just follows the names used in the Bluetooth spec.
'''
await self.send_command(HCI_LE_Connection_Update_Command(
connection_handle = connection.handle,
conn_interval_min = conn_interval_min,
conn_interval_max = conn_interval_max,
conn_latency = conn_latency,
supervision_timeout = supervision_timeout,
minimum_ce_length = minimum_ce_length,
maximum_ce_length = maximum_ce_length
))
# TODO: check result
connection_handle = connection.handle,
connection_interval_min = connection_interval_min,
connection_interval_max = connection_interval_max,
max_latency = max_latency,
supervision_timeout = supervision_timeout,
min_ce_length = min_ce_length,
max_ce_length = max_ce_length
), check_result=True)
async def get_connection_rssi(self, connection):
result = await self.send_command(HCI_Read_RSSI_Command(handle = connection.handle), check_result=True)
return result.return_parameters.rssi
async def get_connection_phy(self, connection):
result = await self.send_command(
HCI_LE_Read_PHY_Command(connection_handle = connection.handle),
check_result=True
)
return (result.return_parameters.tx_phy, result.return_parameters.rx_phy)
async def set_connection_phy(
self,
connection,
tx_phys=None,
rx_phys=None,
phy_options=None
):
all_phys_bits = (1 if tx_phys is None else 0) | ((1 if rx_phys is None else 0) << 1)
return await self.send_command(
HCI_LE_Set_PHY_Command(
connection_handle = connection.handle,
all_phys = all_phys_bits,
tx_phys = phy_list_to_bits(tx_phys),
rx_phys = phy_list_to_bits(rx_phys),
phy_options = 0 # TODO: parse from function argument
)
)
async def set_default_phy(self, tx_phys=None, rx_phys=None):
all_phys_bits = (1 if tx_phys is None else 0) | ((1 if rx_phys is None else 0) << 1)
return await self.send_command(
HCI_LE_Set_Default_PHY_Command(
all_phys = all_phys_bits,
tx_phys = phy_list_to_bits(tx_phys),
rx_phys = phy_list_to_bits(rx_phys)
), check_result=True
)
async def find_peer_by_name(self, name, transport=BT_LE_TRANSPORT):
"""
@@ -1116,8 +1321,7 @@ class Device(CompositeEventEmitter):
event_name = 'advertisement'
handler = self.on(
event_name,
lambda address, ad_data, rssi, connectable:
on_peer_found(address, ad_data)
lambda advertisement: on_peer_found(advertisement.address, advertisement.data)
)
was_scanning = self.scanning
@@ -1371,26 +1575,41 @@ class Device(CompositeEventEmitter):
peer_resolvable_address = peer_address
peer_address = resolved_address
# Create a new connection
connection = Connection(
self,
connection_handle,
transport,
peer_address,
peer_resolvable_address,
role,
connection_parameters
)
self.connections[connection_handle] = connection
# We are no longer advertising
self.advertising = False
# Emit an event to notify listeners of the new connection
self.emit('connection', connection)
# Create and notify of the new connection asynchronously
async def new_connection():
# Figure out which PHY we're connected with
if self.host.supports_command(HCI_LE_READ_PHY_COMMAND):
result = await self.send_command(
HCI_LE_Read_PHY_Command(connection_handle=connection_handle),
check_result=True
)
phy = ConnectionPHY(result.return_parameters.tx_phy, result.return_parameters.rx_phy)
else:
phy = ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY)
# Create a new connection
connection = Connection(
self,
connection_handle,
transport,
peer_address,
peer_resolvable_address,
role,
connection_parameters,
phy
)
self.connections[connection_handle] = connection
# Emit an event to notify listeners of the new connection
self.emit('connection', connection)
asyncio.create_task(new_connection())
@host_event_handler
def on_connection_failure(self, error_code):
def on_connection_failure(self, connection_handle, error_code):
logger.debug(f'*** Connection failed: {error_code}')
error = ConnectionError(
error_code,