mirror of
https://github.com/google/bumble.git
synced 2026-05-06 03:38:01 +00:00
new adapter classes
This commit is contained in:
390
bumble/gatt_adapters.py
Normal file
390
bumble/gatt_adapters.py
Normal file
@@ -0,0 +1,390 @@
|
||||
# Copyright 2025 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# GATT - Type Adapters
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
from __future__ import annotations
|
||||
import struct
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Generic,
|
||||
Iterable,
|
||||
Literal,
|
||||
Optional,
|
||||
Type,
|
||||
TypeVar,
|
||||
)
|
||||
|
||||
from bumble.core import InvalidOperationError
|
||||
from bumble.gatt import Characteristic
|
||||
from bumble.gatt_client import CharacteristicProxy
|
||||
from bumble.utils import ByteSerializable, IntConvertible
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Typing
|
||||
# -----------------------------------------------------------------------------
|
||||
_T = TypeVar('_T')
|
||||
_T2 = TypeVar('_T2', bound=ByteSerializable)
|
||||
_T3 = TypeVar('_T3', bound=IntConvertible)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class CharacteristicAdapter(Characteristic, Generic[_T]):
|
||||
'''
|
||||
An adapter that can adapt a Characteristic to automatically encode/decode values
|
||||
read/written from the characteristic.
|
||||
'''
|
||||
|
||||
def __init__(self, characteristic: Characteristic) -> None:
|
||||
super().__init__(
|
||||
characteristic.uuid,
|
||||
characteristic.properties,
|
||||
characteristic.permissions,
|
||||
characteristic.value,
|
||||
characteristic.descriptors,
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class CharacteristicProxyAdapter(CharacteristicProxy[_T]):
|
||||
'''
|
||||
An adapter that can adapt Characteristic and AttributeProxy objects
|
||||
by wrapping their `read_value()` and `write_value()` methods with ones that
|
||||
return/accept encoded/decoded values.
|
||||
|
||||
For proxies (i.e used by a GATT client), the adaptation is one where the return
|
||||
value of `read_value()` is decoded and the value passed to `write_value()` is
|
||||
encoded. The `subscribe()` method, is wrapped with one where the values are decoded
|
||||
before being passed to the subscriber.
|
||||
|
||||
For local values (i.e hosted by a GATT server) the adaptation is one where the
|
||||
return value of `read_value()` is encoded and the value passed to `write_value()`
|
||||
is decoded.
|
||||
'''
|
||||
|
||||
def __init__(self, characteristic_proxy: CharacteristicProxy):
|
||||
super().__init__(
|
||||
characteristic_proxy.client,
|
||||
characteristic_proxy.handle,
|
||||
characteristic_proxy.end_group_handle,
|
||||
characteristic_proxy.uuid,
|
||||
characteristic_proxy.properties,
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class DelegatedCharacteristicAdapter(CharacteristicAdapter[_T]):
|
||||
'''
|
||||
Adapter that converts bytes values using an encode and/or a decode function.
|
||||
'''
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
characteristic: Characteristic,
|
||||
encode: Optional[Callable[[_T], bytes]] = None,
|
||||
decode: Optional[Callable[[bytes], _T]] = None,
|
||||
):
|
||||
super().__init__(characteristic)
|
||||
self.encode = encode
|
||||
self.decode = decode
|
||||
|
||||
def encode_value(self, value: _T) -> bytes:
|
||||
if self.encode is None:
|
||||
raise InvalidOperationError('delegated adapter does not have an encoder')
|
||||
return self.encode(value)
|
||||
|
||||
def decode_value(self, value: bytes) -> _T:
|
||||
if self.decode is None:
|
||||
raise InvalidOperationError('delegate adapter does not have a decoder')
|
||||
return self.decode(value)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class DelegatedCharacteristicProxyAdapter(CharacteristicProxyAdapter[_T]):
|
||||
'''
|
||||
Adapter that converts bytes values using an encode and a decode function.
|
||||
'''
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
characteristic_proxy: CharacteristicProxy,
|
||||
encode: Optional[Callable[[_T], bytes]] = None,
|
||||
decode: Optional[Callable[[bytes], _T]] = None,
|
||||
):
|
||||
super().__init__(characteristic_proxy)
|
||||
self.encode = encode
|
||||
self.decode = decode
|
||||
|
||||
def encode_value(self, value: _T) -> bytes:
|
||||
if self.encode is None:
|
||||
raise InvalidOperationError('delegated adapter does not have an encoder')
|
||||
return self.encode(value)
|
||||
|
||||
def decode_value(self, value: bytes) -> _T:
|
||||
if self.decode is None:
|
||||
raise InvalidOperationError('delegate adapter does not have a decoder')
|
||||
return self.decode(value)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class PackedCharacteristicAdapter(CharacteristicAdapter):
|
||||
'''
|
||||
Adapter that packs/unpacks characteristic values according to a standard
|
||||
Python `struct` format.
|
||||
For formats with a single value, the adapted `read_value` and `write_value`
|
||||
methods return/accept single values. For formats with multiple values,
|
||||
they return/accept a tuple with the same number of elements as is required for
|
||||
the format.
|
||||
'''
|
||||
|
||||
def __init__(self, characteristic: Characteristic, pack_format: str) -> None:
|
||||
super().__init__(characteristic)
|
||||
self.struct = struct.Struct(pack_format)
|
||||
|
||||
def pack(self, *values) -> bytes:
|
||||
return self.struct.pack(*values)
|
||||
|
||||
def unpack(self, buffer: bytes) -> tuple:
|
||||
return self.struct.unpack(buffer)
|
||||
|
||||
def encode_value(self, value: Any) -> bytes:
|
||||
return self.pack(*value if isinstance(value, tuple) else (value,))
|
||||
|
||||
def decode_value(self, value: bytes) -> Any:
|
||||
unpacked = self.unpack(value)
|
||||
return unpacked[0] if len(unpacked) == 1 else unpacked
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class PackedCharacteristicProxyAdapter(CharacteristicProxyAdapter):
|
||||
'''
|
||||
Adapter that packs/unpacks characteristic values according to a standard
|
||||
Python `struct` format.
|
||||
For formats with a single value, the adapted `read_value` and `write_value`
|
||||
methods return/accept single values. For formats with multiple values,
|
||||
they return/accept a tuple with the same number of elements as is required for
|
||||
the format.
|
||||
'''
|
||||
|
||||
def __init__(self, characteristic_proxy, pack_format):
|
||||
super().__init__(characteristic_proxy)
|
||||
self.struct = struct.Struct(pack_format)
|
||||
|
||||
def pack(self, *values) -> bytes:
|
||||
return self.struct.pack(*values)
|
||||
|
||||
def unpack(self, buffer: bytes) -> tuple:
|
||||
return self.struct.unpack(buffer)
|
||||
|
||||
def encode_value(self, value: Any) -> bytes:
|
||||
return self.pack(*value if isinstance(value, tuple) else (value,))
|
||||
|
||||
def decode_value(self, value: bytes) -> Any:
|
||||
unpacked = self.unpack(value)
|
||||
return unpacked[0] if len(unpacked) == 1 else unpacked
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class MappedCharacteristicAdapter(PackedCharacteristicAdapter):
|
||||
'''
|
||||
Adapter that packs/unpacks characteristic values according to a standard
|
||||
Python `struct` format.
|
||||
The adapted `read_value` and `write_value` methods return/accept a dictionary which
|
||||
is packed/unpacked according to format, with the arguments extracted from the
|
||||
dictionary by key, in the same order as they occur in the `keys` parameter.
|
||||
'''
|
||||
|
||||
def __init__(
|
||||
self, characteristic: Characteristic, pack_format: str, keys: Iterable[str]
|
||||
) -> None:
|
||||
super().__init__(characteristic, pack_format)
|
||||
self.keys = keys
|
||||
|
||||
# pylint: disable=arguments-differ
|
||||
def pack(self, values) -> bytes:
|
||||
return super().pack(*(values[key] for key in self.keys))
|
||||
|
||||
def unpack(self, buffer: bytes) -> Any:
|
||||
return dict(zip(self.keys, super().unpack(buffer)))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class MappedCharacteristicProxyAdapter(PackedCharacteristicProxyAdapter):
|
||||
'''
|
||||
Adapter that packs/unpacks characteristic values according to a standard
|
||||
Python `struct` format.
|
||||
The adapted `read_value` and `write_value` methods return/accept a dictionary which
|
||||
is packed/unpacked according to format, with the arguments extracted from the
|
||||
dictionary by key, in the same order as they occur in the `keys` parameter.
|
||||
'''
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
characteristic_proxy: CharacteristicProxy,
|
||||
pack_format: str,
|
||||
keys: Iterable[str],
|
||||
) -> None:
|
||||
super().__init__(characteristic_proxy, pack_format)
|
||||
self.keys = keys
|
||||
|
||||
# pylint: disable=arguments-differ
|
||||
def pack(self, values) -> bytes:
|
||||
return super().pack(*(values[key] for key in self.keys))
|
||||
|
||||
def unpack(self, buffer: bytes) -> Any:
|
||||
return dict(zip(self.keys, super().unpack(buffer)))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class UTF8CharacteristicAdapter(CharacteristicAdapter[str]):
|
||||
'''
|
||||
Adapter that converts strings to/from bytes using UTF-8 encoding
|
||||
'''
|
||||
|
||||
def encode_value(self, value: str) -> bytes:
|
||||
return value.encode('utf-8')
|
||||
|
||||
def decode_value(self, value: bytes) -> str:
|
||||
return value.decode('utf-8')
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class UTF8CharacteristicProxyAdapter(CharacteristicProxyAdapter[str]):
|
||||
'''
|
||||
Adapter that converts strings to/from bytes using UTF-8 encoding
|
||||
'''
|
||||
|
||||
def encode_value(self, value: str) -> bytes:
|
||||
return value.encode('utf-8')
|
||||
|
||||
def decode_value(self, value: bytes) -> str:
|
||||
return value.decode('utf-8')
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class SerializableCharacteristicAdapter(CharacteristicAdapter[_T2]):
|
||||
'''
|
||||
Adapter that converts any class to/from bytes using the class'
|
||||
`to_bytes` and `__bytes__` methods, respectively.
|
||||
'''
|
||||
|
||||
def __init__(self, characteristic: Characteristic, cls: Type[_T2]) -> None:
|
||||
super().__init__(characteristic)
|
||||
self.cls = cls
|
||||
|
||||
def encode_value(self, value: _T2) -> bytes:
|
||||
return bytes(value)
|
||||
|
||||
def decode_value(self, value: bytes) -> _T2:
|
||||
return self.cls.from_bytes(value)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class SerializableCharacteristicProxyAdapter(CharacteristicProxyAdapter[_T2]):
|
||||
'''
|
||||
Adapter that converts any class to/from bytes using the class'
|
||||
`to_bytes` and `__bytes__` methods, respectively.
|
||||
'''
|
||||
|
||||
def __init__(
|
||||
self, characteristic_proxy: CharacteristicProxy, cls: Type[_T2]
|
||||
) -> None:
|
||||
super().__init__(characteristic_proxy)
|
||||
self.cls = cls
|
||||
|
||||
def encode_value(self, value: _T2) -> bytes:
|
||||
return bytes(value)
|
||||
|
||||
def decode_value(self, value: bytes) -> _T2:
|
||||
return self.cls.from_bytes(value)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class EnumCharacteristicAdapter(CharacteristicAdapter[_T3]):
|
||||
'''
|
||||
Adapter that converts int-enum-like classes to/from bytes using the class'
|
||||
`int().to_bytes()` and `from_bytes()` methods, respectively.
|
||||
'''
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
characteristic: Characteristic,
|
||||
cls: Type[_T3],
|
||||
length: int,
|
||||
byteorder: Literal['little', 'big'] = 'little',
|
||||
):
|
||||
"""
|
||||
Initialize an instance.
|
||||
|
||||
Params:
|
||||
characteristic: the Characteristic to adapt to/from
|
||||
cls: the class to/from which to convert integer values
|
||||
length: number of bytes used to represent integer values
|
||||
byteorder: byte order of the byte representation of integers.
|
||||
"""
|
||||
super().__init__(characteristic)
|
||||
self.cls = cls
|
||||
self.length = length
|
||||
self.byteorder = byteorder
|
||||
|
||||
def encode_value(self, value: _T3) -> bytes:
|
||||
return int(value).to_bytes(self.length, self.byteorder)
|
||||
|
||||
def decode_value(self, value: bytes) -> _T3:
|
||||
int_value = int.from_bytes(value, self.byteorder)
|
||||
return self.cls(int_value)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class EnumCharacteristicProxyAdapter(CharacteristicProxyAdapter[_T3]):
|
||||
'''
|
||||
Adapter that converts int-enum-like classes to/from bytes using the class'
|
||||
`int().to_bytes()` and `from_bytes()` methods, respectively.
|
||||
'''
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
characteristic_proxy: CharacteristicProxy,
|
||||
cls: Type[_T3],
|
||||
length: int,
|
||||
byteorder: Literal['little', 'big'] = 'little',
|
||||
):
|
||||
"""
|
||||
Initialize an instance.
|
||||
|
||||
Params:
|
||||
characteristic_proxy: the CharacteristicProxy to adapt to/from
|
||||
cls: the class to/from which to convert integer values
|
||||
length: number of bytes used to represent integer values
|
||||
byteorder: byte order of the byte representation of integers.
|
||||
"""
|
||||
super().__init__(characteristic_proxy)
|
||||
self.cls = cls
|
||||
self.length = length
|
||||
self.byteorder = byteorder
|
||||
|
||||
def encode_value(self, value: _T3) -> bytes:
|
||||
return int(value).to_bytes(self.length, self.byteorder)
|
||||
|
||||
def decode_value(self, value: bytes) -> _T3:
|
||||
int_value = int.from_bytes(value, self.byteorder)
|
||||
a = self.cls(int_value)
|
||||
return self.cls(int_value)
|
||||
Reference in New Issue
Block a user