mirror of
https://github.com/google/bumble.git
synced 2026-05-06 03:38:01 +00:00
add support for type adapters and framework for adding standard GATT profiles
This commit is contained in:
238
bumble/gatt.py
238
bumble/gatt.py
@@ -22,6 +22,8 @@
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import asyncio
|
||||
import types
|
||||
import logging
|
||||
from colors import color
|
||||
|
||||
@@ -53,13 +55,13 @@ GATT_NEXT_DST_CHANGE_SERVICE = UUID.from_16_bits(0x1807, 'Next DS
|
||||
GATT_GLUCOSE_SERVICE = UUID.from_16_bits(0x1808, 'Glucose')
|
||||
GATT_HEALTH_THERMOMETER_SERVICE = UUID.from_16_bits(0x1809, 'Health Thermometer')
|
||||
GATT_DEVICE_INFORMATION_SERVICE = UUID.from_16_bits(0x180A, 'Device Information')
|
||||
GATT_DEVICE_HEART_RATE_SERVICE = UUID.from_16_bits(0x180D, 'Heart Rate')
|
||||
GATT_PHONE_ALTERT_STATUS_SERVICE = UUID.from_16_bits(0x180E, 'Phone Alert Status')
|
||||
GATT_DEVICE_BATTERY_SERVICE = UUID.from_16_bits(0x180F, 'Battery')
|
||||
GATT_HEART_RATE_SERVICE = UUID.from_16_bits(0x180D, 'Heart Rate')
|
||||
GATT_PHONE_ALERT_STATUS_SERVICE = UUID.from_16_bits(0x180E, 'Phone Alert Status')
|
||||
GATT_BATTERY_SERVICE = UUID.from_16_bits(0x180F, 'Battery')
|
||||
GATT_BLOOD_PRESSURE_SERVICE = UUID.from_16_bits(0x1810, 'Blood Pressure')
|
||||
GATT_ALTERT_NOTIFICATION_SERVICE = UUID.from_16_bits(0x1811, 'Alert Notification')
|
||||
GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE = UUID.from_16_bits(0x1812, 'Human Interface Device')
|
||||
GATT_DEVICE_SCAN_PARAMETERS_SERVICE = UUID.from_16_bits(0x1813, 'Scan Parameters')
|
||||
GATT_ALERT_NOTIFICATION_SERVICE = UUID.from_16_bits(0x1811, 'Alert Notification')
|
||||
GATT_HUMAN_INTERFACE_DEVICE_SERVICE = UUID.from_16_bits(0x1812, 'Human Interface Device')
|
||||
GATT_SCAN_PARAMETERS_SERVICE = UUID.from_16_bits(0x1813, 'Scan Parameters')
|
||||
GATT_RUNNING_SPEED_AND_CADENCE_SERVICE = UUID.from_16_bits(0x1814, 'Running Speed and Cadence')
|
||||
GATT_AUTOMATION_IO_SERVICE = UUID.from_16_bits(0x1815, 'Automation IO')
|
||||
GATT_CYCLING_SPEED_AND_CADENCE_SERVICE = UUID.from_16_bits(0x1816, 'Cycling Speed and Cadence')
|
||||
@@ -119,7 +121,7 @@ GATT_ENVIRONMENTAL_SENSING_CONFIGURATION_DESCRIPTOR = UUID.from_16_bits(0x290B,
|
||||
GATT_ENVIRONMENTAL_SENSING_MEASUREMENT_DESCRIPTOR = UUID.from_16_bits(0x290C, 'Environmental Sensing Measurement')
|
||||
GATT_ENVIRONMENTAL_SENSING_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290D, 'Environmental Sensing Trigger Setting')
|
||||
GATT_TIME_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290E, 'Time Trigger Setting')
|
||||
GATT_COMPLETE_BE_EDR_TRANSPORT_BLOCK_DATA_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Complete BR-EDR Transport Block Data')
|
||||
GATT_COMPLETE_BR_EDR_TRANSPORT_BLOCK_DATA_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Complete BR-EDR Transport Block Data')
|
||||
|
||||
# Device Information Service
|
||||
GATT_SYSTEM_ID_CHARACTERISTIC = UUID.from_16_bits(0x2A23, 'System ID')
|
||||
@@ -140,19 +142,19 @@ GATT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A4D, 'Report')
|
||||
GATT_PROTOCOL_MODE_CHARACTERISTIC = UUID.from_16_bits(0x2A4E, 'Protocol Mode')
|
||||
|
||||
# Misc
|
||||
GATT_DEVICE_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2A00, 'Device Name')
|
||||
GATT_APPEARANCE_CHARACTERISTIC = UUID.from_16_bits(0x2A01, 'Appearance')
|
||||
GATT_PERIPHERAL_PRIVACY_FLAG_CHARACTERISTIC = UUID.from_16_bits(0x2A02, 'Peripheral Privacy Flag')
|
||||
GATT_RECONNECTION_ADDRESS_CHARACTERISTIC = UUID.from_16_bits(0x2A03, 'Reconnection Address')
|
||||
GATT_PERIPHERAL_PREFERRREED_CONNECTION_PARAMETERS_CHARACTERISTIC = UUID.from_16_bits(0x2A04, 'Peripheral Preferred Connection Parameters')
|
||||
GATT_SERVICE_CHANGED_CHARACTERISTIC = UUID.from_16_bits(0x2A05, 'Service Changed')
|
||||
GATT_ALERT_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A06, 'Alert Level')
|
||||
GATT_TX_POWER_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A07, 'Tx Power Level')
|
||||
GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level')
|
||||
GATT_BOOT_KEYBOARD_INPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A22, 'Boot Keyboard Input Report')
|
||||
GATT_CURRENT_TIME_CHARACTERISTIC = UUID.from_16_bits(0x2A2B, 'Current Time')
|
||||
GATT_BOOT_KEYBOARD_OUTPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A32, 'Boot Keyboard Output Report')
|
||||
GATT_CENTRAL_ADDRESS_RESOLUTION__CHARACTERISTIC = UUID.from_16_bits(0x2AA6, 'Central Address Resolution')
|
||||
GATT_DEVICE_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2A00, 'Device Name')
|
||||
GATT_APPEARANCE_CHARACTERISTIC = UUID.from_16_bits(0x2A01, 'Appearance')
|
||||
GATT_PERIPHERAL_PRIVACY_FLAG_CHARACTERISTIC = UUID.from_16_bits(0x2A02, 'Peripheral Privacy Flag')
|
||||
GATT_RECONNECTION_ADDRESS_CHARACTERISTIC = UUID.from_16_bits(0x2A03, 'Reconnection Address')
|
||||
GATT_PERIPHERAL_PREFERRED_CONNECTION_PARAMETERS_CHARACTERISTIC = UUID.from_16_bits(0x2A04, 'Peripheral Preferred Connection Parameters')
|
||||
GATT_SERVICE_CHANGED_CHARACTERISTIC = UUID.from_16_bits(0x2A05, 'Service Changed')
|
||||
GATT_ALERT_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A06, 'Alert Level')
|
||||
GATT_TX_POWER_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A07, 'Tx Power Level')
|
||||
GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level')
|
||||
GATT_BOOT_KEYBOARD_INPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A22, 'Boot Keyboard Input Report')
|
||||
GATT_CURRENT_TIME_CHARACTERISTIC = UUID.from_16_bits(0x2A2B, 'Current Time')
|
||||
GATT_BOOT_KEYBOARD_OUTPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A32, 'Boot Keyboard Output Report')
|
||||
GATT_CENTRAL_ADDRESS_RESOLUTION__CHARACTERISTIC = UUID.from_16_bits(0x2AA6, 'Central Address Resolution')
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -189,7 +191,6 @@ class Service(Attribute):
|
||||
self.uuid = uuid
|
||||
self.included_services = []
|
||||
self.characteristics = characteristics[:]
|
||||
self.end_group_handle = 0
|
||||
self.primary = primary
|
||||
|
||||
def __str__(self):
|
||||
@@ -227,56 +228,34 @@ class Characteristic(Attribute):
|
||||
def property_name(property):
|
||||
return Characteristic.PROPERTY_NAMES.get(property, '')
|
||||
|
||||
@staticmethod
|
||||
def properties_as_string(properties):
|
||||
return ','.join([
|
||||
Characteristic.property_name(p) for p in Characteristic.PROPERTY_NAMES.keys()
|
||||
if properties & p
|
||||
])
|
||||
|
||||
def __init__(self, uuid, properties, permissions, value = b'', descriptors = []):
|
||||
# Convert the uuid to a UUID object if it isn't already
|
||||
if type(uuid) is str:
|
||||
uuid = UUID(uuid)
|
||||
|
||||
super().__init__(uuid, permissions, value)
|
||||
self.uuid = uuid
|
||||
self.properties = properties
|
||||
self._descriptors = descriptors
|
||||
self._descriptors_discovered = False
|
||||
self.end_group_handle = 0
|
||||
self.attach_descriptors()
|
||||
|
||||
def attach_descriptors(self):
|
||||
""" Let all the descriptors know they are attached to this characteristic """
|
||||
for descriptor in self._descriptors:
|
||||
descriptor.characteristic = self
|
||||
|
||||
def add_descriptor(self, descriptor):
|
||||
descriptor.characteristic = self
|
||||
self.descriptors.append(descriptor)
|
||||
self.uuid = self.type
|
||||
self.properties = properties
|
||||
self.descriptors = descriptors
|
||||
|
||||
def get_descriptor(self, descriptor_type):
|
||||
for descriptor in self.descriptors:
|
||||
if descriptor.uuid == descriptor_type:
|
||||
return descriptor
|
||||
|
||||
@property
|
||||
def descriptors(self):
|
||||
return self._descriptors
|
||||
|
||||
@descriptors.setter
|
||||
def descriptors(self, value):
|
||||
self._descriptors = value
|
||||
self._descriptors_discovered = True
|
||||
self.attach_descriptors()
|
||||
|
||||
@property
|
||||
def descriptors_discovered(self):
|
||||
return self._descriptors_discovered
|
||||
|
||||
def get_properties_as_string(self):
|
||||
return ','.join([self.property_name(p) for p in self.PROPERTY_NAMES.keys() if self.properties & p])
|
||||
|
||||
def __str__(self):
|
||||
return f'Characteristic(handle=0x{self.handle:04X}, end=0x{self.end_group_handle:04X}, uuid={self.uuid}, properties={self.get_properties_as_string()})'
|
||||
return f'Characteristic(handle=0x{self.handle:04X}, end=0x{self.end_group_handle:04X}, uuid={self.uuid}, properties={Characteristic.properties_as_string(self.properties)})'
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class CharacteristicValue:
|
||||
'''
|
||||
Characteristic value where reading and/or writing is delegated to functions
|
||||
passed as arguments to the constructor.
|
||||
'''
|
||||
def __init__(self, read=None, write=None):
|
||||
self._read = read
|
||||
self._write = write
|
||||
@@ -289,20 +268,145 @@ class CharacteristicValue:
|
||||
self._write(connection, value)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class CharacteristicAdapter:
|
||||
'''
|
||||
An adapter that can adapt any object with `read_value` and `write_value`
|
||||
methods (like Characteristic and CharacteristicProxy objects) by wrapping
|
||||
those methods with ones that return/accept encoded/decoded values.
|
||||
Objects with async methods are considered proxies, so the adaptation is one
|
||||
where the return value of `read_value` is decoded and the value passed to
|
||||
`write_value` is encoded. Other objects are considered local characteristics
|
||||
so the adaptation is one where the return value of `read_value` is encoded
|
||||
and the value passed to `write_value` is decoded.
|
||||
If the characteristic has a `subscribe` method, it is wrapped with one where
|
||||
the values are decoded before being passed to the subscriber.
|
||||
'''
|
||||
def __init__(self, characteristic):
|
||||
self.wrapped_characteristic = characteristic
|
||||
|
||||
if (
|
||||
asyncio.iscoroutinefunction(characteristic.read_value) and
|
||||
asyncio.iscoroutinefunction(characteristic.write_value)
|
||||
):
|
||||
self.read_value = self.read_decoded_value
|
||||
self.write_value = self.write_decoded_value
|
||||
else:
|
||||
self.read_value = self.read_encoded_value
|
||||
self.write_value = self.write_encoded_value
|
||||
|
||||
if hasattr(self.wrapped_characteristic, 'subscribe'):
|
||||
self.subscribe = self.wrapped_subscribe
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.wrapped_characteristic, name)
|
||||
|
||||
def read_encoded_value(self, connection):
|
||||
return self.encode_value(self.wrapped_characteristic.read_value(connection))
|
||||
|
||||
def write_encoded_value(self, connection, value):
|
||||
return self.wrapped_characteristic.write_value(connection, self.decode_value(value))
|
||||
|
||||
async def read_decoded_value(self):
|
||||
return self.decode_value(await self.wrapped_characteristic.read_value())
|
||||
|
||||
async def write_decoded_value(self, value):
|
||||
return await self.wrapped_characteristic.write_value(self.encode_value(value))
|
||||
|
||||
def encode_value(self, value):
|
||||
return value
|
||||
|
||||
def decode_value(self, value):
|
||||
return value
|
||||
|
||||
def wrapped_subscribe(self, subscriber=None):
|
||||
return self.wrapped_characteristic.subscribe(
|
||||
None if subscriber is None else lambda value: subscriber(self.decode_value(value))
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class DelegatedCharacteristicAdapter(CharacteristicAdapter):
|
||||
def __init__(self, characteristic, encode, decode):
|
||||
super().__init__(characteristic)
|
||||
self.encode = encode
|
||||
self.decode = decode
|
||||
|
||||
def encode_value(self, value):
|
||||
return self.encode(value)
|
||||
|
||||
def decode_value(self, value):
|
||||
return self.decode(value)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class PackedCharacteristicAdapter(CharacteristicAdapter):
|
||||
'''
|
||||
Adapter that packs/unpacks characteristic values according to a standard
|
||||
Python `struct` format.
|
||||
For formats with a single value, the adapted `read_value` and `write_value`
|
||||
methods return/accept single values. For formats with multiple values,
|
||||
they return/accept a tuple with the same number of elements as is required for
|
||||
the format.
|
||||
'''
|
||||
def __init__(self, characteristic, format):
|
||||
super().__init__(characteristic)
|
||||
self.struct = struct.Struct(format)
|
||||
|
||||
def pack(self, *values):
|
||||
return self.struct.pack(*values)
|
||||
|
||||
def unpack(self, buffer):
|
||||
return self.struct.unpack(buffer)
|
||||
|
||||
def encode_value(self, value):
|
||||
return self.pack(*value if type(value) is tuple else (value,))
|
||||
|
||||
def decode_value(self, value):
|
||||
unpacked = self.unpack(value)
|
||||
return unpacked[0] if len(unpacked) == 1 else unpacked
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class MappedCharacteristicAdapter(PackedCharacteristicAdapter):
|
||||
'''
|
||||
Adapter that packs/unpacks characteristic values according to a standard
|
||||
Python `struct` format.
|
||||
The adapted `read_value` and `write_value` methods return/accept aa dictionary which
|
||||
is packed/unpacked according to format, with the arguments extracted from the dictionary
|
||||
by key, in the same order as they occur in the `keys` parameter.
|
||||
'''
|
||||
def __init__(self, characteristic, format, keys):
|
||||
super().__init__(characteristic, format)
|
||||
self.keys = keys
|
||||
|
||||
def pack(self, values):
|
||||
return super().pack(*(values[key] for key in self.keys))
|
||||
|
||||
def unpack(self, buffer):
|
||||
return dict(zip(self.keys, super().unpack(buffer)))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class UTF8CharacteristicAdapter(CharacteristicAdapter):
|
||||
'''
|
||||
Adapter that converts strings to/from bytes using UTF-8 encoding
|
||||
'''
|
||||
def encode_value(self, value):
|
||||
return value.encode('utf-8')
|
||||
|
||||
def decode_value(self, value):
|
||||
return value.decode('utf-8')
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class Descriptor(Attribute):
|
||||
'''
|
||||
See Vol 3, Part G - 3.3.3 Characteristic Descriptor Declarations
|
||||
'''
|
||||
|
||||
def __init__(self, uuid, permissions, value = b''):
|
||||
# Convert the uuid to a UUID object if it isn't already
|
||||
if type(uuid) is str:
|
||||
uuid = UUID(uuid)
|
||||
|
||||
super().__init__(uuid, permissions, value)
|
||||
self.uuid = uuid
|
||||
self.characteristic = None
|
||||
def __init__(self, descriptor_type, permissions, value = b''):
|
||||
super().__init__(descriptor_type, permissions, value)
|
||||
|
||||
def __str__(self):
|
||||
return f'Descriptor(handle=0x{self.handle:04X}, uuid={self.uuid}, value={self.read_value(None).hex()})'
|
||||
return f'Descriptor(handle=0x{self.handle:04X}, type={self.type}, value={self.read_value(None).hex()})'
|
||||
|
||||
Reference in New Issue
Block a user