diff --git a/apps/auracast.py b/apps/auracast.py index 77e58ed4..e20d0afd 100644 --- a/apps/auracast.py +++ b/apps/auracast.py @@ -522,14 +522,19 @@ async def run_assist( return # Subscribe to and read the broadcast receive state characteristics + def on_broadcast_receive_state_update( + value: bass.BroadcastReceiveState, index: int + ) -> None: + print( + f"{color(f'Broadcast Receive State Update [{index}]:', 'green')} {value}" + ) + for i, broadcast_receive_state in enumerate( bass_client.broadcast_receive_states ): try: await broadcast_receive_state.subscribe( - lambda value, i=i: print( - f"{color(f'Broadcast Receive State Update [{i}]:', 'green')} {value}" - ) + functools.partial(on_broadcast_receive_state_update, index=i) ) except core.ProtocolError as error: print( diff --git a/apps/gg_bridge.py b/apps/gg_bridge.py index 12d16e42..c1ee2fb7 100644 --- a/apps/gg_bridge.py +++ b/apps/gg_bridge.py @@ -234,7 +234,7 @@ class GattlinkNodeBridge(GattlinkL2capEndpoint, Device.Listener): Characteristic.WRITEABLE, CharacteristicValue(write=self.on_rx_write), ) - self.tx_characteristic = Characteristic( + self.tx_characteristic: Characteristic[bytes] = Characteristic( GG_GATTLINK_TX_CHARACTERISTIC_UUID, Characteristic.Properties.NOTIFY, Characteristic.READABLE, diff --git a/bumble/att.py b/bumble/att.py index 8884d04b..140d0193 100644 --- a/bumble/att.py +++ b/bumble/att.py @@ -29,13 +29,14 @@ import functools import inspect import struct from typing import ( - Any, Awaitable, Callable, + Generic, Dict, List, Optional, Type, + TypeVar, Union, TYPE_CHECKING, ) @@ -43,13 +44,18 @@ from typing import ( from pyee import EventEmitter from bumble import utils -from bumble.core import UUID, name_or_number, ProtocolError +from bumble.core import UUID, name_or_number, InvalidOperationError, ProtocolError from bumble.hci import HCI_Object, key_with_value from bumble.colors import color +# ----------------------------------------------------------------------------- +# Typing +# ----------------------------------------------------------------------------- if TYPE_CHECKING: from bumble.device import Connection +_T = TypeVar('_T') + # ----------------------------------------------------------------------------- # Constants # ----------------------------------------------------------------------------- @@ -748,7 +754,7 @@ class ATT_Handle_Value_Confirmation(ATT_PDU): # ----------------------------------------------------------------------------- -class AttributeValue: +class AttributeValue(Generic[_T]): ''' Attribute value where reading and/or writing is delegated to functions passed as arguments to the constructor. @@ -757,33 +763,34 @@ class AttributeValue: def __init__( self, read: Union[ - Callable[[Optional[Connection]], Any], - Callable[[Optional[Connection]], Awaitable[Any]], + Callable[[Optional[Connection]], _T], + Callable[[Optional[Connection]], Awaitable[_T]], None, ] = None, write: Union[ - Callable[[Optional[Connection], Any], None], - Callable[[Optional[Connection], Any], Awaitable[None]], + Callable[[Optional[Connection], _T], None], + Callable[[Optional[Connection], _T], Awaitable[None]], None, ] = None, ): self._read = read self._write = write - def read(self, connection: Optional[Connection]) -> Union[bytes, Awaitable[bytes]]: - return self._read(connection) if self._read else b'' + def read(self, connection: Optional[Connection]) -> Union[_T, Awaitable[_T]]: + if self._read is None: + raise InvalidOperationError('AttributeValue has no read function') + return self._read(connection) def write( - self, connection: Optional[Connection], value: bytes + self, connection: Optional[Connection], value: _T ) -> Union[Awaitable[None], None]: - if self._write: - return self._write(connection, value) - - return None + if self._write is None: + raise InvalidOperationError('AttributeValue has no write function') + return self._write(connection, value) # ----------------------------------------------------------------------------- -class Attribute(EventEmitter): +class Attribute(EventEmitter, Generic[_T]): class Permissions(enum.IntFlag): READABLE = 0x01 WRITEABLE = 0x02 @@ -822,13 +829,13 @@ class Attribute(EventEmitter): READ_REQUIRES_AUTHORIZATION = Permissions.READ_REQUIRES_AUTHORIZATION WRITE_REQUIRES_AUTHORIZATION = Permissions.WRITE_REQUIRES_AUTHORIZATION - value: Any + value: Union[AttributeValue[_T], _T, None] def __init__( self, attribute_type: Union[str, bytes, UUID], permissions: Union[str, Attribute.Permissions], - value: Any = b'', + value: Union[AttributeValue[_T], _T, None] = None, ) -> None: EventEmitter.__init__(self) self.handle = 0 @@ -848,11 +855,11 @@ class Attribute(EventEmitter): self.value = value - def encode_value(self, value: Any) -> bytes: - return value + def encode_value(self, value: _T) -> bytes: + return value # type: ignore - def decode_value(self, value_bytes: bytes) -> Any: - return value_bytes + def decode_value(self, value: bytes) -> _T: + return value # type: ignore async def read_value(self, connection: Optional[Connection]) -> bytes: if ( @@ -877,11 +884,16 @@ class Attribute(EventEmitter): error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle ) - if hasattr(self.value, 'read'): + value: Union[_T, None] + if self.value is None: + value = None + elif hasattr(self.value, 'read'): try: - value = self.value.read(connection) - if inspect.isawaitable(value): - value = await value + read_value = self.value.read(connection) + if inspect.isawaitable(read_value): + value = await read_value + else: + value = read_value except ATT_Error as error: raise ATT_Error( error_code=error.error_code, att_handle=self.handle @@ -889,20 +901,24 @@ class Attribute(EventEmitter): else: value = self.value - self.emit('read', connection, value) + self.emit('read', connection, b'' if value is None else value) - return self.encode_value(value) + return b'' if value is None else self.encode_value(value) - async def write_value(self, connection: Connection, value_bytes: bytes) -> None: + async def write_value(self, connection: Optional[Connection], value: bytes) -> None: if ( - self.permissions & self.WRITE_REQUIRES_ENCRYPTION - ) and not connection.encryption: + (self.permissions & self.WRITE_REQUIRES_ENCRYPTION) + and connection is not None + and not connection.encryption + ): raise ATT_Error( error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle ) if ( - self.permissions & self.WRITE_REQUIRES_AUTHENTICATION - ) and not connection.authenticated: + (self.permissions & self.WRITE_REQUIRES_AUTHENTICATION) + and connection is not None + and not connection.authenticated + ): raise ATT_Error( error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle ) @@ -912,11 +928,11 @@ class Attribute(EventEmitter): error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle ) - value = self.decode_value(value_bytes) + decoded_value = self.decode_value(value) - if hasattr(self.value, 'write'): + if self.value is not None and hasattr(self.value, 'write'): try: - result = self.value.write(connection, value) + result = self.value.write(connection, decoded_value) if inspect.isawaitable(result): await result except ATT_Error as error: @@ -924,9 +940,9 @@ class Attribute(EventEmitter): error_code=error.error_code, att_handle=self.handle ) from error else: - self.value = value + self.value = decoded_value - self.emit('write', connection, value) + self.emit('write', connection, decoded_value) def __repr__(self): if isinstance(self.value, bytes): diff --git a/bumble/device.py b/bumble/device.py index 30c8e054..28cfef05 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -53,7 +53,7 @@ from pyee import EventEmitter from .colors import color from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU -from .gatt import Characteristic, Descriptor, Service +from .gatt import Attribute, Characteristic, Descriptor, Service from .host import DataPacketQueue, Host from .profiles.gap import GenericAccessService from .core import ( @@ -2221,7 +2221,7 @@ class Device(CompositeEventEmitter): permissions=descriptor["permissions"], ) descriptors.append(new_descriptor) - new_characteristic = Characteristic( + new_characteristic: Characteristic[bytes] = Characteristic( uuid=characteristic["uuid"], properties=Characteristic.Properties.from_string( characteristic["properties"] @@ -4920,16 +4920,84 @@ class Device(CompositeEventEmitter): self.gatt_service = gatt_service.GenericAttributeProfileService() self.gatt_server.add_service(self.gatt_service) - async def notify_subscriber(self, connection, attribute, value=None, force=False): + async def notify_subscriber( + self, + connection: Connection, + attribute: Attribute, + value: Optional[Any] = None, + force: bool = False, + ) -> None: + """ + Send a notification to an attribute subscriber. + + Args: + connection: + The connection of the subscriber. + attribute: + The attribute whose value is notified. + value: + The value of the attribute (if None, the value is read from the attribute) + force: + If True, send a notification even if there is no subscriber. + """ await self.gatt_server.notify_subscriber(connection, attribute, value, force) - async def notify_subscribers(self, attribute, value=None, force=False): + async def notify_subscribers( + self, attribute: Attribute, value=None, force=False + ) -> None: + """ + Send a notification to all the subscribers of an attribute. + + Args: + attribute: + The attribute whose value is notified. + value: + The value of the attribute (if None, the value is read from the attribute) + force: + If True, send a notification for every connection even if there is no + subscriber. + """ await self.gatt_server.notify_subscribers(attribute, value, force) - async def indicate_subscriber(self, connection, attribute, value=None, force=False): + async def indicate_subscriber( + self, + connection: Connection, + attribute: Attribute, + value: Optional[Any] = None, + force: bool = False, + ): + """ + Send an indication to an attribute subscriber. + + This method returns when the response to the indication has been received. + + Args: + connection: + The connection of the subscriber. + attribute: + The attribute whose value is indicated. + value: + The value of the attribute (if None, the value is read from the attribute) + force: + If True, send an indication even if there is no subscriber. + """ await self.gatt_server.indicate_subscriber(connection, attribute, value, force) - async def indicate_subscribers(self, attribute, value=None, force=False): + async def indicate_subscribers( + self, attribute: Attribute, value: Optional[Any] = None, force: bool = False + ): + """ + Send an indication to all the subscribers of an attribute. + + Args: + attribute: + The attribute whose value is notified. + value: + The value of the attribute (if None, the value is read from the attribute) + force: + If True, send an indication for every connection even if there is no + subscriber. + """ await self.gatt_server.indicate_subscribers(attribute, value, force) @host_event_handler diff --git a/bumble/gatt.py b/bumble/gatt.py index fb7ede35..106d7c8d 100644 --- a/bumble/gatt.py +++ b/bumble/gatt.py @@ -27,28 +27,16 @@ import enum import functools import logging import struct -from typing import ( - Any, - Callable, - Dict, - Iterable, - List, - Optional, - Sequence, - SupportsBytes, - Type, - Union, - TYPE_CHECKING, -) +from typing import Iterable, List, Optional, Sequence, TypeVar, Union from bumble.colors import color -from bumble.core import BaseBumbleError, InvalidOperationError, UUID +from bumble.core import BaseBumbleError, UUID from bumble.att import Attribute, AttributeValue -from bumble.utils import ByteSerializable - -if TYPE_CHECKING: - from bumble.gatt_client import AttributeProxy +# ----------------------------------------------------------------------------- +# Typing +# ----------------------------------------------------------------------------- +_T = TypeVar('_T') # ----------------------------------------------------------------------------- # Logging @@ -436,7 +424,7 @@ class IncludedServiceDeclaration(Attribute): # ----------------------------------------------------------------------------- -class Characteristic(Attribute): +class Characteristic(Attribute[_T]): ''' See Vol 3, Part G - 3.3 CHARACTERISTIC DEFINITION ''' @@ -499,7 +487,7 @@ class Characteristic(Attribute): uuid: Union[str, bytes, UUID], properties: Characteristic.Properties, permissions: Union[str, Attribute.Permissions], - value: Any = b'', + value: Union[AttributeValue[_T], _T, None] = None, descriptors: Sequence[Descriptor] = (), ): super().__init__(uuid, permissions, value) @@ -559,217 +547,10 @@ class CharacteristicDeclaration(Attribute): # ----------------------------------------------------------------------------- -class CharacteristicValue(AttributeValue): +class CharacteristicValue(AttributeValue[_T]): """Same as AttributeValue, for backward compatibility""" -# ----------------------------------------------------------------------------- -class CharacteristicAdapter: - ''' - An adapter that can adapt Characteristic and AttributeProxy objects - by wrapping their `read_value()` and `write_value()` methods with ones that - return/accept encoded/decoded values. - - For proxies (i.e used by a GATT client), the adaptation is one where the return - value of `read_value()` is decoded and the value passed to `write_value()` is - encoded. The `subscribe()` method, is wrapped with one where the values are decoded - before being passed to the subscriber. - - For local values (i.e hosted by a GATT server) the adaptation is one where the - return value of `read_value()` is encoded and the value passed to `write_value()` - is decoded. - ''' - - read_value: Callable - write_value: Callable - - def __init__(self, characteristic: Union[Characteristic, AttributeProxy]): - self.wrapped_characteristic = characteristic - self.subscribers: Dict[Callable, Callable] = ( - {} - ) # Map from subscriber to proxy subscriber - - if isinstance(characteristic, Characteristic): - self.read_value = self.read_encoded_value - self.write_value = self.write_encoded_value - else: - self.read_value = self.read_decoded_value - self.write_value = self.write_decoded_value - self.subscribe = self.wrapped_subscribe - self.unsubscribe = self.wrapped_unsubscribe - - def __getattr__(self, name): - return getattr(self.wrapped_characteristic, name) - - def __setattr__(self, name, value): - if name in ( - 'wrapped_characteristic', - 'subscribers', - 'read_value', - 'write_value', - 'subscribe', - 'unsubscribe', - ): - super().__setattr__(name, value) - else: - setattr(self.wrapped_characteristic, name, value) - - async def read_encoded_value(self, connection): - return self.encode_value( - await self.wrapped_characteristic.read_value(connection) - ) - - async def write_encoded_value(self, connection, value): - return await self.wrapped_characteristic.write_value( - connection, self.decode_value(value) - ) - - async def read_decoded_value(self): - return self.decode_value(await self.wrapped_characteristic.read_value()) - - async def write_decoded_value(self, value, with_response=False): - return await self.wrapped_characteristic.write_value( - self.encode_value(value), with_response - ) - - def encode_value(self, value): - return value - - def decode_value(self, value): - return value - - def wrapped_subscribe(self, subscriber=None): - if subscriber is not None: - if subscriber in self.subscribers: - # We already have a proxy subscriber - subscriber = self.subscribers[subscriber] - else: - # Create and register a proxy that will decode the value - original_subscriber = subscriber - - def on_change(value): - original_subscriber(self.decode_value(value)) - - self.subscribers[subscriber] = on_change - subscriber = on_change - - return self.wrapped_characteristic.subscribe(subscriber) - - def wrapped_unsubscribe(self, subscriber=None): - if subscriber in self.subscribers: - subscriber = self.subscribers.pop(subscriber) - - return self.wrapped_characteristic.unsubscribe(subscriber) - - def __str__(self) -> str: - wrapped = str(self.wrapped_characteristic) - return f'{self.__class__.__name__}({wrapped})' - - -# ----------------------------------------------------------------------------- -class DelegatedCharacteristicAdapter(CharacteristicAdapter): - ''' - Adapter that converts bytes values using an encode and a decode function. - ''' - - def __init__(self, characteristic, encode=None, decode=None): - super().__init__(characteristic) - self.encode = encode - self.decode = decode - - def encode_value(self, value): - if self.encode is None: - raise InvalidOperationError('delegated adapter does not have an encoder') - return self.encode(value) - - def decode_value(self, value): - if self.decode is None: - raise InvalidOperationError('delegate adapter does not have a decoder') - return self.decode(value) - - -# ----------------------------------------------------------------------------- -class PackedCharacteristicAdapter(CharacteristicAdapter): - ''' - Adapter that packs/unpacks characteristic values according to a standard - Python `struct` format. - For formats with a single value, the adapted `read_value` and `write_value` - methods return/accept single values. For formats with multiple values, - they return/accept a tuple with the same number of elements as is required for - the format. - ''' - - def __init__(self, characteristic, pack_format): - super().__init__(characteristic) - self.struct = struct.Struct(pack_format) - - def pack(self, *values): - return self.struct.pack(*values) - - def unpack(self, buffer): - return self.struct.unpack(buffer) - - def encode_value(self, value): - return self.pack(*value if isinstance(value, tuple) else (value,)) - - def decode_value(self, value): - unpacked = self.unpack(value) - return unpacked[0] if len(unpacked) == 1 else unpacked - - -# ----------------------------------------------------------------------------- -class MappedCharacteristicAdapter(PackedCharacteristicAdapter): - ''' - Adapter that packs/unpacks characteristic values according to a standard - Python `struct` format. - The adapted `read_value` and `write_value` methods return/accept a dictionary which - is packed/unpacked according to format, with the arguments extracted from the - dictionary by key, in the same order as they occur in the `keys` parameter. - ''' - - def __init__(self, characteristic, pack_format, keys): - super().__init__(characteristic, pack_format) - self.keys = keys - - # pylint: disable=arguments-differ - def pack(self, values): - return super().pack(*(values[key] for key in self.keys)) - - def unpack(self, buffer): - return dict(zip(self.keys, super().unpack(buffer))) - - -# ----------------------------------------------------------------------------- -class UTF8CharacteristicAdapter(CharacteristicAdapter): - ''' - Adapter that converts strings to/from bytes using UTF-8 encoding - ''' - - def encode_value(self, value: str) -> bytes: - return value.encode('utf-8') - - def decode_value(self, value: bytes) -> str: - return value.decode('utf-8') - - -# ----------------------------------------------------------------------------- -class SerializableCharacteristicAdapter(CharacteristicAdapter): - ''' - Adapter that converts any class to/from bytes using the class' - `to_bytes` and `__bytes__` methods, respectively. - ''' - - def __init__(self, characteristic, cls: Type[ByteSerializable]): - super().__init__(characteristic) - self.cls = cls - - def encode_value(self, value: SupportsBytes) -> bytes: - return bytes(value) - - def decode_value(self, value: bytes) -> Any: - return self.cls.from_bytes(value) - - # ----------------------------------------------------------------------------- class Descriptor(Attribute): ''' diff --git a/bumble/gatt_adapters.py b/bumble/gatt_adapters.py new file mode 100644 index 00000000..5ea0789d --- /dev/null +++ b/bumble/gatt_adapters.py @@ -0,0 +1,390 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# GATT - Type Adapters +# ----------------------------------------------------------------------------- + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from __future__ import annotations +import struct +from typing import ( + Any, + Callable, + Generic, + Iterable, + Literal, + Optional, + Type, + TypeVar, +) + +from bumble.core import InvalidOperationError +from bumble.gatt import Characteristic +from bumble.gatt_client import CharacteristicProxy +from bumble.utils import ByteSerializable, IntConvertible + + +# ----------------------------------------------------------------------------- +# Typing +# ----------------------------------------------------------------------------- +_T = TypeVar('_T') +_T2 = TypeVar('_T2', bound=ByteSerializable) +_T3 = TypeVar('_T3', bound=IntConvertible) + + +# ----------------------------------------------------------------------------- +class CharacteristicAdapter(Characteristic, Generic[_T]): + ''' + An adapter that can adapt a Characteristic to automatically encode/decode values + read/written from the characteristic. + ''' + + def __init__(self, characteristic: Characteristic) -> None: + super().__init__( + characteristic.uuid, + characteristic.properties, + characteristic.permissions, + characteristic.value, + characteristic.descriptors, + ) + + +# ----------------------------------------------------------------------------- +class CharacteristicProxyAdapter(CharacteristicProxy[_T]): + ''' + An adapter that can adapt Characteristic and AttributeProxy objects + by wrapping their `read_value()` and `write_value()` methods with ones that + return/accept encoded/decoded values. + + For proxies (i.e used by a GATT client), the adaptation is one where the return + value of `read_value()` is decoded and the value passed to `write_value()` is + encoded. The `subscribe()` method, is wrapped with one where the values are decoded + before being passed to the subscriber. + + For local values (i.e hosted by a GATT server) the adaptation is one where the + return value of `read_value()` is encoded and the value passed to `write_value()` + is decoded. + ''' + + def __init__(self, characteristic_proxy: CharacteristicProxy): + super().__init__( + characteristic_proxy.client, + characteristic_proxy.handle, + characteristic_proxy.end_group_handle, + characteristic_proxy.uuid, + characteristic_proxy.properties, + ) + + +# ----------------------------------------------------------------------------- +class DelegatedCharacteristicAdapter(CharacteristicAdapter[_T]): + ''' + Adapter that converts bytes values using an encode and/or a decode function. + ''' + + def __init__( + self, + characteristic: Characteristic, + encode: Optional[Callable[[_T], bytes]] = None, + decode: Optional[Callable[[bytes], _T]] = None, + ): + super().__init__(characteristic) + self.encode = encode + self.decode = decode + + def encode_value(self, value: _T) -> bytes: + if self.encode is None: + raise InvalidOperationError('delegated adapter does not have an encoder') + return self.encode(value) + + def decode_value(self, value: bytes) -> _T: + if self.decode is None: + raise InvalidOperationError('delegate adapter does not have a decoder') + return self.decode(value) + + +# ----------------------------------------------------------------------------- +class DelegatedCharacteristicProxyAdapter(CharacteristicProxyAdapter[_T]): + ''' + Adapter that converts bytes values using an encode and a decode function. + ''' + + def __init__( + self, + characteristic_proxy: CharacteristicProxy, + encode: Optional[Callable[[_T], bytes]] = None, + decode: Optional[Callable[[bytes], _T]] = None, + ): + super().__init__(characteristic_proxy) + self.encode = encode + self.decode = decode + + def encode_value(self, value: _T) -> bytes: + if self.encode is None: + raise InvalidOperationError('delegated adapter does not have an encoder') + return self.encode(value) + + def decode_value(self, value: bytes) -> _T: + if self.decode is None: + raise InvalidOperationError('delegate adapter does not have a decoder') + return self.decode(value) + + +# ----------------------------------------------------------------------------- +class PackedCharacteristicAdapter(CharacteristicAdapter): + ''' + Adapter that packs/unpacks characteristic values according to a standard + Python `struct` format. + For formats with a single value, the adapted `read_value` and `write_value` + methods return/accept single values. For formats with multiple values, + they return/accept a tuple with the same number of elements as is required for + the format. + ''' + + def __init__(self, characteristic: Characteristic, pack_format: str) -> None: + super().__init__(characteristic) + self.struct = struct.Struct(pack_format) + + def pack(self, *values) -> bytes: + return self.struct.pack(*values) + + def unpack(self, buffer: bytes) -> tuple: + return self.struct.unpack(buffer) + + def encode_value(self, value: Any) -> bytes: + return self.pack(*value if isinstance(value, tuple) else (value,)) + + def decode_value(self, value: bytes) -> Any: + unpacked = self.unpack(value) + return unpacked[0] if len(unpacked) == 1 else unpacked + + +# ----------------------------------------------------------------------------- +class PackedCharacteristicProxyAdapter(CharacteristicProxyAdapter): + ''' + Adapter that packs/unpacks characteristic values according to a standard + Python `struct` format. + For formats with a single value, the adapted `read_value` and `write_value` + methods return/accept single values. For formats with multiple values, + they return/accept a tuple with the same number of elements as is required for + the format. + ''' + + def __init__(self, characteristic_proxy, pack_format): + super().__init__(characteristic_proxy) + self.struct = struct.Struct(pack_format) + + def pack(self, *values) -> bytes: + return self.struct.pack(*values) + + def unpack(self, buffer: bytes) -> tuple: + return self.struct.unpack(buffer) + + def encode_value(self, value: Any) -> bytes: + return self.pack(*value if isinstance(value, tuple) else (value,)) + + def decode_value(self, value: bytes) -> Any: + unpacked = self.unpack(value) + return unpacked[0] if len(unpacked) == 1 else unpacked + + +# ----------------------------------------------------------------------------- +class MappedCharacteristicAdapter(PackedCharacteristicAdapter): + ''' + Adapter that packs/unpacks characteristic values according to a standard + Python `struct` format. + The adapted `read_value` and `write_value` methods return/accept a dictionary which + is packed/unpacked according to format, with the arguments extracted from the + dictionary by key, in the same order as they occur in the `keys` parameter. + ''' + + def __init__( + self, characteristic: Characteristic, pack_format: str, keys: Iterable[str] + ) -> None: + super().__init__(characteristic, pack_format) + self.keys = keys + + # pylint: disable=arguments-differ + def pack(self, values) -> bytes: + return super().pack(*(values[key] for key in self.keys)) + + def unpack(self, buffer: bytes) -> Any: + return dict(zip(self.keys, super().unpack(buffer))) + + +# ----------------------------------------------------------------------------- +class MappedCharacteristicProxyAdapter(PackedCharacteristicProxyAdapter): + ''' + Adapter that packs/unpacks characteristic values according to a standard + Python `struct` format. + The adapted `read_value` and `write_value` methods return/accept a dictionary which + is packed/unpacked according to format, with the arguments extracted from the + dictionary by key, in the same order as they occur in the `keys` parameter. + ''' + + def __init__( + self, + characteristic_proxy: CharacteristicProxy, + pack_format: str, + keys: Iterable[str], + ) -> None: + super().__init__(characteristic_proxy, pack_format) + self.keys = keys + + # pylint: disable=arguments-differ + def pack(self, values) -> bytes: + return super().pack(*(values[key] for key in self.keys)) + + def unpack(self, buffer: bytes) -> Any: + return dict(zip(self.keys, super().unpack(buffer))) + + +# ----------------------------------------------------------------------------- +class UTF8CharacteristicAdapter(CharacteristicAdapter[str]): + ''' + Adapter that converts strings to/from bytes using UTF-8 encoding + ''' + + def encode_value(self, value: str) -> bytes: + return value.encode('utf-8') + + def decode_value(self, value: bytes) -> str: + return value.decode('utf-8') + + +# ----------------------------------------------------------------------------- +class UTF8CharacteristicProxyAdapter(CharacteristicProxyAdapter[str]): + ''' + Adapter that converts strings to/from bytes using UTF-8 encoding + ''' + + def encode_value(self, value: str) -> bytes: + return value.encode('utf-8') + + def decode_value(self, value: bytes) -> str: + return value.decode('utf-8') + + +# ----------------------------------------------------------------------------- +class SerializableCharacteristicAdapter(CharacteristicAdapter[_T2]): + ''' + Adapter that converts any class to/from bytes using the class' + `to_bytes` and `__bytes__` methods, respectively. + ''' + + def __init__(self, characteristic: Characteristic, cls: Type[_T2]) -> None: + super().__init__(characteristic) + self.cls = cls + + def encode_value(self, value: _T2) -> bytes: + return bytes(value) + + def decode_value(self, value: bytes) -> _T2: + return self.cls.from_bytes(value) + + +# ----------------------------------------------------------------------------- +class SerializableCharacteristicProxyAdapter(CharacteristicProxyAdapter[_T2]): + ''' + Adapter that converts any class to/from bytes using the class' + `to_bytes` and `__bytes__` methods, respectively. + ''' + + def __init__( + self, characteristic_proxy: CharacteristicProxy, cls: Type[_T2] + ) -> None: + super().__init__(characteristic_proxy) + self.cls = cls + + def encode_value(self, value: _T2) -> bytes: + return bytes(value) + + def decode_value(self, value: bytes) -> _T2: + return self.cls.from_bytes(value) + + +# ----------------------------------------------------------------------------- +class EnumCharacteristicAdapter(CharacteristicAdapter[_T3]): + ''' + Adapter that converts int-enum-like classes to/from bytes using the class' + `int().to_bytes()` and `from_bytes()` methods, respectively. + ''' + + def __init__( + self, + characteristic: Characteristic, + cls: Type[_T3], + length: int, + byteorder: Literal['little', 'big'] = 'little', + ): + """ + Initialize an instance. + + Params: + characteristic: the Characteristic to adapt to/from + cls: the class to/from which to convert integer values + length: number of bytes used to represent integer values + byteorder: byte order of the byte representation of integers. + """ + super().__init__(characteristic) + self.cls = cls + self.length = length + self.byteorder = byteorder + + def encode_value(self, value: _T3) -> bytes: + return int(value).to_bytes(self.length, self.byteorder) + + def decode_value(self, value: bytes) -> _T3: + int_value = int.from_bytes(value, self.byteorder) + return self.cls(int_value) + + +# ----------------------------------------------------------------------------- +class EnumCharacteristicProxyAdapter(CharacteristicProxyAdapter[_T3]): + ''' + Adapter that converts int-enum-like classes to/from bytes using the class' + `int().to_bytes()` and `from_bytes()` methods, respectively. + ''' + + def __init__( + self, + characteristic_proxy: CharacteristicProxy, + cls: Type[_T3], + length: int, + byteorder: Literal['little', 'big'] = 'little', + ): + """ + Initialize an instance. + + Params: + characteristic_proxy: the CharacteristicProxy to adapt to/from + cls: the class to/from which to convert integer values + length: number of bytes used to represent integer values + byteorder: byte order of the byte representation of integers. + """ + super().__init__(characteristic_proxy) + self.cls = cls + self.length = length + self.byteorder = byteorder + + def encode_value(self, value: _T3) -> bytes: + return int(value).to_bytes(self.length, self.byteorder) + + def decode_value(self, value: bytes) -> _T3: + int_value = int.from_bytes(value, self.byteorder) + a = self.cls(int_value) + return self.cls(int_value) diff --git a/bumble/gatt_client.py b/bumble/gatt_client.py index 73b8a631..483ecc81 100644 --- a/bumble/gatt_client.py +++ b/bumble/gatt_client.py @@ -29,16 +29,18 @@ import logging import struct from datetime import datetime from typing import ( + Any, + Callable, + Dict, + Generic, + Iterable, List, Optional, - Dict, - Tuple, - Callable, - Union, - Any, - Iterable, - Type, Set, + Tuple, + Union, + Type, + TypeVar, TYPE_CHECKING, ) @@ -82,9 +84,14 @@ from .gatt import ( TemplateService, ) +# ----------------------------------------------------------------------------- +# Typing +# ----------------------------------------------------------------------------- if TYPE_CHECKING: from bumble.device import Connection +_T = TypeVar('_T') + # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- @@ -110,7 +117,7 @@ def show_services(services: Iterable[ServiceProxy]) -> None: # ----------------------------------------------------------------------------- # Proxies # ----------------------------------------------------------------------------- -class AttributeProxy(EventEmitter): +class AttributeProxy(EventEmitter, Generic[_T]): def __init__( self, client: Client, handle: int, end_group_handle: int, attribute_type: UUID ) -> None: @@ -120,21 +127,21 @@ class AttributeProxy(EventEmitter): self.end_group_handle = end_group_handle self.type = attribute_type - async def read_value(self, no_long_read: bool = False) -> bytes: + async def read_value(self, no_long_read: bool = False) -> _T: return self.decode_value( await self.client.read_value(self.handle, no_long_read) ) - async def write_value(self, value, with_response=False): + async def write_value(self, value: _T, with_response=False): return await self.client.write_value( self.handle, self.encode_value(value), with_response ) - def encode_value(self, value: Any) -> bytes: - return value + def encode_value(self, value: _T) -> bytes: + return value # type: ignore - def decode_value(self, value_bytes: bytes) -> Any: - return value_bytes + def decode_value(self, value: bytes) -> _T: + return value # type: ignore def __str__(self) -> str: return f'Attribute(handle=0x{self.handle:04X}, type={self.type})' @@ -184,19 +191,19 @@ class ServiceProxy(AttributeProxy): return f'Service(handle=0x{self.handle:04X}, uuid={self.uuid})' -class CharacteristicProxy(AttributeProxy): +class CharacteristicProxy(AttributeProxy[_T]): properties: Characteristic.Properties descriptors: List[DescriptorProxy] - subscribers: Dict[Any, Callable[[bytes], Any]] + subscribers: Dict[Any, Callable[[_T], Any]] def __init__( self, - client, - handle, - end_group_handle, - uuid, + client: Client, + handle: int, + end_group_handle: int, + uuid: UUID, properties: int, - ): + ) -> None: super().__init__(client, handle, end_group_handle, uuid) self.uuid = uuid self.properties = Characteristic.Properties(properties) @@ -204,21 +211,21 @@ class CharacteristicProxy(AttributeProxy): self.descriptors_discovered = False self.subscribers = {} # Map from subscriber to proxy subscriber - def get_descriptor(self, descriptor_type): + def get_descriptor(self, descriptor_type: UUID) -> Optional[DescriptorProxy]: for descriptor in self.descriptors: if descriptor.type == descriptor_type: return descriptor return None - async def discover_descriptors(self): + async def discover_descriptors(self) -> list[DescriptorProxy]: return await self.client.discover_descriptors(self) async def subscribe( self, - subscriber: Optional[Callable[[bytes], Any]] = None, + subscriber: Optional[Callable[[_T], Any]] = None, prefer_notify: bool = True, - ): + ) -> None: if subscriber is not None: if subscriber in self.subscribers: # We already have a proxy subscriber @@ -233,13 +240,13 @@ class CharacteristicProxy(AttributeProxy): self.subscribers[subscriber] = on_change subscriber = on_change - return await self.client.subscribe(self, subscriber, prefer_notify) + await self.client.subscribe(self, subscriber, prefer_notify) - async def unsubscribe(self, subscriber=None, force=False): + async def unsubscribe(self, subscriber=None, force=False) -> None: if subscriber in self.subscribers: subscriber = self.subscribers.pop(subscriber) - return await self.client.unsubscribe(self, subscriber, force) + await self.client.unsubscribe(self, subscriber, force) def __str__(self) -> str: return ( @@ -250,7 +257,7 @@ class CharacteristicProxy(AttributeProxy): class DescriptorProxy(AttributeProxy): - def __init__(self, client, handle, descriptor_type): + def __init__(self, client: Client, handle: int, descriptor_type: UUID) -> None: super().__init__(client, handle, 0, descriptor_type) def __str__(self) -> str: @@ -679,7 +686,7 @@ class Client: properties, handle = struct.unpack_from(' None: # If we haven't already discovered the descriptors for this characteristic, @@ -868,7 +875,7 @@ class Client: async def unsubscribe( self, characteristic: CharacteristicProxy, - subscriber: Optional[Callable[[bytes], Any]] = None, + subscriber: Optional[Callable[[Any], Any]] = None, force: bool = False, ) -> None: ''' diff --git a/bumble/gatt_server.py b/bumble/gatt_server.py index 60467c8a..60cd984c 100644 --- a/bumble/gatt_server.py +++ b/bumble/gatt_server.py @@ -36,7 +36,6 @@ from typing import ( Tuple, TypeVar, Type, - Union, TYPE_CHECKING, ) from pyee import EventEmitter @@ -78,7 +77,6 @@ from bumble.gatt import ( GATT_REQUEST_TIMEOUT, GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE, Characteristic, - CharacteristicAdapter, CharacteristicDeclaration, CharacteristicValue, IncludedServiceDeclaration, @@ -469,7 +467,7 @@ class Server(EventEmitter): finally: self.pending_confirmations[connection.handle] = None - async def notify_or_indicate_subscribers( + async def _notify_or_indicate_subscribers( self, indicate: bool, attribute: Attribute, @@ -503,7 +501,9 @@ class Server(EventEmitter): value: Optional[bytes] = None, force: bool = False, ): - return await self.notify_or_indicate_subscribers(False, attribute, value, force) + return await self._notify_or_indicate_subscribers( + False, attribute, value, force + ) async def indicate_subscribers( self, @@ -511,7 +511,7 @@ class Server(EventEmitter): value: Optional[bytes] = None, force: bool = False, ): - return await self.notify_or_indicate_subscribers(True, attribute, value, force) + return await self._notify_or_indicate_subscribers(True, attribute, value, force) def on_disconnection(self, connection: Connection) -> None: if connection.handle in self.subscribers: diff --git a/bumble/profiles/aics.py b/bumble/profiles/aics.py index 6e0e56e7..e70b5a42 100644 --- a/bumble/profiles/aics.py +++ b/bumble/profiles/aics.py @@ -24,16 +24,13 @@ import struct from dataclasses import dataclass from typing import Optional -from bumble import gatt from bumble.device import Connection from bumble.att import ATT_Error from bumble.gatt import ( + Attribute, Characteristic, - SerializableCharacteristicAdapter, - PackedCharacteristicAdapter, TemplateService, CharacteristicValue, - UTF8CharacteristicAdapter, GATT_AUDIO_INPUT_CONTROL_SERVICE, GATT_AUDIO_INPUT_STATE_CHARACTERISTIC, GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC, @@ -42,6 +39,14 @@ from bumble.gatt import ( GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC, GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC, ) +from bumble.gatt_adapters import ( + CharacteristicProxy, + PackedCharacteristicProxyAdapter, + SerializableCharacteristicAdapter, + SerializableCharacteristicProxyAdapter, + UTF8CharacteristicAdapter, + UTF8CharacteristicProxyAdapter, +) from bumble.gatt_client import ProfileServiceProxy, ServiceProxy from bumble.utils import OpenIntEnum @@ -124,7 +129,7 @@ class AudioInputState: mute: Mute = Mute.NOT_MUTED gain_mode: GainMode = GainMode.MANUAL change_counter: int = 0 - attribute_value: Optional[CharacteristicValue] = None + attribute: Optional[Attribute] = None def __bytes__(self) -> bytes: return bytes( @@ -151,10 +156,8 @@ class AudioInputState: self.change_counter = (self.change_counter + 1) % (CHANGE_COUNTER_MAX_VALUE + 1) async def notify_subscribers_via_connection(self, connection: Connection) -> None: - assert self.attribute_value is not None - await connection.device.notify_subscribers( - attribute=self.attribute_value, value=bytes(self) - ) + assert self.attribute is not None + await connection.device.notify_subscribers(attribute=self.attribute) @dataclass @@ -315,24 +318,28 @@ class AudioInputDescription: ''' audio_input_description: str = "Bluetooth" - attribute_value: Optional[CharacteristicValue] = None + attribute: Optional[Attribute] = None def on_read(self, _connection: Optional[Connection]) -> str: return self.audio_input_description async def on_write(self, connection: Optional[Connection], value: str) -> None: assert connection - assert self.attribute_value + assert self.attribute self.audio_input_description = value - await connection.device.notify_subscribers( - attribute=self.attribute_value, value=value - ) + await connection.device.notify_subscribers(attribute=self.attribute) class AICSService(TemplateService): UUID = GATT_AUDIO_INPUT_CONTROL_SERVICE + audio_input_state_characteristic: Characteristic[AudioInputState] + audio_input_type_characteristic: Characteristic[bytes] + audio_input_status_characteristic: Characteristic[bytes] + audio_input_control_point_characteristic: Characteristic[bytes] + gain_settings_properties_characteristic: Characteristic[GainSettingsProperties] + def __init__( self, audio_input_state: Optional[AudioInputState] = None, @@ -374,9 +381,7 @@ class AICSService(TemplateService): ), AudioInputState, ) - self.audio_input_state.attribute_value = ( - self.audio_input_state_characteristic.value - ) + self.audio_input_state.attribute = self.audio_input_state_characteristic self.gain_settings_properties_characteristic = ( SerializableCharacteristicAdapter( @@ -425,8 +430,8 @@ class AICSService(TemplateService): ), ) ) - self.audio_input_description.attribute_value = ( - self.audio_input_control_point_characteristic.value + self.audio_input_description.attribute = ( + self.audio_input_control_point_characteristic ) super().__init__( @@ -448,24 +453,29 @@ class AICSService(TemplateService): class AICSServiceProxy(ProfileServiceProxy): SERVICE_CLASS = AICSService + audio_input_state: CharacteristicProxy[AudioInputState] + gain_settings_properties: CharacteristicProxy[GainSettingsProperties] + audio_input_status: CharacteristicProxy[int] + audio_input_control_point: CharacteristicProxy[bytes] + def __init__(self, service_proxy: ServiceProxy) -> None: self.service_proxy = service_proxy - self.audio_input_state = SerializableCharacteristicAdapter( + self.audio_input_state = SerializableCharacteristicProxyAdapter( service_proxy.get_required_characteristic_by_uuid( GATT_AUDIO_INPUT_STATE_CHARACTERISTIC ), AudioInputState, ) - self.gain_settings_properties = SerializableCharacteristicAdapter( + self.gain_settings_properties = SerializableCharacteristicProxyAdapter( service_proxy.get_required_characteristic_by_uuid( GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC ), GainSettingsProperties, ) - self.audio_input_status = PackedCharacteristicAdapter( + self.audio_input_status = PackedCharacteristicProxyAdapter( service_proxy.get_required_characteristic_by_uuid( GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC ), @@ -478,7 +488,7 @@ class AICSServiceProxy(ProfileServiceProxy): ) ) - self.audio_input_description = UTF8CharacteristicAdapter( + self.audio_input_description = UTF8CharacteristicProxyAdapter( service_proxy.get_required_characteristic_by_uuid( GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC ) diff --git a/bumble/profiles/asha.py b/bumble/profiles/asha.py index 67838a97..c5dd2d3d 100644 --- a/bumble/profiles/asha.py +++ b/bumble/profiles/asha.py @@ -134,12 +134,14 @@ class AshaService(gatt.TemplateService): ), ) - self.audio_control_point_characteristic = gatt.Characteristic( - gatt.GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC, - gatt.Characteristic.Properties.WRITE - | gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE, - gatt.Characteristic.WRITEABLE, - gatt.CharacteristicValue(write=self._on_audio_control_point_write), + self.audio_control_point_characteristic: gatt.Characteristic[bytes] = ( + gatt.Characteristic( + gatt.GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC, + gatt.Characteristic.Properties.WRITE + | gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE, + gatt.Characteristic.WRITEABLE, + gatt.CharacteristicValue(write=self._on_audio_control_point_write), + ) ) self.audio_status_characteristic = gatt.Characteristic( gatt.GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC, @@ -147,7 +149,7 @@ class AshaService(gatt.TemplateService): gatt.Characteristic.READABLE, bytes([AudioStatus.OK]), ) - self.volume_characteristic = gatt.Characteristic( + self.volume_characteristic: gatt.Characteristic[bytes] = gatt.Characteristic( gatt.GATT_ASHA_VOLUME_CHARACTERISTIC, gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE, gatt.Characteristic.WRITEABLE, @@ -166,13 +168,13 @@ class AshaService(gatt.TemplateService): struct.pack(' bytes: ) -def decode_subgroups(data: bytes) -> List[SubgroupInfo]: +def decode_subgroups(data: bytes) -> list[SubgroupInfo]: num_subgroups = data[0] offset = 1 subgroups = [] @@ -273,7 +274,7 @@ class BroadcastReceiveState: pa_sync_state: PeriodicAdvertisingSyncState big_encryption: BigEncryption bad_code: bytes - subgroups: List[SubgroupInfo] + subgroups: list[SubgroupInfo] @classmethod def from_bytes(cls, data: bytes) -> BroadcastReceiveState: @@ -354,7 +355,9 @@ class BroadcastAudioScanServiceProxy(gatt_client.ProfileServiceProxy): SERVICE_CLASS = BroadcastAudioScanService broadcast_audio_scan_control_point: gatt_client.CharacteristicProxy - broadcast_receive_states: List[gatt.DelegatedCharacteristicAdapter] + broadcast_receive_states: list[ + gatt_client.CharacteristicProxy[Optional[BroadcastReceiveState]] + ] def __init__(self, service_proxy: gatt_client.ServiceProxy): self.service_proxy = service_proxy @@ -366,7 +369,7 @@ class BroadcastAudioScanServiceProxy(gatt_client.ProfileServiceProxy): ) self.broadcast_receive_states = [ - gatt.DelegatedCharacteristicAdapter( + gatt_adapters.DelegatedCharacteristicProxyAdapter( characteristic, decode=lambda x: BroadcastReceiveState.from_bytes(x) if x else None, ) diff --git a/bumble/profiles/battery_service.py b/bumble/profiles/battery_service.py index 211fee06..464bbb58 100644 --- a/bumble/profiles/battery_service.py +++ b/bumble/profiles/battery_service.py @@ -16,14 +16,20 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- -from ..gatt_client import ProfileServiceProxy -from ..gatt import ( +from typing import Optional + +from bumble.gatt_client import ProfileServiceProxy +from bumble.gatt import ( GATT_BATTERY_SERVICE, GATT_BATTERY_LEVEL_CHARACTERISTIC, TemplateService, Characteristic, CharacteristicValue, +) +from bumble.gatt_client import CharacteristicProxy +from bumble.gatt_adapters import ( PackedCharacteristicAdapter, + PackedCharacteristicProxyAdapter, ) @@ -32,6 +38,8 @@ class BatteryService(TemplateService): UUID = GATT_BATTERY_SERVICE BATTERY_LEVEL_FORMAT = 'B' + battery_level_characteristic: Characteristic[int] + def __init__(self, read_battery_level): self.battery_level_characteristic = PackedCharacteristicAdapter( Characteristic( @@ -49,13 +57,15 @@ class BatteryService(TemplateService): class BatteryServiceProxy(ProfileServiceProxy): SERVICE_CLASS = BatteryService + battery_level: Optional[CharacteristicProxy[int]] + def __init__(self, service_proxy): self.service_proxy = service_proxy if characteristics := service_proxy.get_characteristics_by_uuid( GATT_BATTERY_LEVEL_CHARACTERISTIC ): - self.battery_level = PackedCharacteristicAdapter( + self.battery_level = PackedCharacteristicProxyAdapter( characteristics[0], pack_format=BatteryService.BATTERY_LEVEL_FORMAT ) else: diff --git a/bumble/profiles/device_information_service.py b/bumble/profiles/device_information_service.py index d1128038..25b34754 100644 --- a/bumble/profiles/device_information_service.py +++ b/bumble/profiles/device_information_service.py @@ -19,7 +19,6 @@ import struct from typing import Optional, Tuple -from bumble.gatt_client import ServiceProxy, ProfileServiceProxy, CharacteristicProxy from bumble.gatt import ( GATT_DEVICE_INFORMATION_SERVICE, GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC, @@ -32,9 +31,12 @@ from bumble.gatt import ( GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC, TemplateService, Characteristic, - DelegatedCharacteristicAdapter, - UTF8CharacteristicAdapter, ) +from bumble.gatt_adapters import ( + DelegatedCharacteristicProxyAdapter, + UTF8CharacteristicProxyAdapter, +) +from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy, ServiceProxy # ----------------------------------------------------------------------------- @@ -62,7 +64,7 @@ class DeviceInformationService(TemplateService): ieee_regulatory_certification_data_list: Optional[bytes] = None, # TODO: pnp_id ): - characteristics = [ + characteristics: list[Characteristic[bytes]] = [ Characteristic( uuid, Characteristic.Properties.READ, @@ -107,14 +109,14 @@ class DeviceInformationService(TemplateService): class DeviceInformationServiceProxy(ProfileServiceProxy): SERVICE_CLASS = DeviceInformationService - manufacturer_name: Optional[UTF8CharacteristicAdapter] - model_number: Optional[UTF8CharacteristicAdapter] - serial_number: Optional[UTF8CharacteristicAdapter] - hardware_revision: Optional[UTF8CharacteristicAdapter] - firmware_revision: Optional[UTF8CharacteristicAdapter] - software_revision: Optional[UTF8CharacteristicAdapter] - system_id: Optional[DelegatedCharacteristicAdapter] - ieee_regulatory_certification_data_list: Optional[CharacteristicProxy] + manufacturer_name: Optional[CharacteristicProxy[str]] + model_number: Optional[CharacteristicProxy[str]] + serial_number: Optional[CharacteristicProxy[str]] + hardware_revision: Optional[CharacteristicProxy[str]] + firmware_revision: Optional[CharacteristicProxy[str]] + software_revision: Optional[CharacteristicProxy[str]] + system_id: Optional[CharacteristicProxy[tuple[int, int]]] + ieee_regulatory_certification_data_list: Optional[CharacteristicProxy[bytes]] def __init__(self, service_proxy: ServiceProxy): self.service_proxy = service_proxy @@ -128,7 +130,7 @@ class DeviceInformationServiceProxy(ProfileServiceProxy): ('software_revision', GATT_SOFTWARE_REVISION_STRING_CHARACTERISTIC), ): if characteristics := service_proxy.get_characteristics_by_uuid(uuid): - characteristic = UTF8CharacteristicAdapter(characteristics[0]) + characteristic = UTF8CharacteristicProxyAdapter(characteristics[0]) else: characteristic = None self.__setattr__(field, characteristic) @@ -136,7 +138,7 @@ class DeviceInformationServiceProxy(ProfileServiceProxy): if characteristics := service_proxy.get_characteristics_by_uuid( GATT_SYSTEM_ID_CHARACTERISTIC ): - self.system_id = DelegatedCharacteristicAdapter( + self.system_id = DelegatedCharacteristicProxyAdapter( characteristics[0], encode=lambda v: DeviceInformationService.pack_system_id(*v), decode=DeviceInformationService.unpack_system_id, diff --git a/bumble/profiles/gap.py b/bumble/profiles/gap.py index 0dd6e512..bb13031b 100644 --- a/bumble/profiles/gap.py +++ b/bumble/profiles/gap.py @@ -25,14 +25,15 @@ from bumble.core import Appearance from bumble.gatt import ( TemplateService, Characteristic, - CharacteristicAdapter, - DelegatedCharacteristicAdapter, - UTF8CharacteristicAdapter, GATT_GENERIC_ACCESS_SERVICE, GATT_DEVICE_NAME_CHARACTERISTIC, GATT_APPEARANCE_CHARACTERISTIC, ) -from bumble.gatt_client import ProfileServiceProxy, ServiceProxy +from bumble.gatt_adapters import ( + DelegatedCharacteristicProxyAdapter, + UTF8CharacteristicProxyAdapter, +) +from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy, ServiceProxy # ----------------------------------------------------------------------------- # Logging @@ -49,6 +50,9 @@ logger = logging.getLogger(__name__) class GenericAccessService(TemplateService): UUID = GATT_GENERIC_ACCESS_SERVICE + device_name_characteristic: Characteristic[bytes] + appearance_characteristic: Characteristic[bytes] + def __init__( self, device_name: str, appearance: Union[Appearance, Tuple[int, int], int] = 0 ): @@ -84,8 +88,8 @@ class GenericAccessService(TemplateService): class GenericAccessServiceProxy(ProfileServiceProxy): SERVICE_CLASS = GenericAccessService - device_name: Optional[CharacteristicAdapter] - appearance: Optional[DelegatedCharacteristicAdapter] + device_name: Optional[CharacteristicProxy[str]] + appearance: Optional[CharacteristicProxy[Appearance]] def __init__(self, service_proxy: ServiceProxy): self.service_proxy = service_proxy @@ -93,14 +97,14 @@ class GenericAccessServiceProxy(ProfileServiceProxy): if characteristics := service_proxy.get_characteristics_by_uuid( GATT_DEVICE_NAME_CHARACTERISTIC ): - self.device_name = UTF8CharacteristicAdapter(characteristics[0]) + self.device_name = UTF8CharacteristicProxyAdapter(characteristics[0]) else: self.device_name = None if characteristics := service_proxy.get_characteristics_by_uuid( GATT_APPEARANCE_CHARACTERISTIC ): - self.appearance = DelegatedCharacteristicAdapter( + self.appearance = DelegatedCharacteristicProxyAdapter( characteristics[0], decode=lambda value: Appearance.from_int( struct.unpack_from(' None: self.service_proxy = service_proxy - self.gmap_role = DelegatedCharacteristicAdapter( + self.gmap_role = DelegatedCharacteristicProxyAdapter( service_proxy.get_required_characteristic_by_uuid( GATT_GMAP_ROLE_CHARACTERISTIC ), @@ -163,31 +168,31 @@ class GamingAudioServiceProxy(ProfileServiceProxy): if characteristics := service_proxy.get_characteristics_by_uuid( GATT_UGG_FEATURES_CHARACTERISTIC ): - self.ugg_features = DelegatedCharacteristicAdapter( - characteristic=characteristics[0], + self.ugg_features = DelegatedCharacteristicProxyAdapter( + characteristics[0], decode=lambda value: UggFeatures(value[0]), ) if characteristics := service_proxy.get_characteristics_by_uuid( GATT_UGT_FEATURES_CHARACTERISTIC ): - self.ugt_features = DelegatedCharacteristicAdapter( - characteristic=characteristics[0], + self.ugt_features = DelegatedCharacteristicProxyAdapter( + characteristics[0], decode=lambda value: UgtFeatures(value[0]), ) if characteristics := service_proxy.get_characteristics_by_uuid( GATT_BGS_FEATURES_CHARACTERISTIC ): - self.bgs_features = DelegatedCharacteristicAdapter( - characteristic=characteristics[0], + self.bgs_features = DelegatedCharacteristicProxyAdapter( + characteristics[0], decode=lambda value: BgsFeatures(value[0]), ) if characteristics := service_proxy.get_characteristics_by_uuid( GATT_BGR_FEATURES_CHARACTERISTIC ): - self.bgr_features = DelegatedCharacteristicAdapter( - characteristic=characteristics[0], + self.bgr_features = DelegatedCharacteristicProxyAdapter( + characteristics[0], decode=lambda value: BgrFeatures(value[0]), ) diff --git a/bumble/profiles/hap.py b/bumble/profiles/hap.py index 5c912d85..7bb02a08 100644 --- a/bumble/profiles/hap.py +++ b/bumble/profiles/hap.py @@ -18,14 +18,15 @@ from __future__ import annotations import asyncio import functools -from bumble import att, gatt, gatt_client +from dataclasses import dataclass, field +import logging +from typing import Any, Dict, List, Optional, Set, Union + +from bumble import att, gatt, gatt_adapters, gatt_client from bumble.core import InvalidArgumentError, InvalidStateError from bumble.device import Device, Connection from bumble.utils import AsyncRunner, OpenIntEnum from bumble.hci import Address -from dataclasses import dataclass, field -import logging -from typing import Any, Dict, List, Optional, Set, Union # ----------------------------------------------------------------------------- @@ -631,11 +632,12 @@ class HearingAccessServiceProxy(gatt_client.ProfileServiceProxy): hearing_aid_preset_control_point: gatt_client.CharacteristicProxy preset_control_point_indications: asyncio.Queue + active_preset_index_notification: asyncio.Queue def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None: self.service_proxy = service_proxy - self.server_features = gatt.PackedCharacteristicAdapter( + self.server_features = gatt_adapters.PackedCharacteristicProxyAdapter( service_proxy.get_characteristics_by_uuid( gatt.GATT_HEARING_AID_FEATURES_CHARACTERISTIC )[0], @@ -648,7 +650,7 @@ class HearingAccessServiceProxy(gatt_client.ProfileServiceProxy): )[0] ) - self.active_preset_index = gatt.PackedCharacteristicAdapter( + self.active_preset_index = gatt_adapters.PackedCharacteristicProxyAdapter( service_proxy.get_characteristics_by_uuid( gatt.GATT_ACTIVE_PRESET_INDEX_CHARACTERISTIC )[0], diff --git a/bumble/profiles/heart_rate_service.py b/bumble/profiles/heart_rate_service.py index 7685e52e..090951b0 100644 --- a/bumble/profiles/heart_rate_service.py +++ b/bumble/profiles/heart_rate_service.py @@ -16,13 +16,14 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations from enum import IntEnum import struct +from typing import Optional from bumble import core -from ..gatt_client import ProfileServiceProxy -from ..att import ATT_Error -from ..gatt import ( +from bumble.att import ATT_Error +from bumble.gatt import ( GATT_HEART_RATE_SERVICE, GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC, GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC, @@ -30,10 +31,13 @@ from ..gatt import ( TemplateService, Characteristic, CharacteristicValue, - SerializableCharacteristicAdapter, +) +from bumble.gatt_adapters import ( DelegatedCharacteristicAdapter, PackedCharacteristicAdapter, + SerializableCharacteristicAdapter, ) +from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy # ----------------------------------------------------------------------------- @@ -43,6 +47,10 @@ class HeartRateService(TemplateService): CONTROL_POINT_NOT_SUPPORTED = 0x80 RESET_ENERGY_EXPENDED = 0x01 + heart_rate_measurement_characteristic: Characteristic[HeartRateMeasurement] + body_sensor_location_characteristic: Characteristic[BodySensorLocation] + heart_rate_control_point_characteristic: Characteristic[int] + class BodySensorLocation(IntEnum): OTHER = 0 CHEST = 1 @@ -198,6 +206,14 @@ class HeartRateService(TemplateService): class HeartRateServiceProxy(ProfileServiceProxy): SERVICE_CLASS = HeartRateService + heart_rate_measurement: Optional[ + CharacteristicProxy[HeartRateService.HeartRateMeasurement] + ] + body_sensor_location: Optional[ + CharacteristicProxy[HeartRateService.BodySensorLocation] + ] + heart_rate_control_point: Optional[CharacteristicProxy[int]] + def __init__(self, service_proxy): self.service_proxy = service_proxy diff --git a/bumble/profiles/mcp.py b/bumble/profiles/mcp.py index 5e12573f..c4952f47 100644 --- a/bumble/profiles/mcp.py +++ b/bumble/profiles/mcp.py @@ -208,7 +208,7 @@ class MediaControlService(gatt.TemplateService): properties=gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.NOTIFY, permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, - value=media_player_name or 'Bumble Player', + value=(media_player_name or 'Bumble Player').encode(), ) self.track_changed_characteristic = gatt.Characteristic( uuid=gatt.GATT_TRACK_CHANGED_CHARACTERISTIC, @@ -247,14 +247,16 @@ class MediaControlService(gatt.TemplateService): permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, value=b'', ) - self.media_control_point_characteristic = gatt.Characteristic( - uuid=gatt.GATT_MEDIA_CONTROL_POINT_CHARACTERISTIC, - properties=gatt.Characteristic.Properties.WRITE - | gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE - | gatt.Characteristic.Properties.NOTIFY, - permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION - | gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION, - value=gatt.CharacteristicValue(write=self.on_media_control_point), + self.media_control_point_characteristic: gatt.Characteristic[bytes] = ( + gatt.Characteristic( + uuid=gatt.GATT_MEDIA_CONTROL_POINT_CHARACTERISTIC, + properties=gatt.Characteristic.Properties.WRITE + | gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE + | gatt.Characteristic.Properties.NOTIFY, + permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION + | gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION, + value=gatt.CharacteristicValue(write=self.on_media_control_point), + ) ) self.media_control_point_opcodes_supported_characteristic = gatt.Characteristic( uuid=gatt.GATT_MEDIA_CONTROL_POINT_OPCODES_SUPPORTED_CHARACTERISTIC, diff --git a/bumble/profiles/pacs.py b/bumble/profiles/pacs.py index 983bfbfb..d0952ece 100644 --- a/bumble/profiles/pacs.py +++ b/bumble/profiles/pacs.py @@ -25,6 +25,7 @@ from typing import Optional, Sequence, Union from bumble.profiles.bap import AudioLocation, CodecSpecificCapabilities, ContextType from bumble.profiles import le_audio from bumble import gatt +from bumble import gatt_adapters from bumble import gatt_client from bumble import hci @@ -185,34 +186,42 @@ class PublishedAudioCapabilitiesService(gatt.TemplateService): class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy): SERVICE_CLASS = PublishedAudioCapabilitiesService - sink_pac: Optional[gatt.DelegatedCharacteristicAdapter] = None - sink_audio_locations: Optional[gatt.DelegatedCharacteristicAdapter] = None - source_pac: Optional[gatt.DelegatedCharacteristicAdapter] = None - source_audio_locations: Optional[gatt.DelegatedCharacteristicAdapter] = None - available_audio_contexts: gatt.DelegatedCharacteristicAdapter - supported_audio_contexts: gatt.DelegatedCharacteristicAdapter + sink_pac: Optional[gatt_client.CharacteristicProxy[list[PacRecord]]] = None + sink_audio_locations: Optional[gatt_client.CharacteristicProxy[AudioLocation]] = ( + None + ) + source_pac: Optional[gatt_client.CharacteristicProxy[list[PacRecord]]] = None + source_audio_locations: Optional[gatt_client.CharacteristicProxy[AudioLocation]] = ( + None + ) + available_audio_contexts: gatt_client.CharacteristicProxy[tuple[ContextType, ...]] + supported_audio_contexts: gatt_client.CharacteristicProxy[tuple[ContextType, ...]] def __init__(self, service_proxy: gatt_client.ServiceProxy): self.service_proxy = service_proxy - self.available_audio_contexts = gatt.DelegatedCharacteristicAdapter( - service_proxy.get_required_characteristic_by_uuid( - gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC - ), - decode=lambda x: tuple(map(ContextType, struct.unpack(' None: self.service_proxy = service_proxy - self.volume_state = gatt.SerializableCharacteristicAdapter( + self.volume_state = gatt_adapters.SerializableCharacteristicProxyAdapter( service_proxy.get_required_characteristic_by_uuid( gatt.GATT_VOLUME_STATE_CHARACTERISTIC ), @@ -227,7 +228,7 @@ class VolumeControlServiceProxy(gatt_client.ProfileServiceProxy): gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC ) - self.volume_flags = gatt.DelegatedCharacteristicAdapter( + self.volume_flags = gatt_adapters.DelegatedCharacteristicProxyAdapter( service_proxy.get_required_characteristic_by_uuid( gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC ), diff --git a/bumble/profiles/vocs.py b/bumble/profiles/vocs.py index af5447fe..a3bcf7c0 100644 --- a/bumble/profiles/vocs.py +++ b/bumble/profiles/vocs.py @@ -24,17 +24,19 @@ from bumble.device import Connection from bumble.att import ATT_Error from bumble.gatt import ( Characteristic, - DelegatedCharacteristicAdapter, TemplateService, CharacteristicValue, - SerializableCharacteristicAdapter, - UTF8CharacteristicAdapter, GATT_VOLUME_OFFSET_CONTROL_SERVICE, GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC, GATT_AUDIO_LOCATION_CHARACTERISTIC, GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC, GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC, ) +from bumble.gatt_adapters import ( + DelegatedCharacteristicProxyAdapter, + SerializableCharacteristicProxyAdapter, + UTF8CharacteristicProxyAdapter, +) from bumble.gatt_client import ProfileServiceProxy, ServiceProxy from bumble.utils import OpenIntEnum from bumble.profiles.bap import AudioLocation @@ -67,7 +69,7 @@ class ErrorCode(OpenIntEnum): class VolumeOffsetState: volume_offset: int = 0 change_counter: int = 0 - attribute_value: Optional[CharacteristicValue] = None + attribute: Optional[Characteristic] = None def __bytes__(self) -> bytes: return struct.pack(' None: - assert self.attribute_value is not None - await connection.device.notify_subscribers(attribute=self.attribute_value) + assert self.attribute is not None + await connection.device.notify_subscribers(attribute=self.attribute) def on_read(self, _connection: Optional[Connection]) -> bytes: return bytes(self) @@ -91,7 +93,7 @@ class VolumeOffsetState: @dataclass class VocsAudioLocation: audio_location: AudioLocation = AudioLocation.NOT_ALLOWED - attribute_value: Optional[CharacteristicValue] = None + attribute: Optional[Characteristic] = None def __bytes__(self) -> bytes: return struct.pack(' None: assert connection - assert self.attribute_value + assert self.attribute self.audio_location = AudioLocation(int.from_bytes(value, 'little')) - await connection.device.notify_subscribers(attribute=self.attribute_value) + await connection.device.notify_subscribers(attribute=self.attribute) @dataclass @@ -148,7 +150,7 @@ class VolumeOffsetControlPoint: @dataclass class AudioOutputDescription: audio_output_description: str = '' - attribute_value: Optional[CharacteristicValue] = None + attribute: Optional[Characteristic] = None @classmethod def from_bytes(cls, data: bytes): @@ -162,10 +164,10 @@ class AudioOutputDescription: async def on_write(self, connection: Optional[Connection], value: bytes) -> None: assert connection - assert self.attribute_value + assert self.attribute self.audio_output_description = value.decode('utf-8') - await connection.device.notify_subscribers(attribute=self.attribute_value) + await connection.device.notify_subscribers(attribute=self.attribute) # ----------------------------------------------------------------------------- @@ -197,7 +199,7 @@ class VolumeOffsetControlService(TemplateService): VolumeOffsetControlPoint(self.volume_offset_state) ) - self.volume_offset_state_characteristic = Characteristic( + self.volume_offset_state_characteristic: Characteristic[bytes] = Characteristic( uuid=GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC, properties=( Characteristic.Properties.READ | Characteristic.Properties.NOTIFY @@ -206,7 +208,7 @@ class VolumeOffsetControlService(TemplateService): value=CharacteristicValue(read=self.volume_offset_state.on_read), ) - self.audio_location_characteristic = Characteristic( + self.audio_location_characteristic: Characteristic[bytes] = Characteristic( uuid=GATT_AUDIO_LOCATION_CHARACTERISTIC, properties=( Characteristic.Properties.READ @@ -222,33 +224,39 @@ class VolumeOffsetControlService(TemplateService): write=self.audio_location.on_write, ), ) - self.audio_location.attribute_value = self.audio_location_characteristic.value + self.audio_location.attribute = self.audio_location_characteristic - self.volume_offset_control_point_characteristic = Characteristic( - uuid=GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC, - properties=Characteristic.Properties.WRITE, - permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION, - value=CharacteristicValue(write=self.volume_offset_control_point.on_write), + self.volume_offset_control_point_characteristic: Characteristic[bytes] = ( + Characteristic( + uuid=GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC, + properties=Characteristic.Properties.WRITE, + permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION, + value=CharacteristicValue( + write=self.volume_offset_control_point.on_write + ), + ) ) - self.audio_output_description_characteristic = Characteristic( - uuid=GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC, - properties=( - Characteristic.Properties.READ - | Characteristic.Properties.NOTIFY - | Characteristic.Properties.WRITE_WITHOUT_RESPONSE - ), - permissions=( - Characteristic.Permissions.READ_REQUIRES_ENCRYPTION - | Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION - ), - value=CharacteristicValue( - read=self.audio_output_description.on_read, - write=self.audio_output_description.on_write, - ), + self.audio_output_description_characteristic: Characteristic[bytes] = ( + Characteristic( + uuid=GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC, + properties=( + Characteristic.Properties.READ + | Characteristic.Properties.NOTIFY + | Characteristic.Properties.WRITE_WITHOUT_RESPONSE + ), + permissions=( + Characteristic.Permissions.READ_REQUIRES_ENCRYPTION + | Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION + ), + value=CharacteristicValue( + read=self.audio_output_description.on_read, + write=self.audio_output_description.on_write, + ), + ) ) - self.audio_output_description.attribute_value = ( - self.audio_output_description_characteristic.value + self.audio_output_description.attribute = ( + self.audio_output_description_characteristic ) super().__init__( @@ -271,14 +279,14 @@ class VolumeOffsetControlServiceProxy(ProfileServiceProxy): def __init__(self, service_proxy: ServiceProxy) -> None: self.service_proxy = service_proxy - self.volume_offset_state = SerializableCharacteristicAdapter( + self.volume_offset_state = SerializableCharacteristicProxyAdapter( service_proxy.get_required_characteristic_by_uuid( GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC ), VolumeOffsetState, ) - self.audio_location = DelegatedCharacteristicAdapter( + self.audio_location = DelegatedCharacteristicProxyAdapter( service_proxy.get_required_characteristic_by_uuid( GATT_AUDIO_LOCATION_CHARACTERISTIC ), @@ -292,7 +300,7 @@ class VolumeOffsetControlServiceProxy(ProfileServiceProxy): ) ) - self.audio_output_description = UTF8CharacteristicAdapter( + self.audio_output_description = UTF8CharacteristicProxyAdapter( service_proxy.get_required_characteristic_by_uuid( GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC ) diff --git a/bumble/utils.py b/bumble/utils.py index ba09f141..2eb6eaf4 100644 --- a/bumble/utils.py +++ b/bumble/utils.py @@ -502,3 +502,13 @@ class ByteSerializable(Protocol): def from_bytes(cls, data: bytes) -> Self: ... def __bytes__(self) -> bytes: ... + + +# ----------------------------------------------------------------------------- +class IntConvertible(Protocol): + """ + Type protocol for classes that can be instantiated from int and converted to int. + """ + + def __init__(self, value: int) -> None: ... + def __int__(self) -> int: ... diff --git a/examples/heart_rate_server.py b/examples/heart_rate_server.py index e40d5db4..3ccf1924 100644 --- a/examples/heart_rate_server.py +++ b/examples/heart_rate_server.py @@ -102,7 +102,6 @@ async def main() -> None: ) # Notify subscribers of the current value as soon as they subscribe - @heart_rate_service.heart_rate_measurement_characteristic.on('subscription') def on_subscription(connection, notify_enabled, indicate_enabled): if notify_enabled or indicate_enabled: AsyncRunner.spawn( @@ -112,6 +111,10 @@ async def main() -> None: ) ) + heart_rate_service.heart_rate_measurement_characteristic.on( + 'subscription', on_subscription + ) + # Go! await device.power_on() await device.start_advertising(auto_restart=True) diff --git a/examples/run_gatt_client_and_server.py b/examples/run_gatt_client_and_server.py index e25d14cf..0811d748 100644 --- a/examples/run_gatt_client_and_server.py +++ b/examples/run_gatt_client_and_server.py @@ -70,13 +70,13 @@ async def main() -> None: descriptor = Descriptor( GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR, Descriptor.READABLE, - 'My Description', + 'My Description'.encode(), ) - manufacturer_name_characteristic = Characteristic( + manufacturer_name_characteristic = Characteristic[bytes]( GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, Characteristic.Properties.READ, Characteristic.READABLE, - "Fitbit", + "Fitbit".encode(), [descriptor], ) device_info_service = Service( diff --git a/examples/run_gatt_server.py b/examples/run_gatt_server.py index 66f65538..1860cf28 100644 --- a/examples/run_gatt_server.py +++ b/examples/run_gatt_server.py @@ -94,13 +94,13 @@ async def main() -> None: descriptor = Descriptor( GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR, Descriptor.READABLE, - 'My Description', + 'My Description'.encode(), ) manufacturer_name_characteristic = Characteristic( GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, Characteristic.Properties.READ, Characteristic.READABLE, - 'Fitbit', + 'Fitbit'.encode(), [descriptor], ) device_info_service = Service( diff --git a/examples/run_gatt_with_adapters.py b/examples/run_gatt_with_adapters.py index 97fb8917..ba6b95a9 100644 --- a/examples/run_gatt_with_adapters.py +++ b/examples/run_gatt_with_adapters.py @@ -1,4 +1,4 @@ -# Copyright 2024 Google LLC +# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ from __future__ import annotations import asyncio import dataclasses +import functools +import enum import logging import os import random @@ -28,6 +30,8 @@ from typing import Any, List, Union from bumble.device import Device, Peer from bumble import transport from bumble import gatt +from bumble import gatt_adapters +from bumble import gatt_client from bumble import hci from bumble import core @@ -36,6 +40,9 @@ from bumble import core SERVICE_UUID = core.UUID("50DB505C-8AC4-4738-8448-3B1D9CC09CC5") CHARACTERISTIC_UUID_BASE = "D901B45B-4916-412E-ACCA-0000000000" +DEFAULT_CLIENT_ADDRESS = "F0:F1:F2:F3:F4:F5" +DEFAULT_SERVER_ADDRESS = "F1:F2:F3:F4:F5:F6" + # ----------------------------------------------------------------------------- @dataclasses.dataclass @@ -65,6 +72,12 @@ class CustomClass: return struct.pack(">II", self.a, self.b) +# ----------------------------------------------------------------------------- +class CustomEnum(enum.IntEnum): + FOO = 1234 + BAR = 5678 + + # ----------------------------------------------------------------------------- async def client(device: Device, address: hci.Address) -> None: print(f'=== Connecting to {address}...') @@ -78,8 +91,8 @@ async def client(device: Device, address: hci.Address) -> None: print("*** Discovery complete") service = peer.get_services_by_uuid(SERVICE_UUID)[0] - characteristics = [] - for index in range(1, 9): + characteristics: list[gatt_client.CharacteristicProxy] = [] + for index in range(1, 10): characteristics.append( service.get_characteristics_by_uuid( core.UUID(CHARACTERISTIC_UUID_BASE + f"{index:02X}") @@ -91,59 +104,92 @@ async def client(device: Device, address: hci.Address) -> None: value = await characteristic.read_value() print(f"### {characteristic} = {value!r} ({value.hex()})") + # Subscribe to all characteristics as a raw bytes listener. + def on_raw_characteristic_update(characteristic, value): + print(f"^^^ Update[RAW] {characteristic.uuid} value = {value.hex()}") + + for characteristic in characteristics: + await characteristic.subscribe( + functools.partial(on_raw_characteristic_update, characteristic) + ) + + # Function to subscribe to adapted characteristics + def on_adapted_characteristic_update(characteristic, value): + print( + f"^^^ Update[ADAPTED] {characteristic.uuid} value = {value!r}, " + f"type={type(value)}" + ) + # Static characteristic with a bytes value. c1 = characteristics[0] c1_value = await c1.read_value() print(f"@@@ C1 {c1} value = {c1_value!r} (type={type(c1_value)})") await c1.write_value("happy π day".encode("utf-8")) + await c1.subscribe(functools.partial(on_adapted_characteristic_update, c1)) # Static characteristic with a string value. - c2 = gatt.UTF8CharacteristicAdapter(characteristics[1]) + c2 = gatt_adapters.UTF8CharacteristicProxyAdapter(characteristics[1]) c2_value = await c2.read_value() print(f"@@@ C2 {c2} value = {c2_value} (type={type(c2_value)})") await c2.write_value("happy π day") + await c2.subscribe(functools.partial(on_adapted_characteristic_update, c2)) # Static characteristic with a tuple value. - c3 = gatt.PackedCharacteristicAdapter(characteristics[2], ">III") + c3 = gatt_adapters.PackedCharacteristicProxyAdapter(characteristics[2], ">III") c3_value = await c3.read_value() print(f"@@@ C3 {c3} value = {c3_value} (type={type(c3_value)})") await c3.write_value((2001, 2002, 2003)) + await c3.subscribe(functools.partial(on_adapted_characteristic_update, c3)) # Static characteristic with a named tuple value. - c4 = gatt.MappedCharacteristicAdapter( + c4 = gatt_adapters.MappedCharacteristicProxyAdapter( characteristics[3], ">III", ["f1", "f2", "f3"] ) c4_value = await c4.read_value() print(f"@@@ C4 {c4} value = {c4_value} (type={type(c4_value)})") await c4.write_value({"f1": 4001, "f2": 4002, "f3": 4003}) + await c4.subscribe(functools.partial(on_adapted_characteristic_update, c4)) # Static characteristic with a serializable value. - c5 = gatt.SerializableCharacteristicAdapter( + c5 = gatt_adapters.SerializableCharacteristicProxyAdapter( characteristics[4], CustomSerializableClass ) c5_value = await c5.read_value() print(f"@@@ C5 {c5} value = {c5_value} (type={type(c5_value)})") await c5.write_value(CustomSerializableClass(56, 57)) + await c5.subscribe(functools.partial(on_adapted_characteristic_update, c5)) # Static characteristic with a delegated value. - c6 = gatt.DelegatedCharacteristicAdapter( + c6 = gatt_adapters.DelegatedCharacteristicProxyAdapter( characteristics[5], encode=CustomClass.encode, decode=CustomClass.decode ) c6_value = await c6.read_value() print(f"@@@ C6 {c6} value = {c6_value} (type={type(c6_value)})") await c6.write_value(CustomClass(6, 7)) + await c6.subscribe(functools.partial(on_adapted_characteristic_update, c6)) # Dynamic characteristic with a bytes value. c7 = characteristics[6] c7_value = await c7.read_value() print(f"@@@ C7 {c7} value = {c7_value!r} (type={type(c7_value)})") await c7.write_value(bytes.fromhex("01020304")) + await c7.subscribe(functools.partial(on_adapted_characteristic_update, c7)) # Dynamic characteristic with a string value. - c8 = gatt.UTF8CharacteristicAdapter(characteristics[7]) + c8 = gatt_adapters.UTF8CharacteristicProxyAdapter(characteristics[7]) c8_value = await c8.read_value() print(f"@@@ C8 {c8} value = {c8_value} (type={type(c8_value)})") await c8.write_value("howdy") + await c8.subscribe(functools.partial(on_adapted_characteristic_update, c8)) + + # Static characteristic with an enum value + c9 = gatt_adapters.EnumCharacteristicProxyAdapter( + characteristics[8], CustomEnum, 3, 'big' + ) + c9_value = await c9.read_value() + print(f"@@@ C9 {c9} value = {c9_value.name} (type={type(c9_value)})") + await c9.write_value(CustomEnum.BAR) + await c9.subscribe(functools.partial(on_adapted_characteristic_update, c9)) # ----------------------------------------------------------------------------- @@ -175,142 +221,213 @@ def on_characteristic_write(characteristic: gatt.Characteristic, value: Any) -> print(f"<<< WRITE: {characteristic} <- {value} ({type(value)})") +# ----------------------------------------------------------------------------- +async def server(device: Device) -> None: + # Static characteristic with a bytes value. + c1 = gatt.Characteristic( + CHARACTERISTIC_UUID_BASE + "01", + gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.WRITE + | gatt.Characteristic.Properties.NOTIFY, + gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, + b'hello', + ) + + # Static characteristic with a string value. + c2 = gatt_adapters.UTF8CharacteristicAdapter( + gatt.Characteristic( + CHARACTERISTIC_UUID_BASE + "02", + gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.WRITE + | gatt.Characteristic.Properties.NOTIFY, + gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, + 'hello', + ) + ) + + # Static characteristic with a tuple value. + c3 = gatt_adapters.PackedCharacteristicAdapter( + gatt.Characteristic( + CHARACTERISTIC_UUID_BASE + "03", + gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.WRITE + | gatt.Characteristic.Properties.NOTIFY, + gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, + (1007, 1008, 1009), + ), + ">III", + ) + + # Static characteristic with a named tuple value. + c4 = gatt_adapters.MappedCharacteristicAdapter( + gatt.Characteristic( + CHARACTERISTIC_UUID_BASE + "04", + gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.WRITE + | gatt.Characteristic.Properties.NOTIFY, + gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, + {"f1": 3007, "f2": 3008, "f3": 3009}, + ), + ">III", + ["f1", "f2", "f3"], + ) + + # Static characteristic with a serializable value. + c5 = gatt_adapters.SerializableCharacteristicAdapter( + gatt.Characteristic( + CHARACTERISTIC_UUID_BASE + "05", + gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.WRITE + | gatt.Characteristic.Properties.NOTIFY, + gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, + CustomSerializableClass(11, 12), + ), + CustomSerializableClass, + ) + + # Static characteristic with a delegated value. + c6 = gatt_adapters.DelegatedCharacteristicAdapter( + gatt.Characteristic( + CHARACTERISTIC_UUID_BASE + "06", + gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.WRITE + | gatt.Characteristic.Properties.NOTIFY, + gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, + CustomClass(1, 2), + ), + encode=CustomClass.encode, + decode=CustomClass.decode, + ) + + # Dynamic characteristic with a bytes value. + c7 = gatt.Characteristic( + CHARACTERISTIC_UUID_BASE + "07", + gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.WRITE + | gatt.Characteristic.Properties.NOTIFY, + gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, + gatt.CharacteristicValue( + read=lambda connection: dynamic_read("bytes"), + write=lambda connection, value: dynamic_write("bytes", value), + ), + ) + + # Dynamic characteristic with a string value. + c8 = gatt_adapters.UTF8CharacteristicAdapter( + gatt.Characteristic( + CHARACTERISTIC_UUID_BASE + "08", + gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.WRITE + | gatt.Characteristic.Properties.NOTIFY, + gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, + gatt.CharacteristicValue( + read=lambda connection: dynamic_read("string"), + write=lambda connection, value: dynamic_write("string", value), + ), + ) + ) + + # Static characteristic with an enum value + c9 = gatt_adapters.EnumCharacteristicAdapter( + gatt.Characteristic( + CHARACTERISTIC_UUID_BASE + "09", + gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.WRITE + | gatt.Characteristic.Properties.NOTIFY, + gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, + CustomEnum.FOO, + ), + cls=CustomEnum, + length=3, + byteorder='big', + ) + + characteristics: List[gatt.Characteristic] = [ + c1, + c2, + c3, + c4, + c5, + c6, + c7, + c8, + c9, + ] + + # Listen for read and write events. + for characteristic in characteristics: + characteristic.on( + "read", + lambda _, value, c=characteristic: on_characteristic_read(c, value), + ) + characteristic.on( + "write", + lambda _, value, c=characteristic: on_characteristic_write(c, value), + ) + + device.add_service(gatt.Service(SERVICE_UUID, characteristics)) + + # Notify every 3 seconds + i = 0 + while True: + await asyncio.sleep(3) + + # Notifying can be done with the characteristic's current value, or + # by explicitly passing a value to notify with. Both variants are used + # here: for c1..c4 we set the value and then notify, for c4..c9 we notify + # with an explicit value. + c1.value = f'hello c1 {i}'.encode() + await device.notify_subscribers(c1) + c2.value = f'hello c2 {i}' + await device.notify_subscribers(c2) + c3.value = (1000 + i, 2000 + i, 3000 + i) + await device.notify_subscribers(c3) + c4.value = {"f1": 4000 + i, "f2": 5000 + i, "f3": 6000 + i} + await device.notify_subscribers(c4) + await device.notify_subscribers(c5, CustomSerializableClass(1000 + i, 2000 + i)) + await device.notify_subscribers(c6, CustomClass(3000 + i, 4000 + i)) + await device.notify_subscribers(c7, bytes([1, 2, 3, i % 256])) + await device.notify_subscribers(c8, f'hello c8 {i}') + await device.notify_subscribers( + c9, CustomEnum.FOO if i % 2 == 0 else CustomEnum.BAR + ) + + i += 1 + + # ----------------------------------------------------------------------------- async def main() -> None: if len(sys.argv) < 2: - print("Usage: run_gatt_with_adapters.py []") - print("example: run_gatt_with_adapters.py usb:0 E1:CA:72:48:C4:E8") + print("Usage: run_gatt_with_adapters.py client|server") + print("example: run_gatt_with_adapters.py usb:0 F0:F1:F2:F3:F4:F5") return async with await transport.open_transport(sys.argv[1]) as hci_transport: + is_client = sys.argv[2] == "client" + # Create a device to manage the host device = Device.with_hci( "Bumble", - hci.Address("F0:F1:F2:F3:F4:F5"), + hci.Address( + DEFAULT_CLIENT_ADDRESS if is_client else DEFAULT_SERVER_ADDRESS + ), hci_transport.source, hci_transport.sink, ) - # Static characteristic with a bytes value. - c1 = gatt.Characteristic( - CHARACTERISTIC_UUID_BASE + "01", - gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.WRITE, - gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, - b'hello', - ) - - # Static characteristic with a string value. - c2 = gatt.UTF8CharacteristicAdapter( - gatt.Characteristic( - CHARACTERISTIC_UUID_BASE + "02", - gatt.Characteristic.Properties.READ - | gatt.Characteristic.Properties.WRITE, - gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, - 'hello', - ) - ) - - # Static characteristic with a tuple value. - c3 = gatt.PackedCharacteristicAdapter( - gatt.Characteristic( - CHARACTERISTIC_UUID_BASE + "03", - gatt.Characteristic.Properties.READ - | gatt.Characteristic.Properties.WRITE, - gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, - (1007, 1008, 1009), - ), - ">III", - ) - - # Static characteristic with a named tuple value. - c4 = gatt.MappedCharacteristicAdapter( - gatt.Characteristic( - CHARACTERISTIC_UUID_BASE + "04", - gatt.Characteristic.Properties.READ - | gatt.Characteristic.Properties.WRITE, - gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, - {"f1": 3007, "f2": 3008, "f3": 3009}, - ), - ">III", - ["f1", "f2", "f3"], - ) - - # Static characteristic with a serializable value. - c5 = gatt.SerializableCharacteristicAdapter( - gatt.Characteristic( - CHARACTERISTIC_UUID_BASE + "05", - gatt.Characteristic.Properties.READ - | gatt.Characteristic.Properties.WRITE, - gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, - CustomSerializableClass(11, 12), - ), - CustomSerializableClass, - ) - - # Static characteristic with a delegated value. - c6 = gatt.DelegatedCharacteristicAdapter( - gatt.Characteristic( - CHARACTERISTIC_UUID_BASE + "06", - gatt.Characteristic.Properties.READ - | gatt.Characteristic.Properties.WRITE, - gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, - CustomClass(1, 2), - ), - encode=CustomClass.encode, - decode=CustomClass.decode, - ) - - # Dynamic characteristic with a bytes value. - c7 = gatt.Characteristic( - CHARACTERISTIC_UUID_BASE + "07", - gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.WRITE, - gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, - gatt.CharacteristicValue( - read=lambda connection: dynamic_read("bytes"), - write=lambda connection, value: dynamic_write("bytes", value), - ), - ) - - # Dynamic characteristic with a string value. - c8 = gatt.UTF8CharacteristicAdapter( - gatt.Characteristic( - CHARACTERISTIC_UUID_BASE + "08", - gatt.Characteristic.Properties.READ - | gatt.Characteristic.Properties.WRITE, - gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE, - gatt.CharacteristicValue( - read=lambda connection: dynamic_read("string"), - write=lambda connection, value: dynamic_write("string", value), - ), - ) - ) - - characteristics: List[ - Union[gatt.Characteristic, gatt.CharacteristicAdapter] - ] = [c1, c2, c3, c4, c5, c6, c7, c8] - - # Listen for read and write events. - for characteristic in characteristics: - characteristic.on( - "read", - lambda _, value, c=characteristic: on_characteristic_read(c, value), - ) - characteristic.on( - "write", - lambda _, value, c=characteristic: on_characteristic_write(c, value), - ) - - device.add_service(gatt.Service(SERVICE_UUID, characteristics)) # type: ignore - # Get things going await device.power_on() - # Connect to a peer - if len(sys.argv) > 2: - await client(device, hci.Address(sys.argv[2])) + if is_client: + # Connect a client to a peer + await client(device, hci.Address(DEFAULT_SERVER_ADDRESS)) else: + # Advertise so a peer can connect await device.start_advertising(auto_restart=True) + # Setup a server + await server(device) + await hci_transport.source.wait_for_termination() diff --git a/tests/gatt_service_test.py b/tests/gatt_service_test.py index 89cc5eac..d8e44776 100644 --- a/tests/gatt_service_test.py +++ b/tests/gatt_service_test.py @@ -16,6 +16,7 @@ # Imports # ----------------------------------------------------------------------------- from __future__ import annotations +import pytest from . import test_utils @@ -25,6 +26,7 @@ from bumble.profiles import gatt_service # ----------------------------------------------------------------------------- +@pytest.mark.asyncio async def test_database_hash(): devices = await test_utils.TwoDevices.create_with_connection() devices[0].gatt_server.services.clear() @@ -118,6 +120,7 @@ async def test_database_hash(): # ----------------------------------------------------------------------------- +@pytest.mark.asyncio async def test_service_changed(): devices = await test_utils.TwoDevices.create_with_connection() assert (service := devices[0].gatt_service) diff --git a/tests/gatt_test.py b/tests/gatt_test.py index fd5f8c6b..b6f46571 100644 --- a/tests/gatt_test.py +++ b/tests/gatt_test.py @@ -17,32 +17,43 @@ # ----------------------------------------------------------------------------- from __future__ import annotations import asyncio +import enum import logging import os import struct import pytest +from typing import Any from typing_extensions import Self from unittest.mock import AsyncMock, Mock, ANY from bumble.controller import Controller -from bumble.gatt_client import CharacteristicProxy from bumble.link import LocalLink from bumble.device import Device, Peer from bumble.host import Host from bumble.gatt import ( GATT_BATTERY_LEVEL_CHARACTERISTIC, GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR, - CharacteristicAdapter, - SerializableCharacteristicAdapter, - DelegatedCharacteristicAdapter, - PackedCharacteristicAdapter, - MappedCharacteristicAdapter, - UTF8CharacteristicAdapter, Service, Characteristic, CharacteristicValue, Descriptor, ) +from bumble.gatt_client import CharacteristicProxy +from bumble.gatt_adapters import ( + CharacteristicProxyAdapter, + SerializableCharacteristicAdapter, + SerializableCharacteristicProxyAdapter, + DelegatedCharacteristicAdapter, + DelegatedCharacteristicProxyAdapter, + PackedCharacteristicAdapter, + PackedCharacteristicProxyAdapter, + MappedCharacteristicAdapter, + MappedCharacteristicProxyAdapter, + UTF8CharacteristicAdapter, + UTF8CharacteristicProxyAdapter, + EnumCharacteristicAdapter, + EnumCharacteristicProxyAdapter, +) from bumble.transport import AsyncPipeSink from bumble.core import UUID from bumble.att import ( @@ -199,7 +210,7 @@ async def test_characteristic_encoding(): await async_barrier() assert characteristic.value == bytes([125]) - cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2])) + cd = DelegatedCharacteristicProxyAdapter(c, encode=lambda x: bytes([x // 2])) await cd.write_value(100, with_response=True) await async_barrier() assert characteristic.value == bytes([50]) @@ -207,7 +218,7 @@ async def test_characteristic_encoding(): c2 = peer.get_characteristics_by_uuid(async_characteristic.uuid) assert len(c2) == 1 c2 = c2[0] - cd2 = PackedCharacteristicAdapter(c2, ">I") + cd2 = PackedCharacteristicProxyAdapter(c2, ">I") cd2v = await cd2.read_value() assert cd2v == 0x05060708 @@ -249,7 +260,7 @@ async def test_characteristic_encoding(): await async_barrier() assert last_change is None - cd = DelegatedCharacteristicAdapter(c, decode=lambda x: x[0]) + cd = DelegatedCharacteristicProxyAdapter(c, decode=lambda x: x[0]) await cd.subscribe(on_change) await server.notify_subscribers(characteristic) await async_barrier() @@ -314,21 +325,16 @@ async def test_attribute_getters(): # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_CharacteristicAdapter() -> None: - # Check that the CharacteristicAdapter base class is transparent v = bytes([1, 2, 3]) - c = Characteristic( + c: Characteristic[Any] = Characteristic( GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.Properties.READ, Characteristic.READABLE, v, ) - a = CharacteristicAdapter(c) - - value = await a.read_value(None) - assert value == v v = bytes([3, 4, 5]) - await a.write_value(None, v) + await c.write_value(None, v) assert c.value == v # Simple delegated adapter @@ -415,11 +421,171 @@ async def test_CharacteristicAdapter() -> None: class_read_value = await class_c.read_value(None) assert class_read_value == class_value_bytes - c.value = b'' + class_c.value = b'' await class_c.write_value(None, class_value_bytes) - assert isinstance(c.value, BlaBla) - assert c.value.a == 3 - assert c.value.b == 4 + assert isinstance(class_c.value, BlaBla) + assert class_c.value.a == class_value.a + assert class_c.value.b == class_value.b + + # Enum adapter + class MyEnum(enum.IntEnum): + ENUM_1 = 1234 + ENUM_2 = 5678 + + enum_value = MyEnum.ENUM_2 + enum_value_bytes = int(enum_value).to_bytes(3, 'big') + c.value = enum_value + enum_c = EnumCharacteristicAdapter(c, MyEnum, 3, 'big') + enum_read_value = await enum_c.read_value(None) + assert enum_read_value == enum_value_bytes + enum_c.value = b'' + await enum_c.write_value(None, enum_value_bytes) + assert isinstance(enum_c.value, MyEnum) + assert enum_c.value == enum_value + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_CharacteristicProxyAdapter() -> None: + class Client: + def __init__(self, value): + self.value = value + + async def read_value(self, handle, no_long_read=False) -> bytes: + return self.value + + async def write_value(self, handle, value, with_response=False): + self.value = value + + class TestAttributeProxy(CharacteristicProxy): + def __init__(self, value) -> None: + super().__init__(Client(value), 0, 0, None, 0) # type: ignore + + @property + def value(self): + return self.client.value + + @value.setter + def value(self, value): + self.client.value = value + + v = bytes([1, 2, 3]) + c = TestAttributeProxy(v) + a: CharacteristicProxyAdapter = CharacteristicProxyAdapter(c) + + value = await a.read_value() + assert value == v + + v = bytes([3, 4, 5]) + await a.write_value(v) + assert c.value == v + + # Simple delegated adapter + delegated = DelegatedCharacteristicProxyAdapter( + c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x)) + ) + + delegated_value = await delegated.read_value() + assert delegated_value == bytes(reversed(v)) + + delegated_value2 = bytes([3, 4, 5]) + await delegated.write_value(delegated_value2) + assert c.value == bytes(reversed(delegated_value2)) + + # Packed adapter with single element format + packed_value_ref = 1234 + packed_value_bytes = struct.pack('>H', packed_value_ref) + c.value = packed_value_bytes + packed = PackedCharacteristicProxyAdapter(c, '>H') + + packed_value_read = await packed.read_value() + assert packed_value_read == packed_value_ref + c.value = None + await packed.write_value(packed_value_ref) + assert c.value == packed_value_bytes + + # Packed adapter with multi-element format + v1 = 1234 + v2 = 5678 + packed_multi_value_bytes = struct.pack('>HH', v1, v2) + c.value = packed_multi_value_bytes + packed_multi = PackedCharacteristicProxyAdapter(c, '>HH') + + packed_multi_read_value = await packed_multi.read_value() + assert packed_multi_read_value == (v1, v2) + c.value = b'' + await packed_multi.write_value((v1, v2)) + assert c.value == packed_multi_value_bytes + + # Mapped adapter + v1 = 1234 + v2 = 5678 + packed_mapped_value_bytes = struct.pack('>HH', v1, v2) + mapped = {'v1': v1, 'v2': v2} + c.value = packed_mapped_value_bytes + packed_mapped = MappedCharacteristicProxyAdapter(c, '>HH', ('v1', 'v2')) + + packed_mapped_read_value = await packed_mapped.read_value() + assert packed_mapped_read_value == mapped + c.value = b'' + await packed_mapped.write_value(mapped) + assert c.value == packed_mapped_value_bytes + + # UTF-8 adapter + string_value = 'Hello π' + string_value_bytes = string_value.encode('utf-8') + c.value = string_value_bytes + string_c = UTF8CharacteristicProxyAdapter(c) + + string_read_value = await string_c.read_value() + assert string_read_value == string_value + c.value = b'' + await string_c.write_value(string_value) + assert c.value == string_value_bytes + + # Class adapter + class BlaBla: + def __init__(self, a: int, b: int) -> None: + self.a = a + self.b = b + + @classmethod + def from_bytes(cls, data: bytes) -> Self: + a, b = struct.unpack(">II", data) + return cls(a, b) + + def __bytes__(self) -> bytes: + return struct.pack(">II", self.a, self.b) + + class_value = BlaBla(3, 4) + class_value_bytes = struct.pack(">II", 3, 4) + c.value = class_value_bytes + class_c = SerializableCharacteristicProxyAdapter(c, BlaBla) + + class_read_value = await class_c.read_value() + assert isinstance(class_read_value, BlaBla) + assert class_read_value.a == class_value.a + assert class_read_value.b == class_value.b + c.value = b'' + await class_c.write_value(class_value) + assert c.value == class_value_bytes + + # Enum adapter + class MyEnum(enum.IntEnum): + ENUM_1 = 1234 + ENUM_2 = 5678 + + enum_value = MyEnum.ENUM_1 + enum_value_bytes = int(enum_value).to_bytes(3, 'little') + c.value = enum_value_bytes + enum_c = EnumCharacteristicProxyAdapter(c, MyEnum, 3) + + enum_read_value = await enum_c.read_value() + assert isinstance(enum_read_value, MyEnum) + assert enum_read_value == enum_value + c.value = b'' + await enum_c.write_value(enum_value) + assert c.value == enum_value_bytes # ----------------------------------------------------------------------------- @@ -601,7 +767,7 @@ async def test_read_write2(): v1 = await c1.read_value() assert v1 == v - a1 = PackedCharacteristicAdapter(c1, '>I') + a1 = PackedCharacteristicProxyAdapter(c1, '>I') v1 = await a1.read_value() assert v1 == struct.unpack('>I', v)[0] @@ -1114,6 +1280,7 @@ async def async_main(): await test_CharacteristicValue() await test_CharacteristicValue_async() await test_CharacteristicAdapter() + await test_CharacteristicProxyAdapter() # ----------------------------------------------------------------------------- diff --git a/tests/gmap_test.py b/tests/gmap_test.py index 7e48c232..9901267f 100644 --- a/tests/gmap_test.py +++ b/tests/gmap_test.py @@ -78,7 +78,11 @@ async def test_init_service(gmap_client: GamingAudioServiceProxy): | GmapRole.BROADCAST_GAME_RECEIVER | GmapRole.BROADCAST_GAME_SENDER ) + assert gmap_client.ugg_features is not None assert await gmap_client.ugg_features.read_value() == UggFeatures.UGG_MULTISINK + assert gmap_client.ugt_features is not None assert await gmap_client.ugt_features.read_value() == UgtFeatures.UGT_SOURCE + assert gmap_client.bgr_features is not None assert await gmap_client.bgr_features.read_value() == BgrFeatures.BGR_MULTISINK + assert gmap_client.bgs_features is not None assert await gmap_client.bgs_features.read_value() == BgsFeatures.BGS_96_KBPS