# Copyright 2021-2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- import asyncio import logging import os import struct import pytest from bumble.controller import Controller from bumble.gatt_client import CharacteristicProxy from bumble.link import LocalLink from bumble.device import Device, Peer from bumble.host import Host from bumble.gatt import ( GATT_BATTERY_LEVEL_CHARACTERISTIC, CharacteristicAdapter, DelegatedCharacteristicAdapter, PackedCharacteristicAdapter, MappedCharacteristicAdapter, UTF8CharacteristicAdapter, Service, Characteristic, CharacteristicValue ) from bumble.transport import AsyncPipeSink from bumble.core import UUID from bumble.att import ( ATT_EXCHANGE_MTU_REQUEST, ATT_ATTRIBUTE_NOT_FOUND_ERROR, ATT_PDU, ATT_Error_Response, ATT_Read_By_Group_Type_Request ) # ----------------------------------------------------------------------------- def basic_check(x): pdu = x.to_bytes() parsed = ATT_PDU.from_bytes(pdu) x_str = str(x) parsed_str = str(parsed) assert x_str == parsed_str # ----------------------------------------------------------------------------- def test_UUID(): u = UUID.from_16_bits(0x7788) assert str(u) == 'UUID-16:7788' u = UUID.from_32_bits(0x11223344) assert str(u) == 'UUID-32:11223344' u = UUID('61A3512C-09BE-4DDC-A6A6-0B03667AAFC6') assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' v = UUID(str(u)) assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' w = UUID.from_bytes(v.to_bytes()) assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' u1 = UUID.from_16_bits(0x1234) b1 = u1.to_bytes(force_128 = True) u2 = UUID.from_bytes(b1) assert u1 == u2 u3 = UUID.from_16_bits(0x180a) assert str(u3) == 'UUID-16:180A (Device Information)' # ----------------------------------------------------------------------------- def test_ATT_Error_Response(): pdu = ATT_Error_Response( request_opcode_in_error = ATT_EXCHANGE_MTU_REQUEST, attribute_handle_in_error = 0x0000, error_code = ATT_ATTRIBUTE_NOT_FOUND_ERROR ) basic_check(pdu) # ----------------------------------------------------------------------------- def test_ATT_Read_By_Group_Type_Request(): pdu = ATT_Read_By_Group_Type_Request( starting_handle = 0x0001, ending_handle = 0xFFFF, attribute_group_type = UUID.from_16_bits(0x2800) ) basic_check(pdu) # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_characteristic_encoding(): class Foo(Characteristic): def encode_value(self, value): return bytes([value]) def decode_value(self, value_bytes): return value_bytes[0] c = Foo(GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.READ, Characteristic.READABLE, 123) x = c.read_value(None) assert x == bytes([123]) c.write_value(None, bytes([122])) assert c.value == 122 class FooProxy(CharacteristicProxy): def __init__(self, characteristic): super().__init__( characteristic.client, characteristic.handle, characteristic.end_group_handle, characteristic.uuid, characteristic.properties ) def encode_value(self, value): return bytes([value]) def decode_value(self, value_bytes): return value_bytes[0] [client, server] = LinkedDevices().devices[:2] characteristic = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, Characteristic.READABLE | Characteristic.WRITEABLE, bytes([123]) ) service = Service( '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic] ) server.add_service(service) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) peer = Peer(connection) await peer.discover_services() await peer.discover_characteristics() c = peer.get_characteristics_by_uuid(characteristic.uuid) assert len(c) == 1 c = c[0] cp = FooProxy(c) v = await cp.read_value() assert v == 123 await cp.write_value(124) await async_barrier() assert characteristic.value == bytes([124]) v = await cp.read_value() assert v == 124 await cp.write_value(125, with_response=True) await async_barrier() assert characteristic.value == bytes([125]) cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2])) await cd.write_value(100, with_response=True) await async_barrier() assert characteristic.value == bytes([50]) last_change = None def on_change(value): nonlocal last_change last_change = value await c.subscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change == characteristic.value last_change = None await server.notify_subscribers(characteristic, value=bytes([125])) await async_barrier() assert last_change == bytes([125]) last_change = None await c.unsubscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change is None await cp.subscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change == characteristic.value[0] last_change = None await server.notify_subscribers(characteristic, value=bytes([126])) await async_barrier() assert last_change == 126 last_change = None await cp.unsubscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change is None cd = DelegatedCharacteristicAdapter(c, decode=lambda x: x[0]) await cd.subscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change == characteristic.value[0] last_change = None await cd.unsubscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() assert last_change is None # ----------------------------------------------------------------------------- def test_CharacteristicAdapter(): # Check that the CharacteristicAdapter base class is transparent v = bytes([1, 2, 3]) c = Characteristic(GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.READ, Characteristic.READABLE, v) a = CharacteristicAdapter(c) value = a.read_value(None) assert value == v v = bytes([3, 4, 5]) a.write_value(None, v) assert c.value == v # Simple delegated adapter a = DelegatedCharacteristicAdapter(c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x))) value = a.read_value(None) assert value == bytes(reversed(v)) v = bytes([3, 4, 5]) a.write_value(None, v) assert a.value == bytes(reversed(v)) # Packed adapter with single element format v = 1234 pv = struct.pack('>H', v) c.value = v a = PackedCharacteristicAdapter(c, '>H') value = a.read_value(None) assert value == pv c.value = None a.write_value(None, pv) assert a.value == v # Packed adapter with multi-element format v1 = 1234 v2 = 5678 pv = struct.pack('>HH', v1, v2) c.value = (v1, v2) a = PackedCharacteristicAdapter(c, '>HH') value = a.read_value(None) assert value == pv c.value = None a.write_value(None, pv) assert a.value == (v1, v2) # Mapped adapter v1 = 1234 v2 = 5678 pv = struct.pack('>HH', v1, v2) mapped = {'v1': v1, 'v2': v2} c.value = mapped a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2')) value = a.read_value(None) assert value == pv c.value = None a.write_value(None, pv) assert a.value == mapped # UTF-8 adapter v = 'Hello π' ev = v.encode('utf-8') c.value = v a = UTF8CharacteristicAdapter(c) value = a.read_value(None) assert value == ev c.value = None a.write_value(None, ev) assert a.value == v # ----------------------------------------------------------------------------- def test_CharacteristicValue(): b = bytes([1, 2, 3]) c = CharacteristicValue(read=lambda _: b) x = c.read(None) assert x == b result = [] c = CharacteristicValue(write=lambda connection, value: result.append((connection, value))) z = object() c.write(z, b) assert result == [(z, b)] # ----------------------------------------------------------------------------- class LinkedDevices: def __init__(self): self.connections = [None, None, None] self.link = LocalLink() self.controllers = [ Controller('C1', link = self.link), Controller('C2', link = self.link), Controller('C3', link = self.link) ] self.devices = [ Device( address = 'F0:F1:F2:F3:F4:F5', host = Host(self.controllers[0], AsyncPipeSink(self.controllers[0])) ), Device( address = 'F1:F2:F3:F4:F5:F6', host = Host(self.controllers[1], AsyncPipeSink(self.controllers[1])) ), Device( address = 'F2:F3:F4:F5:F6:F7', host = Host(self.controllers[2], AsyncPipeSink(self.controllers[2])) ) ] self.paired = [None, None, None] # ----------------------------------------------------------------------------- async def async_barrier(): ready = asyncio.get_running_loop().create_future() asyncio.get_running_loop().call_soon(ready.set_result, None) await ready # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_read_write(): [client, server] = LinkedDevices().devices[:2] characteristic1 = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.READ | Characteristic.WRITE, Characteristic.READABLE | Characteristic.WRITEABLE ) def on_characteristic1_write(connection, value): characteristic1._last_value = (connection, value) characteristic1.on('write', on_characteristic1_write) def on_characteristic2_read(connection): return bytes(str(connection.peer_address)) def on_characteristic2_write(connection, value): characteristic2._last_value = (connection, value) characteristic2 = Characteristic( '66DE9057-C848-4ACA-B993-D675644EBB85', Characteristic.READ | Characteristic.WRITE, Characteristic.READABLE | Characteristic.WRITEABLE, CharacteristicValue(read=on_characteristic2_read, write=on_characteristic2_write) ) service1 = Service( '3A657F47-D34F-46B3-B1EC-698E29B6B829', [ characteristic1, characteristic2 ] ) server.add_services([service1]) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) peer = Peer(connection) await peer.discover_services() await peer.discover_characteristics() c = peer.get_characteristics_by_uuid(characteristic1.uuid) assert len(c) == 1 c1 = c[0] c = peer.get_characteristics_by_uuid(characteristic2.uuid) assert len(c) == 1 c2 = c[0] v1 = await peer.read_value(c1) assert v1 == b'' b = bytes([1, 2, 3]) await peer.write_value(c1, b) await async_barrier() assert characteristic1.value == b v1 = await peer.read_value(c1) assert v1 == b assert type(characteristic1._last_value is tuple) assert len(characteristic1._last_value) == 2 assert str(characteristic1._last_value[0].peer_address) == str(client.random_address) assert characteristic1._last_value[1] == b bb = bytes([3, 4, 5, 6]) characteristic1.value = bb v1 = await peer.read_value(c1) assert v1 == bb await peer.write_value(c2, b) await async_barrier() assert type(characteristic2._last_value is tuple) assert len(characteristic2._last_value) == 2 assert str(characteristic2._last_value[0].peer_address) == str(client.random_address) assert characteristic2._last_value[1] == b # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_read_write2(): [client, server] = LinkedDevices().devices[:2] v = bytes([0x11, 0x22, 0x33, 0x44]) characteristic1 = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.READ | Characteristic.WRITE, Characteristic.READABLE | Characteristic.WRITEABLE, value=v ) service1 = Service( '3A657F47-D34F-46B3-B1EC-698E29B6B829', [ characteristic1 ] ) server.add_services([service1]) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) peer = Peer(connection) await peer.discover_services() c = peer.get_services_by_uuid(service1.uuid) assert len(c) == 1 s = c[0] await s.discover_characteristics() c = s.get_characteristics_by_uuid(characteristic1.uuid) assert len(c) == 1 c1 = c[0] v1 = await c1.read_value() assert v1 == v a1 = PackedCharacteristicAdapter(c1, '>I') v1 = await a1.read_value() assert v1 == struct.unpack('>I', v)[0] b = bytes([0x55, 0x66, 0x77, 0x88]) await a1.write_value(struct.unpack('>I', b)[0]) await async_barrier() assert characteristic1.value == b v1 = await a1.read_value() assert v1 == struct.unpack('>I', b)[0] # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_subscribe_notify(): [client, server] = LinkedDevices().devices[:2] characteristic1 = Characteristic( 'FDB159DB-036C-49E3-B3DB-6325AC750806', Characteristic.READ | Characteristic.NOTIFY, Characteristic.READABLE, bytes([1, 2, 3]) ) def on_characteristic1_subscription(connection, notify_enabled, indicate_enabled): characteristic1._last_subscription = (connection, notify_enabled, indicate_enabled) characteristic1.on('subscription', on_characteristic1_subscription) characteristic2 = Characteristic( '66DE9057-C848-4ACA-B993-D675644EBB85', Characteristic.READ | Characteristic.INDICATE, Characteristic.READABLE, bytes([4, 5, 6]) ) def on_characteristic2_subscription(connection, notify_enabled, indicate_enabled): characteristic2._last_subscription = (connection, notify_enabled, indicate_enabled) characteristic2.on('subscription', on_characteristic2_subscription) characteristic3 = Characteristic( 'AB5E639C-40C1-4238-B9CB-AF41F8B806E4', Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE, Characteristic.READABLE, bytes([7, 8, 9]) ) def on_characteristic3_subscription(connection, notify_enabled, indicate_enabled): characteristic3._last_subscription = (connection, notify_enabled, indicate_enabled) characteristic3.on('subscription', on_characteristic3_subscription) service1 = Service( '3A657F47-D34F-46B3-B1EC-698E29B6B829', [ characteristic1, characteristic2, characteristic3 ] ) server.add_services([service1]) def on_characteristic_subscription(connection, characteristic, notify_enabled, indicate_enabled): server._last_subscription = (connection, characteristic, notify_enabled, indicate_enabled) server.on('characteristic_subscription', on_characteristic_subscription) await client.power_on() await server.power_on() connection = await client.connect(server.random_address) peer = Peer(connection) await peer.discover_services() await peer.discover_characteristics() c = peer.get_characteristics_by_uuid(characteristic1.uuid) assert len(c) == 1 c1 = c[0] c = peer.get_characteristics_by_uuid(characteristic2.uuid) assert len(c) == 1 c2 = c[0] c = peer.get_characteristics_by_uuid(characteristic3.uuid) assert len(c) == 1 c3 = c[0] c1._called = False c1._last_update = None def on_c1_update(value): c1._called = True c1._last_update = value c1.on('update', on_c1_update) await peer.subscribe(c1) await async_barrier() assert server._last_subscription[1] == characteristic1 assert server._last_subscription[2] assert not server._last_subscription[3] assert characteristic1._last_subscription[1] assert not characteristic1._last_subscription[2] await server.indicate_subscribers(characteristic1) await async_barrier() assert not c1._called await server.notify_subscribers(characteristic1) await async_barrier() assert c1._called assert c1._last_update == characteristic1.value c1._called = False c1._last_update = None c1_value = characteristic1.value await server.notify_subscribers(characteristic1, bytes([0, 1, 2])) await async_barrier() assert c1._called assert c1._last_update == bytes([0, 1, 2]) assert characteristic1.value == c1_value c1._called = False await peer.unsubscribe(c1) await server.notify_subscribers(characteristic1) assert not c1._called c2._called = False c2._last_update = None def on_c2_update(value): c2._called = True c2._last_update = value await peer.subscribe(c2, on_c2_update) await async_barrier() await server.notify_subscriber(characteristic2._last_subscription[0], characteristic2) await async_barrier() assert not c2._called await server.indicate_subscriber(characteristic2._last_subscription[0], characteristic2) await async_barrier() assert c2._called assert c2._last_update == characteristic2.value c2._called = False await peer.unsubscribe(c2, on_c2_update) await server.indicate_subscriber(characteristic2._last_subscription[0], characteristic2) await async_barrier() assert not c2._called def on_c3_update(value): c3._called = True c3._last_update = value def on_c3_update_2(value): c3._called_2 = True c3._last_update_2 = value c3.on('update', on_c3_update) await peer.subscribe(c3, on_c3_update_2) await async_barrier() await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3) await async_barrier() assert c3._called assert c3._last_update == characteristic3.value assert c3._called_2 assert c3._last_update_2 == characteristic3.value characteristic3.value = bytes([1, 2, 3]) await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3) await async_barrier() assert c3._called assert c3._last_update == characteristic3.value assert c3._called_2 assert c3._last_update_2 == characteristic3.value c3._called = False c3._called_2 = False await peer.unsubscribe(c3) await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3) await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3) await async_barrier() assert not c3._called assert not c3._called_2 # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_mtu_exchange(): [d1, d2, d3] = LinkedDevices().devices[:3] d3.gatt_server.max_mtu = 100 d3_connections = [] @d3.on('connection') def on_d3_connection(connection): d3_connections.append(connection) await d1.power_on() await d2.power_on() await d3.power_on() d1_connection = await d1.connect(d3.random_address) assert len(d3_connections) == 1 assert d3_connections[0] is not None d2_connection = await d2.connect(d3.random_address) assert len(d3_connections) == 2 assert d3_connections[1] is not None d1_peer = Peer(d1_connection) d2_peer = Peer(d2_connection) d1_client_mtu = await d1_peer.request_mtu(220) assert d1_client_mtu == 100 assert d1_connection.att_mtu == 100 d2_client_mtu = await d2_peer.request_mtu(50) assert d2_client_mtu == 50 assert d2_connection.att_mtu == 50 # ----------------------------------------------------------------------------- async def async_main(): await test_read_write() await test_read_write2() await test_subscribe_notify() await test_characteristic_encoding() await test_mtu_exchange() # ----------------------------------------------------------------------------- if __name__ == '__main__': logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) test_UUID() test_ATT_Error_Response() test_ATT_Read_By_Group_Type_Request() test_CharacteristicValue() test_CharacteristicAdapter() asyncio.run(async_main())