Compare commits

...

10 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
867e8c13dc lower the number of test cases for l2cap in order to speed up the test 2022-11-14 17:26:09 -08:00
Michael Mogenson
27c46eef9d Merge pull request #71 from mogenson/prefer-notify
Add prefer_notify option to gatt_client subscribe()
2022-11-13 19:53:09 -05:00
Michael Mogenson
c140876157 Add prefer_notify option to gatt_client subscribe()
If characteristic supports Notify and Indicate, the prefer_notify option
will subscribe with Notify if True or Indicate if False.

If characteristic only supports one property, Notify or Indicate, that
mode will be selected, regardless of the prefer_notify setting.

Tested with a characteristic that supports both Notify and Indicate and
verified that prefer_notify sets the desired mode.
2022-11-13 19:38:12 -05:00
Lucas Abel
d743656f09 Merge pull request #68 from google/uael/pairing-improvements
Pairing improvements
2022-11-11 21:03:17 -08:00
Abel Lucas
b91d0e24c1 device: handle HCI passkey notification event 2022-11-11 18:43:35 +00:00
Abel Lucas
eb46f60c87 le: save own_address_type on ACL connection for SMP to be able to use the right self address 2022-11-10 02:06:37 +00:00
Abel Lucas
8bbba7c84c pairing: always ask user for confirmation, even in JUST_WORKS method 2022-11-10 01:58:02 +00:00
Gilles Boccon-Gibod
ee54df2d08 Merge pull request #65 from google/gbg/fix-classic-connect-await
fix classic connection event filtering
2022-11-09 14:40:29 -08:00
Gilles Boccon-Gibod
6549e53398 Merge pull request #60 from google/gbg/fix-console-logs
use a formatter object, not a string
2022-11-09 13:19:26 -08:00
Gilles Boccon-Gibod
6e1baf0344 use a formatter object, not a string 2022-11-08 13:19:41 -08:00
8 changed files with 219 additions and 95 deletions

View File

@@ -902,7 +902,7 @@ class LogHandler(logging.Handler):
def __init__(self, app):
super().__init__()
self.app = app
self.setFormatter("[%(asctime)s][%(pathname)s:%(lineno)d][%(levelname)s] %(message)s")
self.setFormatter(logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s'))
def emit(self, record):
message = self.format(record)

View File

@@ -294,8 +294,8 @@ class Peer:
async def discover_attributes(self):
return await self.gatt_client.discover_attributes()
async def subscribe(self, characteristic, subscriber=None):
return await self.gatt_client.subscribe(characteristic, subscriber)
async def subscribe(self, characteristic, subscriber=None, prefer_notify=True):
return await self.gatt_client.subscribe(characteristic, subscriber, prefer_notify)
async def unsubscribe(self, characteristic, subscriber=None):
return await self.gatt_client.unsubscribe(characteristic, subscriber)
@@ -394,6 +394,7 @@ class Connection(CompositeEventEmitter):
device,
handle,
transport,
self_address,
peer_address,
peer_resolvable_address,
role,
@@ -404,6 +405,7 @@ class Connection(CompositeEventEmitter):
self.device = device
self.handle = handle
self.transport = transport
self.self_address = self_address
self.peer_address = peer_address
self.peer_resolvable_address = peer_resolvable_address
self.peer_name = None # Classic only
@@ -699,6 +701,10 @@ class Device(CompositeEventEmitter):
self.address_resolver = None
self.classic_pending_accepts = {Address.ANY: []} # Futures, by BD address OR [Futures] for Address.ANY
# Own address type cache
self.advertising_own_address_type = None
self.connect_own_address_type = None
# Use the initial config or a default
self.public_address = Address('00:00:00:00:00:00')
if config is None:
@@ -731,8 +737,7 @@ class Device(CompositeEventEmitter):
self.random_address = address
# Setup SMP
# TODO: allow using a public address
self.smp_manager = smp.Manager(self, self.random_address)
self.smp_manager = smp.Manager(self)
self.l2cap_channel_manager.register_fixed_channel(
smp.SMP_CID, self.on_smp_pdu)
self.l2cap_channel_manager.register_fixed_channel(
@@ -928,7 +933,7 @@ class Device(CompositeEventEmitter):
self,
advertising_type=AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
target=None,
own_address_type=Address.RANDOM_DEVICE_ADDRESS,
own_address_type=OwnAddressType.RANDOM,
auto_restart=False
):
# If we're advertising, stop first
@@ -975,9 +980,10 @@ class Device(CompositeEventEmitter):
advertising_enable = 1
), check_result=True)
self.auto_restart_advertising = auto_restart
self.advertising_type = advertising_type
self.advertising = True
self.advertising_own_address_type = own_address_type
self.auto_restart_advertising = auto_restart
self.advertising_type = advertising_type
self.advertising = True
async def stop_advertising(self):
# Disable advertising
@@ -986,9 +992,10 @@ class Device(CompositeEventEmitter):
advertising_enable = 0
), check_result=True)
self.advertising = False
self.advertising_type = None
self.auto_restart_advertising = False
self.advertising_own_address_type = None
self.advertising = False
self.advertising_type = None
self.auto_restart_advertising = False
@property
def is_advertising(self):
@@ -1000,7 +1007,7 @@ class Device(CompositeEventEmitter):
active=True,
scan_interval=DEVICE_DEFAULT_SCAN_INTERVAL, # Scan interval in ms
scan_window=DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms
own_address_type=Address.RANDOM_DEVICE_ADDRESS,
own_address_type=OwnAddressType.RANDOM,
filter_duplicates=False,
scanning_phys=(HCI_LE_1M_PHY, HCI_LE_CODED_PHY)
):
@@ -1181,7 +1188,7 @@ class Device(CompositeEventEmitter):
peer_address,
transport=BT_LE_TRANSPORT,
connection_parameters_preferences=None,
own_address_type=Address.RANDOM_DEVICE_ADDRESS,
own_address_type=OwnAddressType.RANDOM,
timeout=DEVICE_DEFAULT_CONNECT_TIMEOUT
):
'''
@@ -1251,6 +1258,8 @@ class Device(CompositeEventEmitter):
HCI_LE_CODED_PHY: ConnectionParametersPreferences.default
}
self.connect_own_address_type = own_address_type
if self.host.supports_command(HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND):
# Only keep supported PHYs
phys = sorted(list(set(filter(self.supports_le_phy, connection_parameters_preferences.keys()))))
@@ -1350,6 +1359,7 @@ class Device(CompositeEventEmitter):
self.remove_listener('connection_failure', on_connection_failure)
if transport == BT_LE_TRANSPORT:
self.le_connecting = False
self.connect_own_address_type = None
async def accept(
self,
@@ -1847,6 +1857,7 @@ class Device(CompositeEventEmitter):
self,
connection_handle,
transport,
self.public_address,
peer_address,
peer_resolvable_address,
role,
@@ -1875,8 +1886,17 @@ class Device(CompositeEventEmitter):
peer_resolvable_address = peer_address
peer_address = resolved_address
# Guess which own address type is used for this connection.
# This logic is somewhat correct but may need to be improved
# when multiple advertising are run simultaneously.
if self.connect_own_address_type is not None:
own_address_type = self.connect_own_address_type
else:
own_address_type = self.advertising_own_address_type
# We are no longer advertising
self.advertising = False
self.advertising_own_address_type = None
self.advertising = False
# Create and notify of the new connection asynchronously
async def new_connection():
@@ -1890,11 +1910,16 @@ class Device(CompositeEventEmitter):
else:
phy = ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY)
self_address = self.random_address
if own_address_type in (OwnAddressType.PUBLIC, OwnAddressType.RESOLVABLE_OR_PUBLIC):
self_address = self.public_address
# Create a new connection
connection = Connection(
self,
connection_handle,
transport,
self_address,
peer_address,
peer_resolvable_address,
role,
@@ -1914,7 +1939,8 @@ class Device(CompositeEventEmitter):
# For directed advertising, this means a timeout
if transport == BT_LE_TRANSPORT and self.advertising and self.advertising_type.is_directed:
self.advertising = False
self.advertising_own_address_type = None
self.advertising = False
# Notify listeners
error = ConnectionError(
@@ -2067,13 +2093,13 @@ class Device(CompositeEventEmitter):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
can_confirm = pairing_config.delegate.io_capability not in {
can_compare = pairing_config.delegate.io_capability not in {
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY
}
# Respond
if can_confirm and pairing_config.delegate:
if can_compare:
async def compare_numbers():
numbers_match = await pairing_config.delegate.compare_numbers(code, digits=6)
if numbers_match:
@@ -2087,9 +2113,18 @@ class Device(CompositeEventEmitter):
asyncio.create_task(compare_numbers())
else:
self.host.send_command_sync(
HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
)
async def confirm():
confirm = await pairing_config.delegate.confirm()
if confirm:
self.host.send_command_sync(
HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
)
else:
self.host.send_command_sync(
HCI_User_Confirmation_Request_Negative_Reply_Command(bd_addr=connection.peer_address)
)
asyncio.create_task(confirm())
# [Classic only]
@host_event_handler
@@ -2104,7 +2139,7 @@ class Device(CompositeEventEmitter):
}
# Respond
if can_input and pairing_config.delegate:
if can_input:
async def get_number():
number = await pairing_config.delegate.get_number()
if number is not None:
@@ -2124,6 +2159,15 @@ class Device(CompositeEventEmitter):
HCI_User_Passkey_Request_Negative_Reply_Command(bd_addr=connection.peer_address)
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_user_passkey_notification(self, connection, passkey):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
asyncio.create_task(pairing_config.delegate.display_number(passkey))
# [Classic only]
@host_event_handler
@try_with_connection_from_address

View File

@@ -26,20 +26,17 @@
import asyncio
import logging
import struct
from colors import color
from .core import ProtocolError, TimeoutError
from .hci import *
from .att import *
from .gatt import (
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
GATT_REQUEST_TIMEOUT,
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
Characteristic,
ClientCharacteristicConfigurationBits
)
from .core import InvalidStateError, ProtocolError, TimeoutError
from .gatt import (GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE, GATT_REQUEST_TIMEOUT,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE, Characteristic,
ClientCharacteristicConfigurationBits)
from .hci import *
# -----------------------------------------------------------------------------
# Logging
@@ -115,7 +112,7 @@ class CharacteristicProxy(AttributeProxy):
async def discover_descriptors(self):
return await self.client.discover_descriptors(self)
async def subscribe(self, subscriber=None):
async def subscribe(self, subscriber=None, prefer_notify=True):
if subscriber is not None:
if subscriber in self.subscribers:
# We already have a proxy subscriber
@@ -129,7 +126,7 @@ class CharacteristicProxy(AttributeProxy):
self.subscribers[subscriber] = on_change
subscriber = on_change
return await self.client.subscribe(self, subscriber)
return await self.client.subscribe(self, subscriber, prefer_notify)
async def unsubscribe(self, subscriber=None):
if subscriber in self.subscribers:
@@ -547,7 +544,7 @@ class Client:
return attributes
async def subscribe(self, characteristic, subscriber=None):
async def subscribe(self, characteristic, subscriber=None, prefer_notify=True):
# If we haven't already discovered the descriptors for this characteristic, do it now
if not characteristic.descriptors_discovered:
await self.discover_descriptors(characteristic)
@@ -558,23 +555,32 @@ class Client:
logger.warning('subscribing to characteristic with no CCCD descriptor')
return
# Set the subscription bits and select the subscriber set
bits = ClientCharacteristicConfigurationBits.DEFAULT
subscriber_sets = []
if characteristic.properties & Characteristic.NOTIFY:
bits |= ClientCharacteristicConfigurationBits.NOTIFICATION
subscriber_sets.append(self.notification_subscribers.setdefault(characteristic.handle, set()))
if characteristic.properties & Characteristic.INDICATE:
bits |= ClientCharacteristicConfigurationBits.INDICATION
subscriber_sets.append(self.indication_subscribers.setdefault(characteristic.handle, set()))
if (
characteristic.properties & Characteristic.NOTIFY
and characteristic.properties & Characteristic.INDICATE
):
if prefer_notify:
bits = ClientCharacteristicConfigurationBits.NOTIFICATION
subscribers = self.notification_subscribers
else:
bits = ClientCharacteristicConfigurationBits.INDICATION
subscribers = self.indication_subscribers
elif characteristic.properties & Characteristic.NOTIFY:
bits = ClientCharacteristicConfigurationBits.NOTIFICATION
subscribers = self.notification_subscribers
elif characteristic.properties & Characteristic.INDICATE:
bits = ClientCharacteristicConfigurationBits.INDICATION
subscribers = self.indication_subscribers
else:
raise InvalidStateError("characteristic is not notify or indicate")
# Add subscribers to the sets
for subscriber_set in subscriber_sets:
if subscriber is not None:
subscriber_set.add(subscriber)
# Add the characteristic as a subscriber, which will result in the characteristic
# emitting an 'update' event when a notification or indication is received
subscriber_set.add(characteristic)
subscriber_set = subscribers.setdefault(characteristic.handle, set())
if subscriber is not None:
subscriber_set.add(subscriber)
# Add the characteristic as a subscriber, which will result in the characteristic
# emitting an 'update' event when a notification or indication is received
subscriber_set.add(characteristic)
await self.write_value(cccd, struct.pack('<H', bits), with_response=True)

View File

@@ -1756,6 +1756,26 @@ class Address:
return ':'.join([f'{x:02X}' for x in reversed(self.address_bytes)])
# -----------------------------------------------------------------------------
class OwnAddressType:
PUBLIC = 0
RANDOM = 1
RESOLVABLE_OR_PUBLIC = 2
RESOLVABLE_OR_RANDOM = 3
TYPE_NAMES = {
PUBLIC: 'PUBLIC',
RANDOM: 'RANDOM',
RESOLVABLE_OR_PUBLIC: 'RESOLVABLE_OR_PUBLIC',
RESOLVABLE_OR_RANDOM: 'RESOLVABLE_OR_RANDOM'
}
@staticmethod
def type_name(type):
return name_or_number(OwnAddressType.TYPE_NAMES, type)
TYPE_SPEC = {'size': 1, 'mapper': lambda x: OwnAddressType.type_name(x)}
# -----------------------------------------------------------------------------
class HCI_Packet:
'''
@@ -2848,7 +2868,7 @@ class HCI_LE_Set_Random_Address_Command(HCI_Command):
('advertising_interval_min', 2),
('advertising_interval_max', 2),
('advertising_type', {'size': 1, 'mapper': lambda x: HCI_LE_Set_Advertising_Parameters_Command.advertising_type_name(x)}),
('own_address_type', Address.ADDRESS_TYPE_SPEC),
('own_address_type', OwnAddressType.TYPE_SPEC),
('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type),
('advertising_channel_map', 1),
@@ -2927,7 +2947,7 @@ class HCI_LE_Set_Advertising_Enable_Command(HCI_Command):
('le_scan_type', 1),
('le_scan_interval', 2),
('le_scan_window', 2),
('own_address_type', Address.ADDRESS_TYPE_SPEC),
('own_address_type', OwnAddressType.TYPE_SPEC),
('scanning_filter_policy', 1)
])
class HCI_LE_Set_Scan_Parameters_Command(HCI_Command):
@@ -2961,7 +2981,7 @@ class HCI_LE_Set_Scan_Enable_Command(HCI_Command):
('initiator_filter_policy', 1),
('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type),
('own_address_type', Address.ADDRESS_TYPE_SPEC),
('own_address_type', OwnAddressType.TYPE_SPEC),
('connection_interval_min', 2),
('connection_interval_max', 2),
('max_latency', 2),
@@ -3283,7 +3303,7 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
('primary_advertising_interval_min', 3),
('primary_advertising_interval_max', 3),
('primary_advertising_channel_map', {'size': 1, 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.channel_map_string(x)}),
('own_address_type', Address.ADDRESS_TYPE_SPEC),
('own_address_type', OwnAddressType.TYPE_SPEC),
('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type),
('advertising_filter_policy', 1),
@@ -3687,7 +3707,7 @@ class HCI_LE_Extended_Create_Connection_Command(HCI_Command):
initiating_phys_strs = bit_flags_to_strings(self.initiating_phys, HCI_LE_PHY_BIT_NAMES)
fields = [
('initiator_filter_policy:', self.initiator_filter_policy),
('own_address_type: ', Address.address_type_name(self.own_address_type)),
('own_address_type: ', OwnAddressType.type_name(self.own_address_type)),
('peer_address_type: ', Address.address_type_name(self.peer_address_type)),
('peer_address: ', str(self.peer_address)),
('initiating_phys: ', ','.join(initiating_phys_strs)),
@@ -4855,6 +4875,17 @@ class HCI_Link_Supervision_Timeout_Changed_Event(HCI_Event):
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([
('bd_addr', Address.parse_address),
('passkey', 4)
])
class HCI_User_Passkey_Notification_Event(HCI_Event):
'''
See Bluetooth spec @ 7.7.48 User Passkey Notification Event
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([
('bd_addr', Address.parse_address),

View File

@@ -638,6 +638,9 @@ class Host(EventEmitter):
def on_hci_user_passkey_request_event(self, event):
self.emit('authentication_user_passkey_request', event.bd_addr)
def on_hci_user_passkey_notification_event(self, event):
self.emit('authentication_user_passkey_notification', event.bd_addr, event.passkey)
def on_hci_inquiry_complete_event(self, event):
self.emit('inquiry_complete')

View File

@@ -477,6 +477,9 @@ class PairingDelegate:
async def accept(self):
return True
async def confirm(self):
return True
async def compare_numbers(self, number, digits=6):
return True
@@ -637,15 +640,16 @@ class Session:
self.oob = False
# Set up addresses
self_address = connection.self_address
peer_address = connection.peer_resolvable_address or connection.peer_address
if self.is_initiator:
self.ia = bytes(manager.address)
self.iat = 1 if manager.address.is_random else 0
self.ia = bytes(self_address)
self.iat = 1 if self_address.is_random else 0
self.ra = bytes(peer_address)
self.rat = 1 if peer_address.is_random else 0
else:
self.ra = bytes(manager.address)
self.rat = 1 if manager.address.is_random else 0
self.ra = bytes(self_address)
self.rat = 1 if self_address.is_random else 0
self.ia = bytes(peer_address)
self.iat = 1 if peer_address.is_random else 0
@@ -715,6 +719,21 @@ class Session:
return False
return True
def prompt_user_for_confirmation(self, next_steps):
async def prompt():
logger.debug('ask for confirmation')
try:
response = await self.pairing_config.delegate.confirm()
if response:
next_steps()
return
except Exception as error:
logger.warn(f'exception while confirm: {error}')
self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR)
asyncio.create_task(prompt())
def prompt_user_for_numeric_comparison(self, code, next_steps):
async def prompt():
logger.debug(f'verification code: {code}')
@@ -907,8 +926,8 @@ class Session:
SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk)
)
self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.manager.address.address_type,
bd_addr = self.manager.address
addr_type = self.connection.self_address.address_type,
bd_addr = self.connection.self_address
))
# Distribute CSRK
@@ -939,8 +958,8 @@ class Session:
SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk)
)
self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.manager.address.address_type,
bd_addr = self.manager.address
addr_type = self.connection.self_address.address_type,
bd_addr = self.connection.self_address
))
# Distribute CSRK
@@ -1387,12 +1406,12 @@ class Session:
# Compute the 6-digit code
code = crypto.g2(self.pka, self.pkb, self.na, self.nb) % 1000000
if self.pairing_method == self.NUMERIC_COMPARISON:
# Ask for user confirmation
self.wait_before_continuing = asyncio.get_running_loop().create_future()
self.prompt_user_for_numeric_comparison(code, next_steps)
# Ask for user confirmation
self.wait_before_continuing = asyncio.get_running_loop().create_future()
if self.pairing_method == self.JUST_WORKS:
self.prompt_user_for_confirmation(next_steps)
else:
next_steps()
self.prompt_user_for_numeric_comparison(code, next_steps)
else:
next_steps()
@@ -1486,10 +1505,9 @@ class Manager(EventEmitter):
Implements the Initiator and Responder roles of the Security Manager Protocol
'''
def __init__(self, device, address):
def __init__(self, device):
super().__init__()
self.device = device
self.address = address
self.sessions = {}
self._ecc_key = None
self.pairing_config_factory = lambda connection: PairingConfig()

View File

@@ -612,14 +612,25 @@ async def test_subscribe_notify():
await async_barrier()
assert not c2._called
c3._called = False
c3._called_2 = False
c3._called_3 = False
c3._last_update = None
c3._last_update_2 = None
c3._last_update_3 = None
def on_c3_update(value):
c3._called = True
c3._last_update = value
def on_c3_update_2(value):
def on_c3_update_2(value): # for notify
c3._called_2 = True
c3._last_update_2 = value
def on_c3_update_3(value): # for indicate
c3._called_3 = True
c3._last_update_3 = value
c3.on('update', on_c3_update)
await peer.subscribe(c3, on_c3_update_2)
await async_barrier()
@@ -629,22 +640,33 @@ async def test_subscribe_notify():
assert c3._last_update == characteristic3.value
assert c3._called_2
assert c3._last_update_2 == characteristic3.value
assert not c3._called_3
c3._called = False
c3._called_2 = False
c3._called_3 = False
await peer.unsubscribe(c3)
await peer.subscribe(c3, on_c3_update_3, prefer_notify=False)
await async_barrier()
characteristic3.value = bytes([1, 2, 3])
await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3)
await async_barrier()
assert c3._called
assert c3._last_update == characteristic3.value
assert c3._called_2
assert c3._last_update_2 == characteristic3.value
assert not c3._called_2
assert c3._called_3
assert c3._last_update_3 == characteristic3.value
c3._called = False
c3._called_2 = False
c3._called_3 = False
await peer.unsubscribe(c3)
await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3)
await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3)
await async_barrier()
assert not c3._called
assert not c3._called_2
assert not c3._called_3
# -----------------------------------------------------------------------------

View File

@@ -85,8 +85,8 @@ async def setup_connection():
await two_devices.devices[0].connect(two_devices.devices[1].random_address)
# Check the post conditions
assert(two_devices.connections[0] is not None)
assert(two_devices.connections[1] is not None)
assert two_devices.connections[0] is not None
assert two_devices.connections[1] is not None
return two_devices
@@ -94,31 +94,31 @@ async def setup_connection():
# -----------------------------------------------------------------------------
def test_helpers():
psm = L2CAP_Connection_Request.serialize_psm(0x01)
assert(psm == bytes([0x01, 0x00]))
assert psm == bytes([0x01, 0x00])
psm = L2CAP_Connection_Request.serialize_psm(0x1023)
assert(psm == bytes([0x23, 0x10]))
assert psm == bytes([0x23, 0x10])
psm = L2CAP_Connection_Request.serialize_psm(0x242311)
assert(psm == bytes([0x11, 0x23, 0x24]))
assert psm == bytes([0x11, 0x23, 0x24])
(offset, psm) = L2CAP_Connection_Request.parse_psm(bytes([0x00, 0x01, 0x00, 0x44]), 1)
assert(offset == 3)
assert(psm == 0x01)
assert offset == 3
assert psm == 0x01
(offset, psm) = L2CAP_Connection_Request.parse_psm(bytes([0x00, 0x23, 0x10, 0x44]), 1)
assert(offset == 3)
assert(psm == 0x1023)
assert offset == 3
assert psm == 0x1023
(offset, psm) = L2CAP_Connection_Request.parse_psm(bytes([0x00, 0x11, 0x23, 0x24, 0x44]), 1)
assert(offset == 4)
assert(psm == 0x242311)
assert offset == 4
assert psm == 0x242311
rq = L2CAP_Connection_Request(psm = 0x01, source_cid = 0x44)
brq = bytes(rq)
srq = L2CAP_Connection_Request.from_bytes(brq)
assert(srq.psm == rq.psm)
assert(srq.source_cid == rq.source_cid)
assert srq.psm == rq.psm
assert srq.source_cid == rq.source_cid
# -----------------------------------------------------------------------------
@@ -170,12 +170,12 @@ async def test_basic_connection():
l2cap_channel.on('close', lambda: on_close(0, None))
incoming_channel.on('close', lambda: on_close(1, closed_event))
await l2cap_channel.disconnect()
assert(closed == [True, True])
assert closed == [True, True]
await closed_event.wait()
sent_bytes = b''.join(messages)
received_bytes = b''.join(received)
assert(sent_bytes == received_bytes)
assert sent_bytes == received_bytes
# -----------------------------------------------------------------------------
@@ -201,7 +201,7 @@ async def transfer_payload(max_credits, mtu, mps):
messages = [
bytes([1, 2, 3, 4, 5, 6, 7]) * x
for x in (3, 10, 100, 500, 789)
for x in (3, 10, 100, 789)
]
for message in messages:
l2cap_channel.write(message)
@@ -214,14 +214,14 @@ async def transfer_payload(max_credits, mtu, mps):
sent_bytes = b''.join(messages)
received_bytes = b''.join(received)
assert(sent_bytes == received_bytes)
assert sent_bytes == received_bytes
@pytest.mark.asyncio
async def test_transfer():
for max_credits in (1, 10, 100, 10000):
for mtu in (23, 24, 25, 26, 50, 200, 255, 256, 1000):
for mps in (23, 24, 25, 26, 50, 200, 255, 256, 1000):
for mtu in (50, 255, 256, 1000):
for mps in (50, 255, 256, 1000):
# print(max_credits, mtu, mps)
await transfer_payload(max_credits, mtu, mps)
@@ -267,8 +267,8 @@ async def test_bidirectional_transfer():
message_bytes = b''.join(messages)
client_received_bytes = b''.join(client_received)
server_received_bytes = b''.join(server_received)
assert(client_received_bytes == message_bytes)
assert(server_received_bytes == message_bytes)
assert client_received_bytes == message_bytes
assert server_received_bytes == message_bytes
# -----------------------------------------------------------------------------