Merge pull request #13 from google/gbg/standard-profiles

support for type adapters and framework for standard GATT profiles
This commit is contained in:
Gilles Boccon-Gibod
2022-07-22 10:21:39 -07:00
committed by GitHub
19 changed files with 936 additions and 273 deletions
+150
View File
@@ -18,6 +18,7 @@
import asyncio
import logging
import os
import struct
import pytest
from bumble.controller import Controller
@@ -25,6 +26,12 @@ from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
from bumble.gatt import (
GATT_BATTERY_LEVEL_CHARACTERISTIC,
CharacteristicAdapter,
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter,
MappedCharacteristicAdapter,
UTF8CharacteristicAdapter,
Service,
Characteristic,
CharacteristicValue
@@ -91,6 +98,96 @@ def test_ATT_Read_By_Group_Type_Request():
basic_check(pdu)
# -----------------------------------------------------------------------------
def test_CharacteristicAdapter():
# Check that the CharacteristicAdapter base class is transparent
v = bytes([1, 2, 3])
c = Characteristic(GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.READ, Characteristic.READABLE, v)
a = CharacteristicAdapter(c)
value = a.read_value(None)
assert(value == v)
v = bytes([3, 4, 5])
a.write_value(None, v)
assert(c.value == v)
# Simple delegated adapter
a = DelegatedCharacteristicAdapter(c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x)))
value = a.read_value(None)
assert(value == bytes(reversed(v)))
v = bytes([3, 4, 5])
a.write_value(None, v)
assert(a.value == bytes(reversed(v)))
# Packed adapter with single element format
v = 1234
pv = struct.pack('>H', v)
c.value = v
a = PackedCharacteristicAdapter(c, '>H')
value = a.read_value(None)
assert(value == pv)
c.value = None
a.write_value(None, pv)
assert(a.value == v)
# Packed adapter with multi-element format
v1 = 1234
v2 = 5678
pv = struct.pack('>HH', v1, v2)
c.value = (v1, v2)
a = PackedCharacteristicAdapter(c, '>HH')
value = a.read_value(None)
assert(value == pv)
c.value = None
a.write_value(None, pv)
assert(a.value == (v1, v2))
# Mapped adapter
v1 = 1234
v2 = 5678
pv = struct.pack('>HH', v1, v2)
mapped = {'v1': v1, 'v2': v2}
c.value = mapped
a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2'))
value = a.read_value(None)
assert(value == pv)
c.value = None
a.write_value(None, pv)
assert(a.value == mapped)
# UTF-8 adapter
v = 'Hello π'
ev = v.encode('utf-8')
c.value = v
a = UTF8CharacteristicAdapter(c)
value = a.read_value(None)
assert(value == ev)
c.value = None
a.write_value(None, ev)
assert(a.value == v)
# -----------------------------------------------------------------------------
def test_CharacteristicValue():
b = bytes([1, 2, 3])
c = CharacteristicValue(read=lambda _: b)
x = c.read(None)
assert(x == b)
result = []
c = CharacteristicValue(write=lambda connection, value: result.append((connection, value)))
z = object()
c.write(z, b)
assert(result == [(z, b)])
# -----------------------------------------------------------------------------
class TwoDevices:
def __init__(self):
@@ -199,6 +296,56 @@ async def test_read_write():
assert(characteristic2._last_value[1] == b)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write2():
[client, server] = TwoDevices().devices
v = bytes([0x11, 0x22, 0x33, 0x44])
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
value=v
)
service1 = Service(
'3A657F47-D34F-46B3-B1EC-698E29B6B829',
[
characteristic1
]
)
server.add_services([service1])
await client.power_on()
await server.power_on()
connection = await client.connect(server.random_address)
peer = Peer(connection)
await peer.discover_services()
c = peer.get_services_by_uuid(service1.uuid)
assert(len(c) == 1)
s = c[0]
await s.discover_characteristics()
c = s.get_characteristics_by_uuid(characteristic1.uuid)
assert(len(c) == 1)
c1 = c[0]
v1 = await c1.read_value()
assert(v1 == v)
a1 = PackedCharacteristicAdapter(c1, '>I')
v1 = await a1.read_value()
assert(v1 == struct.unpack('>I', v)[0])
b = bytes([0x55, 0x66, 0x77, 0x88])
await a1.write_value(struct.unpack('>I', b)[0])
await async_barrier()
assert(characteristic1.value == b)
v1 = await a1.read_value()
assert(v1 == struct.unpack('>I', b)[0])
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_subscribe_notify():
@@ -330,6 +477,7 @@ async def test_subscribe_notify():
# -----------------------------------------------------------------------------
async def async_main():
await test_read_write()
await test_read_write2()
await test_subscribe_notify()
# -----------------------------------------------------------------------------
@@ -338,4 +486,6 @@ if __name__ == '__main__':
test_UUID()
test_ATT_Error_Response()
test_ATT_Read_By_Group_Type_Request()
test_CharacteristicValue()
test_CharacteristicAdapter()
asyncio.run(async_main())