mirror of
https://github.com/google/bumble.git
synced 2026-04-18 00:45:32 +00:00
Added class CharacteristicDeclaration, gatt_server getters
* Converted CharacteristicDeclaration implementation to class * Added ability to get a gatt_server attribute by service UUID, characteristics UUID, descriptor UUID
This commit is contained in:
committed by
Alan Rosenthal
parent
b437bd8619
commit
b89f9030a0
@@ -26,6 +26,7 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
from typing import Tuple, Optional
|
||||
from pyee import EventEmitter
|
||||
from colors import color
|
||||
|
||||
@@ -79,6 +80,63 @@ class Server(EventEmitter):
|
||||
return attribute
|
||||
return None
|
||||
|
||||
def get_service_attribute(self, service_uuid: UUID) -> Optional[Service]:
|
||||
return next(
|
||||
(
|
||||
attribute
|
||||
for attribute in self.attributes
|
||||
if attribute.type == GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE
|
||||
and attribute.uuid == service_uuid
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
def get_characteristic_attributes(
|
||||
self, service_uuid: UUID, characteristic_uuid: UUID
|
||||
) -> Optional[Tuple[CharacteristicDeclaration, Characteristic]]:
|
||||
service_handle = self.get_service_attribute(service_uuid)
|
||||
if not service_handle:
|
||||
return None
|
||||
|
||||
return next(
|
||||
(
|
||||
(attribute, self.get_attribute(attribute.characteristic.handle))
|
||||
for attribute in map(
|
||||
self.get_attribute,
|
||||
range(service_handle.handle, service_handle.end_group_handle + 1),
|
||||
)
|
||||
if attribute.type == GATT_CHARACTERISTIC_ATTRIBUTE_TYPE
|
||||
and attribute.characteristic.uuid == characteristic_uuid
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
def get_descriptor_attribute(
|
||||
self, service_uuid: UUID, characteristic_uuid: UUID, descriptor_uuid: UUID
|
||||
) -> Optional[Descriptor]:
|
||||
characteristics = self.get_characteristic_attributes(
|
||||
service_uuid, characteristic_uuid
|
||||
)
|
||||
if not characteristics:
|
||||
return None
|
||||
|
||||
(_, characteristic_value) = characteristics
|
||||
|
||||
return next(
|
||||
(
|
||||
attribute
|
||||
for attribute in map(
|
||||
self.get_attribute,
|
||||
range(
|
||||
characteristic_value.handle + 1,
|
||||
characteristic_value.end_group_handle + 1,
|
||||
),
|
||||
)
|
||||
if attribute.type == descriptor_uuid
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
def add_attribute(self, attribute):
|
||||
# Assign a handle to this attribute
|
||||
attribute.handle = self.next_handle()
|
||||
@@ -95,16 +153,9 @@ class Server(EventEmitter):
|
||||
|
||||
# Add all characteristics
|
||||
for characteristic in service.characteristics:
|
||||
# Add a Characteristic Declaration (Vol 3, Part G - 3.3.1 Characteristic Declaration)
|
||||
declaration_bytes = struct.pack(
|
||||
'<BH',
|
||||
characteristic.properties,
|
||||
self.next_handle() + 1, # The value will be the next attribute after this declaration
|
||||
) + characteristic.uuid.to_pdu_bytes()
|
||||
characteristic_declaration = Attribute(
|
||||
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
|
||||
Attribute.READABLE,
|
||||
declaration_bytes
|
||||
# Add a Characteristic Declaration
|
||||
characteristic_declaration = CharacteristicDeclaration(
|
||||
characteristic, self.next_handle() + 1
|
||||
)
|
||||
self.add_attribute(characteristic_declaration)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user