forked from auracaster/bumble_mirror
Add EATT Support
This commit is contained in:
@@ -28,6 +28,7 @@ from unittest.mock import ANY, AsyncMock, Mock
|
||||
import pytest
|
||||
from typing_extensions import Self
|
||||
|
||||
from bumble import gatt_client, l2cap
|
||||
from bumble.att import (
|
||||
ATT_ATTRIBUTE_NOT_FOUND_ERROR,
|
||||
ATT_PDU,
|
||||
@@ -63,7 +64,6 @@ from bumble.gatt_adapters import (
|
||||
UTF8CharacteristicAdapter,
|
||||
UTF8CharacteristicProxyAdapter,
|
||||
)
|
||||
from bumble.gatt_client import CharacteristicProxy
|
||||
|
||||
from .test_utils import Devices, TwoDevices, async_barrier
|
||||
|
||||
@@ -140,7 +140,7 @@ async def test_characteristic_encoding():
|
||||
await c.write_value(Mock(), bytes([122]))
|
||||
assert c.value == 122
|
||||
|
||||
class FooProxy(CharacteristicProxy):
|
||||
class FooProxy(gatt_client.CharacteristicProxy):
|
||||
def __init__(self, characteristic):
|
||||
super().__init__(
|
||||
characteristic.client,
|
||||
@@ -456,7 +456,7 @@ async def test_CharacteristicProxyAdapter() -> None:
|
||||
async def write_value(self, handle, value, with_response=False):
|
||||
self.value = value
|
||||
|
||||
class TestAttributeProxy(CharacteristicProxy):
|
||||
class TestAttributeProxy(gatt_client.CharacteristicProxy):
|
||||
def __init__(self, value) -> None:
|
||||
super().__init__(Client(value), 0, 0, None, 0) # type: ignore
|
||||
|
||||
@@ -1425,10 +1425,10 @@ async def test_get_characteristics_by_uuid():
|
||||
await peer.discover_characteristics()
|
||||
c = peer.get_characteristics_by_uuid(uuid=UUID('1234'))
|
||||
assert len(c) == 2
|
||||
assert isinstance(c[0], CharacteristicProxy)
|
||||
assert isinstance(c[0], gatt_client.CharacteristicProxy)
|
||||
c = peer.get_characteristics_by_uuid(uuid=UUID('1234'), service=UUID('ABCD'))
|
||||
assert len(c) == 1
|
||||
assert isinstance(c[0], CharacteristicProxy)
|
||||
assert isinstance(c[0], gatt_client.CharacteristicProxy)
|
||||
c = peer.get_characteristics_by_uuid(uuid=UUID('1234'), service=UUID('AAAA'))
|
||||
assert len(c) == 0
|
||||
|
||||
@@ -1463,6 +1463,181 @@ async def test_write_return_error():
|
||||
assert e.value.error_code == ErrorCode.VALUE_NOT_ALLOWED
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_eatt_read():
|
||||
devices = await TwoDevices.create_with_connection()
|
||||
devices[1].gatt_server.register_eatt()
|
||||
|
||||
characteristic = Characteristic(
|
||||
'1234',
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.Permissions.READABLE,
|
||||
b'9999',
|
||||
)
|
||||
service = Service('ABCD', [characteristic])
|
||||
devices[1].add_service(service)
|
||||
|
||||
client = await gatt_client.Client.connect_eatt(devices.connections[0])
|
||||
await client.discover_services()
|
||||
service_proxy = client.get_services_by_uuid(service.uuid)[0]
|
||||
await service_proxy.discover_characteristics()
|
||||
characteristic_proxy = service_proxy.get_characteristics_by_uuid(
|
||||
characteristic.uuid
|
||||
)[0]
|
||||
assert await characteristic_proxy.read_value() == b'9999'
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_eatt_write():
|
||||
devices = await TwoDevices.create_with_connection()
|
||||
devices[1].gatt_server.register_eatt()
|
||||
|
||||
write_queue = asyncio.Queue()
|
||||
characteristic = Characteristic(
|
||||
'1234',
|
||||
Characteristic.Properties.WRITE,
|
||||
Characteristic.Permissions.WRITEABLE,
|
||||
CharacteristicValue(write=lambda *args: write_queue.put_nowait(args)),
|
||||
)
|
||||
service = Service('ABCD', [characteristic])
|
||||
devices[1].add_service(service)
|
||||
|
||||
client = await gatt_client.Client.connect_eatt(devices.connections[0])
|
||||
await client.discover_services()
|
||||
service_proxy = client.get_services_by_uuid(service.uuid)[0]
|
||||
await service_proxy.discover_characteristics()
|
||||
characteristic_proxy = service_proxy.get_characteristics_by_uuid(
|
||||
characteristic.uuid
|
||||
)[0]
|
||||
await characteristic_proxy.write_value(b'9999')
|
||||
assert await write_queue.get() == (devices.connections[1], b'9999')
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_eatt_notify():
|
||||
devices = await TwoDevices.create_with_connection()
|
||||
devices[1].gatt_server.register_eatt()
|
||||
|
||||
characteristic = Characteristic(
|
||||
'1234',
|
||||
Characteristic.Properties.NOTIFY,
|
||||
Characteristic.Permissions.WRITEABLE,
|
||||
)
|
||||
service = Service('ABCD', [characteristic])
|
||||
devices[1].add_service(service)
|
||||
|
||||
clients = [
|
||||
(
|
||||
devices.connections[0].gatt_client,
|
||||
asyncio.Queue[bytes](),
|
||||
),
|
||||
(
|
||||
await gatt_client.Client.connect_eatt(devices.connections[0]),
|
||||
asyncio.Queue[bytes](),
|
||||
),
|
||||
(
|
||||
await gatt_client.Client.connect_eatt(devices.connections[0]),
|
||||
asyncio.Queue[bytes](),
|
||||
),
|
||||
]
|
||||
for client, queue in clients:
|
||||
await client.discover_services()
|
||||
service_proxy = client.get_services_by_uuid(service.uuid)[0]
|
||||
await service_proxy.discover_characteristics()
|
||||
characteristic_proxy = service_proxy.get_characteristics_by_uuid(
|
||||
characteristic.uuid
|
||||
)[0]
|
||||
|
||||
for client, queue in clients[:2]:
|
||||
characteristic_proxy = service_proxy.get_characteristics_by_uuid(
|
||||
characteristic.uuid
|
||||
)[0]
|
||||
await characteristic_proxy.subscribe(queue.put_nowait, prefer_notify=True)
|
||||
|
||||
await devices[1].gatt_server.notify_subscribers(characteristic, b'1234')
|
||||
for _, queue in clients[:2]:
|
||||
assert await queue.get() == b'1234'
|
||||
assert queue.empty()
|
||||
assert clients[2][1].empty()
|
||||
|
||||
await devices[1].gatt_server.notify_subscriber(
|
||||
devices.connections[1], characteristic, b'5678'
|
||||
)
|
||||
for _, queue in clients[:2]:
|
||||
assert await queue.get() == b'5678'
|
||||
assert queue.empty()
|
||||
assert clients[2][1].empty()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_eatt_indicate():
|
||||
devices = await TwoDevices.create_with_connection()
|
||||
devices[1].gatt_server.register_eatt()
|
||||
|
||||
characteristic = Characteristic(
|
||||
'1234',
|
||||
Characteristic.Properties.INDICATE,
|
||||
Characteristic.Permissions.WRITEABLE,
|
||||
)
|
||||
service = Service('ABCD', [characteristic])
|
||||
devices[1].add_service(service)
|
||||
|
||||
clients = [
|
||||
(
|
||||
devices.connections[0].gatt_client,
|
||||
asyncio.Queue[bytes](),
|
||||
),
|
||||
(
|
||||
await gatt_client.Client.connect_eatt(devices.connections[0]),
|
||||
asyncio.Queue[bytes](),
|
||||
),
|
||||
(
|
||||
await gatt_client.Client.connect_eatt(devices.connections[0]),
|
||||
asyncio.Queue[bytes](),
|
||||
),
|
||||
]
|
||||
for client, queue in clients:
|
||||
await client.discover_services()
|
||||
service_proxy = client.get_services_by_uuid(service.uuid)[0]
|
||||
await service_proxy.discover_characteristics()
|
||||
characteristic_proxy = service_proxy.get_characteristics_by_uuid(
|
||||
characteristic.uuid
|
||||
)[0]
|
||||
|
||||
for client, queue in clients[:2]:
|
||||
characteristic_proxy = service_proxy.get_characteristics_by_uuid(
|
||||
characteristic.uuid
|
||||
)[0]
|
||||
await characteristic_proxy.subscribe(queue.put_nowait, prefer_notify=False)
|
||||
|
||||
await devices[1].gatt_server.indicate_subscribers(characteristic, b'1234')
|
||||
for _, queue in clients[:2]:
|
||||
assert await queue.get() == b'1234'
|
||||
assert queue.empty()
|
||||
assert clients[2][1].empty()
|
||||
|
||||
await devices[1].gatt_server.indicate_subscriber(
|
||||
devices.connections[1], characteristic, b'5678'
|
||||
)
|
||||
for _, queue in clients[:2]:
|
||||
assert await queue.get() == b'5678'
|
||||
assert queue.empty()
|
||||
assert clients[2][1].empty()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_eatt_connection_failure():
|
||||
devices = await TwoDevices.create_with_connection()
|
||||
|
||||
with pytest.raises(l2cap.L2capError):
|
||||
await gatt_client.Client.connect_eatt(devices.connections[0])
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
|
||||
|
||||
Reference in New Issue
Block a user