Merge pull request #80 from AlanRosenthal/alan/gatt_server_getter

Added class CharacteristicDeclaration, gatt_server getters
This commit is contained in:
Gilles Boccon-Gibod
2022-11-28 19:21:08 -08:00
committed by GitHub
3 changed files with 110 additions and 10 deletions

View File

@@ -283,6 +283,23 @@ class Characteristic(Attribute):
return f'Characteristic(handle=0x{self.handle:04X}, end=0x{self.end_group_handle:04X}, uuid={self.uuid}, properties={Characteristic.properties_as_string(self.properties)})' return f'Characteristic(handle=0x{self.handle:04X}, end=0x{self.end_group_handle:04X}, uuid={self.uuid}, properties={Characteristic.properties_as_string(self.properties)})'
# -----------------------------------------------------------------------------
class CharacteristicDeclaration(Attribute):
'''
See Vol 3, Part G - 3.3.1 CHARACTERISTIC DECLARATION
'''
def __init__(self, characteristic, value_handle):
declaration_bytes = struct.pack(
'<BH',
characteristic.properties,
value_handle
) + characteristic.uuid.to_pdu_bytes()
super().__init__(GATT_CHARACTERISTIC_ATTRIBUTE_TYPE, Attribute.READABLE, declaration_bytes)
self.characteristic = characteristic
def __str__(self):
return f'CharacteristicDeclaration(handle=0x{self.handle:04X}, value_handle=0x{self.value_handle:04X}, uuid={self.uuid}, properties={Characteristic.properties_as_string(self.properties)})'
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class CharacteristicValue: class CharacteristicValue:
''' '''

View File

@@ -26,6 +26,7 @@
import asyncio import asyncio
import logging import logging
from collections import defaultdict from collections import defaultdict
from typing import Tuple, Optional
from pyee import EventEmitter from pyee import EventEmitter
from colors import color from colors import color
@@ -82,6 +83,63 @@ class Server(EventEmitter):
return attribute return attribute
return None return None
def get_service_attribute(self, service_uuid: UUID) -> Optional[Service]:
return next(
(
attribute
for attribute in self.attributes
if attribute.type == GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE
and attribute.uuid == service_uuid
),
None,
)
def get_characteristic_attributes(
self, service_uuid: UUID, characteristic_uuid: UUID
) -> Optional[Tuple[CharacteristicDeclaration, Characteristic]]:
service_handle = self.get_service_attribute(service_uuid)
if not service_handle:
return None
return next(
(
(attribute, self.get_attribute(attribute.characteristic.handle))
for attribute in map(
self.get_attribute,
range(service_handle.handle, service_handle.end_group_handle + 1),
)
if attribute.type == GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
and attribute.characteristic.uuid == characteristic_uuid
),
None,
)
def get_descriptor_attribute(
self, service_uuid: UUID, characteristic_uuid: UUID, descriptor_uuid: UUID
) -> Optional[Descriptor]:
characteristics = self.get_characteristic_attributes(
service_uuid, characteristic_uuid
)
if not characteristics:
return None
(_, characteristic_value) = characteristics
return next(
(
attribute
for attribute in map(
self.get_attribute,
range(
characteristic_value.handle + 1,
characteristic_value.end_group_handle + 1,
),
)
if attribute.type == descriptor_uuid
),
None,
)
def add_attribute(self, attribute): def add_attribute(self, attribute):
# Assign a handle to this attribute # Assign a handle to this attribute
attribute.handle = self.next_handle() attribute.handle = self.next_handle()
@@ -98,16 +156,9 @@ class Server(EventEmitter):
# Add all characteristics # Add all characteristics
for characteristic in service.characteristics: for characteristic in service.characteristics:
# Add a Characteristic Declaration (Vol 3, Part G - 3.3.1 Characteristic Declaration) # Add a Characteristic Declaration
declaration_bytes = struct.pack( characteristic_declaration = CharacteristicDeclaration(
'<BH', characteristic, self.next_handle() + 1
characteristic.properties,
self.next_handle() + 1, # The value will be the next attribute after this declaration
) + characteristic.uuid.to_pdu_bytes()
characteristic_declaration = Attribute(
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
Attribute.READABLE,
declaration_bytes
) )
self.add_attribute(characteristic_declaration) self.add_attribute(characteristic_declaration)

View File

@@ -28,6 +28,7 @@ from bumble.device import Device, Peer
from bumble.host import Host from bumble.host import Host
from bumble.gatt import ( from bumble.gatt import (
GATT_BATTERY_LEVEL_CHARACTERISTIC, GATT_BATTERY_LEVEL_CHARACTERISTIC,
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
CharacteristicAdapter, CharacteristicAdapter,
DelegatedCharacteristicAdapter, DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter, PackedCharacteristicAdapter,
@@ -226,6 +227,37 @@ async def test_characteristic_encoding():
assert last_change is None assert last_change is None
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_attribute_getters():
[client, server] = LinkedDevices().devices[:2]
characteristic_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806')
characteristic = Characteristic(
characteristic_uuid,
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123])
)
service_uuid = UUID('3A657F47-D34F-46B3-B1EC-698E29B6B829')
service = Service(service_uuid, [characteristic])
server.add_service(service)
service_attr = server.gatt_server.get_service_attribute(service_uuid)
assert service_attr
(char_decl_attr, char_value_attr) = server.gatt_server.get_characteristic_attributes(service_uuid, characteristic_uuid)
assert char_decl_attr and char_value_attr
desc_attr = server.gatt_server.get_descriptor_attribute(service_uuid, characteristic_uuid, GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR)
assert desc_attr
# assert all handles are in expected order
assert service_attr.handle < char_decl_attr.handle < char_value_attr.handle < desc_attr.handle == service_attr.end_group_handle
# assert characteristic declarations attribute is followed by characteristic value attribute
assert char_decl_attr.handle + 1 == char_value_attr.handle
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_CharacteristicAdapter(): def test_CharacteristicAdapter():
# Check that the CharacteristicAdapter base class is transparent # Check that the CharacteristicAdapter base class is transparent