add long read self test

This commit is contained in:
Gilles Boccon-Gibod
2022-07-31 12:01:25 -07:00
parent 16d684c199
commit 3fa5d320de
4 changed files with 55 additions and 3 deletions

View File

@@ -320,6 +320,12 @@ class CharacteristicAdapter:
def __getattr__(self, name): def __getattr__(self, name):
return getattr(self.wrapped_characteristic, name) return getattr(self.wrapped_characteristic, name)
def __setattr__(self, name, value):
if name in {'wrapped_characteristic', 'read_value', 'write_value', 'subscribe'}:
super().__setattr__(name, value)
else:
setattr(self.wrapped_characteristic, name, value)
def read_encoded_value(self, connection): def read_encoded_value(self, connection):
return self.encode_value(self.wrapped_characteristic.read_value(connection)) return self.encode_value(self.wrapped_characteristic.read_value(connection))
@@ -343,6 +349,10 @@ class CharacteristicAdapter:
None if subscriber is None else lambda value: subscriber(self.decode_value(value)) None if subscriber is None else lambda value: subscriber(self.decode_value(value))
) )
def __str__(self):
wrapped = str(self.wrapped_characteristic)
return f'{self.__class__.__name__}({wrapped})'
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class DelegatedCharacteristicAdapter(CharacteristicAdapter): class DelegatedCharacteristicAdapter(CharacteristicAdapter):

View File

@@ -545,13 +545,13 @@ class Server(EventEmitter):
value = attribute.read_value(connection) value = attribute.read_value(connection)
if request.value_offset > len(value): if request.value_offset > len(value):
response = ATT_Error_Response( response = ATT_Error_Response(
request_opcode_in_error = request.op_code, request_opcode_in_error = request.op_code,
attribute_handle_in_error = request.attribute_handle, attribute_handle_in_error = request.attribute_handle,
error_code = ATT_INVALID_OFFSET_ERROR error_code = ATT_INVALID_OFFSET_ERROR
) )
elif len(value) <= mtu - 1: elif len(value) <= mtu - 1:
response = ATT_Error_Response( response = ATT_Error_Response(
request_opcode_in_error = request.op_code, request_opcode_in_error = request.op_code,
attribute_handle_in_error = request.attribute_handle, attribute_handle_in_error = request.attribute_handle,
error_code = ATT_ATTRIBUTE_NOT_LONG_ERROR error_code = ATT_ATTRIBUTE_NOT_LONG_ERROR
) )

View File

@@ -3276,6 +3276,9 @@ class HCI_Event(HCI_Packet):
parameters = b'' if self.parameters is None else self.parameters parameters = b'' if self.parameters is None else self.parameters
return bytes([HCI_EVENT_PACKET, self.event_code, len(parameters)]) + parameters return bytes([HCI_EVENT_PACKET, self.event_code, len(parameters)]) + parameters
def __bytes__(self):
return self.to_bytes()
def __str__(self): def __str__(self):
result = color(self.name, 'magenta') result = color(self.name, 'magenta')
if fields := getattr(self, 'fields', None): if fields := getattr(self, 'fields', None):

View File

@@ -163,6 +163,44 @@ async def test_self_gatt():
assert(result == c1.value) assert(result == c1.value)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_gatt_long_read():
# Create two devices, each with a controller, attached to the same link
two_devices = TwoDevices()
# Add some GATT characteristics to device 1
characteristics = [
Characteristic(
f'3A143AD7-D4A7-436B-97D6-5B62C315{i:04X}',
Characteristic.READ,
Characteristic.READABLE,
bytes([x & 255 for x in range(i)])
)
for i in range(0, 513)
]
service = Service('8140E247-04F0-42C1-BC34-534C344DAFCA', characteristics)
two_devices.devices[1].add_service(service)
# Start
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
# Connect the two devices
connection = await two_devices.devices[0].connect(two_devices.devices[1].random_address)
peer = Peer(connection)
result = await peer.discover_service(service.uuid)
assert(len(result) == 1)
found_service = result[0]
found_characteristics = await found_service.discover_characteristics()
assert(len(found_characteristics) == 513)
for (i, characteristic) in enumerate(found_characteristics):
value = await characteristic.read_value()
assert(value == characteristics[i].value)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def _test_self_smp_with_configs(pairing_config1, pairing_config2): async def _test_self_smp_with_configs(pairing_config1, pairing_config2):
# Create two devices, each with a controller, attached to the same link # Create two devices, each with a controller, attached to the same link
@@ -274,7 +312,7 @@ async def test_self_smp(io_cap, sc, mitm, key_dist):
pairing_config2.delegate.peer_delegate = pairing_config1.delegate pairing_config2.delegate.peer_delegate = pairing_config1.delegate
await _test_self_smp_with_configs(pairing_config1, pairing_config2) await _test_self_smp_with_configs(pairing_config1, pairing_config2)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -323,6 +361,7 @@ async def test_self_smp_wrong_pin():
async def run_test_self(): async def run_test_self():
await test_self_connection() await test_self_connection()
await test_self_gatt() await test_self_gatt()
await test_self_gatt_long_read()
await test_self_smp() await test_self_smp()
await test_self_smp_reject() await test_self_smp_reject()
await test_self_smp_wrong_pin() await test_self_smp_wrong_pin()