Merge pull request #650 from google/gbg/gatt-adapter-typing

new GATT adapter classes with proper typing support
This commit is contained in:
Gilles Boccon-Gibod
2025-02-23 18:06:16 -08:00
committed by GitHub
31 changed files with 1266 additions and 632 deletions

View File

@@ -514,14 +514,19 @@ async def run_assist(
return
# Subscribe to and read the broadcast receive state characteristics
def on_broadcast_receive_state_update(
value: bass.BroadcastReceiveState, index: int
) -> None:
print(
f"{color(f'Broadcast Receive State Update [{index}]:', 'green')} {value}"
)
for i, broadcast_receive_state in enumerate(
bass_client.broadcast_receive_states
):
try:
await broadcast_receive_state.subscribe(
lambda value, i=i: print(
f"{color(f'Broadcast Receive State Update [{i}]:', 'green')} {value}"
)
functools.partial(on_broadcast_receive_state_update, index=i)
)
except core.ProtocolError as error:
print(

View File

@@ -234,7 +234,7 @@ class GattlinkNodeBridge(GattlinkL2capEndpoint, Device.Listener):
Characteristic.WRITEABLE,
CharacteristicValue(write=self.on_rx_write),
)
self.tx_characteristic = Characteristic(
self.tx_characteristic: Characteristic[bytes] = Characteristic(
GG_GATTLINK_TX_CHARACTERISTIC_UUID,
Characteristic.Properties.NOTIFY,
Characteristic.READABLE,

View File

@@ -29,13 +29,14 @@ import functools
import inspect
import struct
from typing import (
Any,
Awaitable,
Callable,
Generic,
Dict,
List,
Optional,
Type,
TypeVar,
Union,
TYPE_CHECKING,
)
@@ -43,13 +44,18 @@ from typing import (
from pyee import EventEmitter
from bumble import utils
from bumble.core import UUID, name_or_number, ProtocolError
from bumble.core import UUID, name_or_number, InvalidOperationError, ProtocolError
from bumble.hci import HCI_Object, key_with_value
from bumble.colors import color
# -----------------------------------------------------------------------------
# Typing
# -----------------------------------------------------------------------------
if TYPE_CHECKING:
from bumble.device import Connection
_T = TypeVar('_T')
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
@@ -748,7 +754,7 @@ class ATT_Handle_Value_Confirmation(ATT_PDU):
# -----------------------------------------------------------------------------
class AttributeValue:
class AttributeValue(Generic[_T]):
'''
Attribute value where reading and/or writing is delegated to functions
passed as arguments to the constructor.
@@ -757,33 +763,34 @@ class AttributeValue:
def __init__(
self,
read: Union[
Callable[[Optional[Connection]], Any],
Callable[[Optional[Connection]], Awaitable[Any]],
Callable[[Optional[Connection]], _T],
Callable[[Optional[Connection]], Awaitable[_T]],
None,
] = None,
write: Union[
Callable[[Optional[Connection], Any], None],
Callable[[Optional[Connection], Any], Awaitable[None]],
Callable[[Optional[Connection], _T], None],
Callable[[Optional[Connection], _T], Awaitable[None]],
None,
] = None,
):
self._read = read
self._write = write
def read(self, connection: Optional[Connection]) -> Union[bytes, Awaitable[bytes]]:
return self._read(connection) if self._read else b''
def read(self, connection: Optional[Connection]) -> Union[_T, Awaitable[_T]]:
if self._read is None:
raise InvalidOperationError('AttributeValue has no read function')
return self._read(connection)
def write(
self, connection: Optional[Connection], value: bytes
self, connection: Optional[Connection], value: _T
) -> Union[Awaitable[None], None]:
if self._write:
return self._write(connection, value)
return None
if self._write is None:
raise InvalidOperationError('AttributeValue has no write function')
return self._write(connection, value)
# -----------------------------------------------------------------------------
class Attribute(EventEmitter):
class Attribute(EventEmitter, Generic[_T]):
class Permissions(enum.IntFlag):
READABLE = 0x01
WRITEABLE = 0x02
@@ -822,13 +829,13 @@ class Attribute(EventEmitter):
READ_REQUIRES_AUTHORIZATION = Permissions.READ_REQUIRES_AUTHORIZATION
WRITE_REQUIRES_AUTHORIZATION = Permissions.WRITE_REQUIRES_AUTHORIZATION
value: Any
value: Union[AttributeValue[_T], _T, None]
def __init__(
self,
attribute_type: Union[str, bytes, UUID],
permissions: Union[str, Attribute.Permissions],
value: Any = b'',
value: Union[AttributeValue[_T], _T, None] = None,
) -> None:
EventEmitter.__init__(self)
self.handle = 0
@@ -848,11 +855,11 @@ class Attribute(EventEmitter):
self.value = value
def encode_value(self, value: Any) -> bytes:
return value
def encode_value(self, value: _T) -> bytes:
return value # type: ignore
def decode_value(self, value_bytes: bytes) -> Any:
return value_bytes
def decode_value(self, value: bytes) -> _T:
return value # type: ignore
async def read_value(self, connection: Optional[Connection]) -> bytes:
if (
@@ -877,11 +884,14 @@ class Attribute(EventEmitter):
error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
)
if hasattr(self.value, 'read'):
value: Union[_T, None]
if isinstance(self.value, AttributeValue):
try:
value = self.value.read(connection)
if inspect.isawaitable(value):
value = await value
read_value = self.value.read(connection)
if inspect.isawaitable(read_value):
value = await read_value
else:
value = read_value
except ATT_Error as error:
raise ATT_Error(
error_code=error.error_code, att_handle=self.handle
@@ -889,20 +899,24 @@ class Attribute(EventEmitter):
else:
value = self.value
self.emit('read', connection, value)
self.emit('read', connection, b'' if value is None else value)
return self.encode_value(value)
return b'' if value is None else self.encode_value(value)
async def write_value(self, connection: Connection, value_bytes: bytes) -> None:
async def write_value(self, connection: Optional[Connection], value: bytes) -> None:
if (
self.permissions & self.WRITE_REQUIRES_ENCRYPTION
) and not connection.encryption:
(self.permissions & self.WRITE_REQUIRES_ENCRYPTION)
and connection is not None
and not connection.encryption
):
raise ATT_Error(
error_code=ATT_INSUFFICIENT_ENCRYPTION_ERROR, att_handle=self.handle
)
if (
self.permissions & self.WRITE_REQUIRES_AUTHENTICATION
) and not connection.authenticated:
(self.permissions & self.WRITE_REQUIRES_AUTHENTICATION)
and connection is not None
and not connection.authenticated
):
raise ATT_Error(
error_code=ATT_INSUFFICIENT_AUTHENTICATION_ERROR, att_handle=self.handle
)
@@ -912,11 +926,11 @@ class Attribute(EventEmitter):
error_code=ATT_INSUFFICIENT_AUTHORIZATION_ERROR, att_handle=self.handle
)
value = self.decode_value(value_bytes)
decoded_value = self.decode_value(value)
if hasattr(self.value, 'write'):
if isinstance(self.value, AttributeValue):
try:
result = self.value.write(connection, value)
result = self.value.write(connection, decoded_value)
if inspect.isawaitable(result):
await result
except ATT_Error as error:
@@ -924,9 +938,9 @@ class Attribute(EventEmitter):
error_code=error.error_code, att_handle=self.handle
) from error
else:
self.value = value
self.value = decoded_value
self.emit('write', connection, value)
self.emit('write', connection, decoded_value)
def __repr__(self):
if isinstance(self.value, bytes):

View File

@@ -53,7 +53,7 @@ from pyee import EventEmitter
from .colors import color
from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU
from .gatt import Characteristic, Descriptor, Service
from .gatt import Attribute, Characteristic, Descriptor, Service
from .host import DataPacketQueue, Host
from .profiles.gap import GenericAccessService
from .core import (
@@ -2223,7 +2223,7 @@ class Device(CompositeEventEmitter):
permissions=descriptor["permissions"],
)
descriptors.append(new_descriptor)
new_characteristic = Characteristic(
new_characteristic: Characteristic[bytes] = Characteristic(
uuid=characteristic["uuid"],
properties=Characteristic.Properties.from_string(
characteristic["properties"]
@@ -4923,16 +4923,84 @@ class Device(CompositeEventEmitter):
self.gatt_service = gatt_service.GenericAttributeProfileService()
self.gatt_server.add_service(self.gatt_service)
async def notify_subscriber(self, connection, attribute, value=None, force=False):
async def notify_subscriber(
self,
connection: Connection,
attribute: Attribute,
value: Optional[Any] = None,
force: bool = False,
) -> None:
"""
Send a notification to an attribute subscriber.
Args:
connection:
The connection of the subscriber.
attribute:
The attribute whose value is notified.
value:
The value of the attribute (if None, the value is read from the attribute)
force:
If True, send a notification even if there is no subscriber.
"""
await self.gatt_server.notify_subscriber(connection, attribute, value, force)
async def notify_subscribers(self, attribute, value=None, force=False):
async def notify_subscribers(
self, attribute: Attribute, value=None, force=False
) -> None:
"""
Send a notification to all the subscribers of an attribute.
Args:
attribute:
The attribute whose value is notified.
value:
The value of the attribute (if None, the value is read from the attribute)
force:
If True, send a notification for every connection even if there is no
subscriber.
"""
await self.gatt_server.notify_subscribers(attribute, value, force)
async def indicate_subscriber(self, connection, attribute, value=None, force=False):
async def indicate_subscriber(
self,
connection: Connection,
attribute: Attribute,
value: Optional[Any] = None,
force: bool = False,
):
"""
Send an indication to an attribute subscriber.
This method returns when the response to the indication has been received.
Args:
connection:
The connection of the subscriber.
attribute:
The attribute whose value is indicated.
value:
The value of the attribute (if None, the value is read from the attribute)
force:
If True, send an indication even if there is no subscriber.
"""
await self.gatt_server.indicate_subscriber(connection, attribute, value, force)
async def indicate_subscribers(self, attribute, value=None, force=False):
async def indicate_subscribers(
self, attribute: Attribute, value: Optional[Any] = None, force: bool = False
):
"""
Send an indication to all the subscribers of an attribute.
Args:
attribute:
The attribute whose value is notified.
value:
The value of the attribute (if None, the value is read from the attribute)
force:
If True, send an indication for every connection even if there is no
subscriber.
"""
await self.gatt_server.indicate_subscribers(attribute, value, force)
@host_event_handler

View File

@@ -27,28 +27,16 @@ import enum
import functools
import logging
import struct
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Sequence,
SupportsBytes,
Type,
Union,
TYPE_CHECKING,
)
from typing import Iterable, List, Optional, Sequence, TypeVar, Union
from bumble.colors import color
from bumble.core import BaseBumbleError, InvalidOperationError, UUID
from bumble.core import BaseBumbleError, UUID
from bumble.att import Attribute, AttributeValue
from bumble.utils import ByteSerializable
if TYPE_CHECKING:
from bumble.gatt_client import AttributeProxy
# -----------------------------------------------------------------------------
# Typing
# -----------------------------------------------------------------------------
_T = TypeVar('_T')
# -----------------------------------------------------------------------------
# Logging
@@ -436,7 +424,7 @@ class IncludedServiceDeclaration(Attribute):
# -----------------------------------------------------------------------------
class Characteristic(Attribute):
class Characteristic(Attribute[_T]):
'''
See Vol 3, Part G - 3.3 CHARACTERISTIC DEFINITION
'''
@@ -499,7 +487,7 @@ class Characteristic(Attribute):
uuid: Union[str, bytes, UUID],
properties: Characteristic.Properties,
permissions: Union[str, Attribute.Permissions],
value: Any = b'',
value: Union[AttributeValue[_T], _T, None] = None,
descriptors: Sequence[Descriptor] = (),
):
super().__init__(uuid, permissions, value)
@@ -559,217 +547,10 @@ class CharacteristicDeclaration(Attribute):
# -----------------------------------------------------------------------------
class CharacteristicValue(AttributeValue):
class CharacteristicValue(AttributeValue[_T]):
"""Same as AttributeValue, for backward compatibility"""
# -----------------------------------------------------------------------------
class CharacteristicAdapter:
'''
An adapter that can adapt Characteristic and AttributeProxy objects
by wrapping their `read_value()` and `write_value()` methods with ones that
return/accept encoded/decoded values.
For proxies (i.e used by a GATT client), the adaptation is one where the return
value of `read_value()` is decoded and the value passed to `write_value()` is
encoded. The `subscribe()` method, is wrapped with one where the values are decoded
before being passed to the subscriber.
For local values (i.e hosted by a GATT server) the adaptation is one where the
return value of `read_value()` is encoded and the value passed to `write_value()`
is decoded.
'''
read_value: Callable
write_value: Callable
def __init__(self, characteristic: Union[Characteristic, AttributeProxy]):
self.wrapped_characteristic = characteristic
self.subscribers: Dict[Callable, Callable] = (
{}
) # Map from subscriber to proxy subscriber
if isinstance(characteristic, Characteristic):
self.read_value = self.read_encoded_value
self.write_value = self.write_encoded_value
else:
self.read_value = self.read_decoded_value
self.write_value = self.write_decoded_value
self.subscribe = self.wrapped_subscribe
self.unsubscribe = self.wrapped_unsubscribe
def __getattr__(self, name):
return getattr(self.wrapped_characteristic, name)
def __setattr__(self, name, value):
if name in (
'wrapped_characteristic',
'subscribers',
'read_value',
'write_value',
'subscribe',
'unsubscribe',
):
super().__setattr__(name, value)
else:
setattr(self.wrapped_characteristic, name, value)
async def read_encoded_value(self, connection):
return self.encode_value(
await self.wrapped_characteristic.read_value(connection)
)
async def write_encoded_value(self, connection, value):
return await self.wrapped_characteristic.write_value(
connection, self.decode_value(value)
)
async def read_decoded_value(self):
return self.decode_value(await self.wrapped_characteristic.read_value())
async def write_decoded_value(self, value, with_response=False):
return await self.wrapped_characteristic.write_value(
self.encode_value(value), with_response
)
def encode_value(self, value):
return value
def decode_value(self, value):
return value
def wrapped_subscribe(self, subscriber=None):
if subscriber is not None:
if subscriber in self.subscribers:
# We already have a proxy subscriber
subscriber = self.subscribers[subscriber]
else:
# Create and register a proxy that will decode the value
original_subscriber = subscriber
def on_change(value):
original_subscriber(self.decode_value(value))
self.subscribers[subscriber] = on_change
subscriber = on_change
return self.wrapped_characteristic.subscribe(subscriber)
def wrapped_unsubscribe(self, subscriber=None):
if subscriber in self.subscribers:
subscriber = self.subscribers.pop(subscriber)
return self.wrapped_characteristic.unsubscribe(subscriber)
def __str__(self) -> str:
wrapped = str(self.wrapped_characteristic)
return f'{self.__class__.__name__}({wrapped})'
# -----------------------------------------------------------------------------
class DelegatedCharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that converts bytes values using an encode and a decode function.
'''
def __init__(self, characteristic, encode=None, decode=None):
super().__init__(characteristic)
self.encode = encode
self.decode = decode
def encode_value(self, value):
if self.encode is None:
raise InvalidOperationError('delegated adapter does not have an encoder')
return self.encode(value)
def decode_value(self, value):
if self.decode is None:
raise InvalidOperationError('delegate adapter does not have a decoder')
return self.decode(value)
# -----------------------------------------------------------------------------
class PackedCharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
For formats with a single value, the adapted `read_value` and `write_value`
methods return/accept single values. For formats with multiple values,
they return/accept a tuple with the same number of elements as is required for
the format.
'''
def __init__(self, characteristic, pack_format):
super().__init__(characteristic)
self.struct = struct.Struct(pack_format)
def pack(self, *values):
return self.struct.pack(*values)
def unpack(self, buffer):
return self.struct.unpack(buffer)
def encode_value(self, value):
return self.pack(*value if isinstance(value, tuple) else (value,))
def decode_value(self, value):
unpacked = self.unpack(value)
return unpacked[0] if len(unpacked) == 1 else unpacked
# -----------------------------------------------------------------------------
class MappedCharacteristicAdapter(PackedCharacteristicAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
The adapted `read_value` and `write_value` methods return/accept a dictionary which
is packed/unpacked according to format, with the arguments extracted from the
dictionary by key, in the same order as they occur in the `keys` parameter.
'''
def __init__(self, characteristic, pack_format, keys):
super().__init__(characteristic, pack_format)
self.keys = keys
# pylint: disable=arguments-differ
def pack(self, values):
return super().pack(*(values[key] for key in self.keys))
def unpack(self, buffer):
return dict(zip(self.keys, super().unpack(buffer)))
# -----------------------------------------------------------------------------
class UTF8CharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that converts strings to/from bytes using UTF-8 encoding
'''
def encode_value(self, value: str) -> bytes:
return value.encode('utf-8')
def decode_value(self, value: bytes) -> str:
return value.decode('utf-8')
# -----------------------------------------------------------------------------
class SerializableCharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that converts any class to/from bytes using the class'
`to_bytes` and `__bytes__` methods, respectively.
'''
def __init__(self, characteristic, cls: Type[ByteSerializable]):
super().__init__(characteristic)
self.cls = cls
def encode_value(self, value: SupportsBytes) -> bytes:
return bytes(value)
def decode_value(self, value: bytes) -> Any:
return self.cls.from_bytes(value)
# -----------------------------------------------------------------------------
class Descriptor(Attribute):
'''

374
bumble/gatt_adapters.py Normal file
View File

@@ -0,0 +1,374 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# GATT - Type Adapters
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import struct
from typing import (
Any,
Callable,
Generic,
Iterable,
Literal,
Optional,
Type,
TypeVar,
)
from bumble.core import InvalidOperationError
from bumble.gatt import Characteristic
from bumble.gatt_client import CharacteristicProxy
from bumble.utils import ByteSerializable, IntConvertible
# -----------------------------------------------------------------------------
# Typing
# -----------------------------------------------------------------------------
_T = TypeVar('_T')
_T2 = TypeVar('_T2', bound=ByteSerializable)
_T3 = TypeVar('_T3', bound=IntConvertible)
# -----------------------------------------------------------------------------
class CharacteristicAdapter(Characteristic, Generic[_T]):
'''Base class for GATT Characteristic adapters.'''
def __init__(self, characteristic: Characteristic) -> None:
super().__init__(
characteristic.uuid,
characteristic.properties,
characteristic.permissions,
characteristic.value,
characteristic.descriptors,
)
# -----------------------------------------------------------------------------
class CharacteristicProxyAdapter(CharacteristicProxy[_T]):
'''Base class for GATT CharacteristicProxy adapters.'''
def __init__(self, characteristic_proxy: CharacteristicProxy):
super().__init__(
characteristic_proxy.client,
characteristic_proxy.handle,
characteristic_proxy.end_group_handle,
characteristic_proxy.uuid,
characteristic_proxy.properties,
)
# -----------------------------------------------------------------------------
class DelegatedCharacteristicAdapter(CharacteristicAdapter[_T]):
'''
Adapter that converts bytes values using an encode and/or a decode function.
'''
def __init__(
self,
characteristic: Characteristic,
encode: Optional[Callable[[_T], bytes]] = None,
decode: Optional[Callable[[bytes], _T]] = None,
):
super().__init__(characteristic)
self.encode = encode
self.decode = decode
def encode_value(self, value: _T) -> bytes:
if self.encode is None:
raise InvalidOperationError('delegated adapter does not have an encoder')
return self.encode(value)
def decode_value(self, value: bytes) -> _T:
if self.decode is None:
raise InvalidOperationError('delegate adapter does not have a decoder')
return self.decode(value)
# -----------------------------------------------------------------------------
class DelegatedCharacteristicProxyAdapter(CharacteristicProxyAdapter[_T]):
'''
Adapter that converts bytes values using an encode and a decode function.
'''
def __init__(
self,
characteristic_proxy: CharacteristicProxy,
encode: Optional[Callable[[_T], bytes]] = None,
decode: Optional[Callable[[bytes], _T]] = None,
):
super().__init__(characteristic_proxy)
self.encode = encode
self.decode = decode
def encode_value(self, value: _T) -> bytes:
if self.encode is None:
raise InvalidOperationError('delegated adapter does not have an encoder')
return self.encode(value)
def decode_value(self, value: bytes) -> _T:
if self.decode is None:
raise InvalidOperationError('delegate adapter does not have a decoder')
return self.decode(value)
# -----------------------------------------------------------------------------
class PackedCharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
For formats with a single value, the adapted `read_value` and `write_value`
methods return/accept single values. For formats with multiple values,
they return/accept a tuple with the same number of elements as is required for
the format.
'''
def __init__(self, characteristic: Characteristic, pack_format: str) -> None:
super().__init__(characteristic)
self.struct = struct.Struct(pack_format)
def pack(self, *values) -> bytes:
return self.struct.pack(*values)
def unpack(self, buffer: bytes) -> tuple:
return self.struct.unpack(buffer)
def encode_value(self, value: Any) -> bytes:
return self.pack(*value if isinstance(value, tuple) else (value,))
def decode_value(self, value: bytes) -> Any:
unpacked = self.unpack(value)
return unpacked[0] if len(unpacked) == 1 else unpacked
# -----------------------------------------------------------------------------
class PackedCharacteristicProxyAdapter(CharacteristicProxyAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
For formats with a single value, the adapted `read_value` and `write_value`
methods return/accept single values. For formats with multiple values,
they return/accept a tuple with the same number of elements as is required for
the format.
'''
def __init__(self, characteristic_proxy, pack_format):
super().__init__(characteristic_proxy)
self.struct = struct.Struct(pack_format)
def pack(self, *values) -> bytes:
return self.struct.pack(*values)
def unpack(self, buffer: bytes) -> tuple:
return self.struct.unpack(buffer)
def encode_value(self, value: Any) -> bytes:
return self.pack(*value if isinstance(value, tuple) else (value,))
def decode_value(self, value: bytes) -> Any:
unpacked = self.unpack(value)
return unpacked[0] if len(unpacked) == 1 else unpacked
# -----------------------------------------------------------------------------
class MappedCharacteristicAdapter(PackedCharacteristicAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
The adapted `read_value` and `write_value` methods return/accept a dictionary which
is packed/unpacked according to format, with the arguments extracted from the
dictionary by key, in the same order as they occur in the `keys` parameter.
'''
def __init__(
self, characteristic: Characteristic, pack_format: str, keys: Iterable[str]
) -> None:
super().__init__(characteristic, pack_format)
self.keys = keys
# pylint: disable=arguments-differ
def pack(self, values) -> bytes:
return super().pack(*(values[key] for key in self.keys))
def unpack(self, buffer: bytes) -> Any:
return dict(zip(self.keys, super().unpack(buffer)))
# -----------------------------------------------------------------------------
class MappedCharacteristicProxyAdapter(PackedCharacteristicProxyAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
The adapted `read_value` and `write_value` methods return/accept a dictionary which
is packed/unpacked according to format, with the arguments extracted from the
dictionary by key, in the same order as they occur in the `keys` parameter.
'''
def __init__(
self,
characteristic_proxy: CharacteristicProxy,
pack_format: str,
keys: Iterable[str],
) -> None:
super().__init__(characteristic_proxy, pack_format)
self.keys = keys
# pylint: disable=arguments-differ
def pack(self, values) -> bytes:
return super().pack(*(values[key] for key in self.keys))
def unpack(self, buffer: bytes) -> Any:
return dict(zip(self.keys, super().unpack(buffer)))
# -----------------------------------------------------------------------------
class UTF8CharacteristicAdapter(CharacteristicAdapter[str]):
'''
Adapter that converts strings to/from bytes using UTF-8 encoding
'''
def encode_value(self, value: str) -> bytes:
return value.encode('utf-8')
def decode_value(self, value: bytes) -> str:
return value.decode('utf-8')
# -----------------------------------------------------------------------------
class UTF8CharacteristicProxyAdapter(CharacteristicProxyAdapter[str]):
'''
Adapter that converts strings to/from bytes using UTF-8 encoding
'''
def encode_value(self, value: str) -> bytes:
return value.encode('utf-8')
def decode_value(self, value: bytes) -> str:
return value.decode('utf-8')
# -----------------------------------------------------------------------------
class SerializableCharacteristicAdapter(CharacteristicAdapter[_T2]):
'''
Adapter that converts any class to/from bytes using the class'
`to_bytes` and `__bytes__` methods, respectively.
'''
def __init__(self, characteristic: Characteristic, cls: Type[_T2]) -> None:
super().__init__(characteristic)
self.cls = cls
def encode_value(self, value: _T2) -> bytes:
return bytes(value)
def decode_value(self, value: bytes) -> _T2:
return self.cls.from_bytes(value)
# -----------------------------------------------------------------------------
class SerializableCharacteristicProxyAdapter(CharacteristicProxyAdapter[_T2]):
'''
Adapter that converts any class to/from bytes using the class'
`to_bytes` and `__bytes__` methods, respectively.
'''
def __init__(
self, characteristic_proxy: CharacteristicProxy, cls: Type[_T2]
) -> None:
super().__init__(characteristic_proxy)
self.cls = cls
def encode_value(self, value: _T2) -> bytes:
return bytes(value)
def decode_value(self, value: bytes) -> _T2:
return self.cls.from_bytes(value)
# -----------------------------------------------------------------------------
class EnumCharacteristicAdapter(CharacteristicAdapter[_T3]):
'''
Adapter that converts int-enum-like classes to/from bytes using the class'
`int().to_bytes()` and `from_bytes()` methods, respectively.
'''
def __init__(
self,
characteristic: Characteristic,
cls: Type[_T3],
length: int,
byteorder: Literal['little', 'big'] = 'little',
):
"""
Initialize an instance.
Params:
characteristic: the Characteristic to adapt to/from
cls: the class to/from which to convert integer values
length: number of bytes used to represent integer values
byteorder: byte order of the byte representation of integers.
"""
super().__init__(characteristic)
self.cls = cls
self.length = length
self.byteorder = byteorder
def encode_value(self, value: _T3) -> bytes:
return int(value).to_bytes(self.length, self.byteorder)
def decode_value(self, value: bytes) -> _T3:
int_value = int.from_bytes(value, self.byteorder)
return self.cls(int_value)
# -----------------------------------------------------------------------------
class EnumCharacteristicProxyAdapter(CharacteristicProxyAdapter[_T3]):
'''
Adapter that converts int-enum-like classes to/from bytes using the class'
`int().to_bytes()` and `from_bytes()` methods, respectively.
'''
def __init__(
self,
characteristic_proxy: CharacteristicProxy,
cls: Type[_T3],
length: int,
byteorder: Literal['little', 'big'] = 'little',
):
"""
Initialize an instance.
Params:
characteristic_proxy: the CharacteristicProxy to adapt to/from
cls: the class to/from which to convert integer values
length: number of bytes used to represent integer values
byteorder: byte order of the byte representation of integers.
"""
super().__init__(characteristic_proxy)
self.cls = cls
self.length = length
self.byteorder = byteorder
def encode_value(self, value: _T3) -> bytes:
return int(value).to_bytes(self.length, self.byteorder)
def decode_value(self, value: bytes) -> _T3:
int_value = int.from_bytes(value, self.byteorder)
a = self.cls(int_value)
return self.cls(int_value)

View File

@@ -29,16 +29,18 @@ import logging
import struct
from datetime import datetime
from typing import (
Any,
Callable,
Dict,
Generic,
Iterable,
List,
Optional,
Dict,
Tuple,
Callable,
Union,
Any,
Iterable,
Type,
Set,
Tuple,
Union,
Type,
TypeVar,
TYPE_CHECKING,
)
@@ -82,9 +84,14 @@ from .gatt import (
TemplateService,
)
# -----------------------------------------------------------------------------
# Typing
# -----------------------------------------------------------------------------
if TYPE_CHECKING:
from bumble.device import Connection
_T = TypeVar('_T')
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
@@ -110,7 +117,7 @@ def show_services(services: Iterable[ServiceProxy]) -> None:
# -----------------------------------------------------------------------------
# Proxies
# -----------------------------------------------------------------------------
class AttributeProxy(EventEmitter):
class AttributeProxy(EventEmitter, Generic[_T]):
def __init__(
self, client: Client, handle: int, end_group_handle: int, attribute_type: UUID
) -> None:
@@ -120,21 +127,21 @@ class AttributeProxy(EventEmitter):
self.end_group_handle = end_group_handle
self.type = attribute_type
async def read_value(self, no_long_read: bool = False) -> bytes:
async def read_value(self, no_long_read: bool = False) -> _T:
return self.decode_value(
await self.client.read_value(self.handle, no_long_read)
)
async def write_value(self, value, with_response=False):
async def write_value(self, value: _T, with_response=False):
return await self.client.write_value(
self.handle, self.encode_value(value), with_response
)
def encode_value(self, value: Any) -> bytes:
return value
def encode_value(self, value: _T) -> bytes:
return value # type: ignore
def decode_value(self, value_bytes: bytes) -> Any:
return value_bytes
def decode_value(self, value: bytes) -> _T:
return value # type: ignore
def __str__(self) -> str:
return f'Attribute(handle=0x{self.handle:04X}, type={self.type})'
@@ -184,19 +191,19 @@ class ServiceProxy(AttributeProxy):
return f'Service(handle=0x{self.handle:04X}, uuid={self.uuid})'
class CharacteristicProxy(AttributeProxy):
class CharacteristicProxy(AttributeProxy[_T]):
properties: Characteristic.Properties
descriptors: List[DescriptorProxy]
subscribers: Dict[Any, Callable[[bytes], Any]]
subscribers: Dict[Any, Callable[[_T], Any]]
def __init__(
self,
client,
handle,
end_group_handle,
uuid,
client: Client,
handle: int,
end_group_handle: int,
uuid: UUID,
properties: int,
):
) -> None:
super().__init__(client, handle, end_group_handle, uuid)
self.uuid = uuid
self.properties = Characteristic.Properties(properties)
@@ -204,21 +211,21 @@ class CharacteristicProxy(AttributeProxy):
self.descriptors_discovered = False
self.subscribers = {} # Map from subscriber to proxy subscriber
def get_descriptor(self, descriptor_type):
def get_descriptor(self, descriptor_type: UUID) -> Optional[DescriptorProxy]:
for descriptor in self.descriptors:
if descriptor.type == descriptor_type:
return descriptor
return None
async def discover_descriptors(self):
async def discover_descriptors(self) -> list[DescriptorProxy]:
return await self.client.discover_descriptors(self)
async def subscribe(
self,
subscriber: Optional[Callable[[bytes], Any]] = None,
subscriber: Optional[Callable[[_T], Any]] = None,
prefer_notify: bool = True,
):
) -> None:
if subscriber is not None:
if subscriber in self.subscribers:
# We already have a proxy subscriber
@@ -233,13 +240,13 @@ class CharacteristicProxy(AttributeProxy):
self.subscribers[subscriber] = on_change
subscriber = on_change
return await self.client.subscribe(self, subscriber, prefer_notify)
await self.client.subscribe(self, subscriber, prefer_notify)
async def unsubscribe(self, subscriber=None, force=False):
async def unsubscribe(self, subscriber=None, force=False) -> None:
if subscriber in self.subscribers:
subscriber = self.subscribers.pop(subscriber)
return await self.client.unsubscribe(self, subscriber, force)
await self.client.unsubscribe(self, subscriber, force)
def __str__(self) -> str:
return (
@@ -250,7 +257,7 @@ class CharacteristicProxy(AttributeProxy):
class DescriptorProxy(AttributeProxy):
def __init__(self, client, handle, descriptor_type):
def __init__(self, client: Client, handle: int, descriptor_type: UUID) -> None:
super().__init__(client, handle, 0, descriptor_type)
def __str__(self) -> str:
@@ -679,7 +686,7 @@ class Client:
properties, handle = struct.unpack_from('<BH', attribute_value)
characteristic_uuid = UUID.from_bytes(attribute_value[3:])
characteristic = CharacteristicProxy(
characteristic: CharacteristicProxy = CharacteristicProxy(
self, handle, 0, characteristic_uuid, properties
)
@@ -805,7 +812,7 @@ class Client:
logger.warning(f'bogus handle value: {attribute_handle}')
return []
attribute = AttributeProxy(
attribute: AttributeProxy = AttributeProxy(
self, attribute_handle, 0, UUID.from_bytes(attribute_uuid)
)
attributes.append(attribute)
@@ -818,7 +825,7 @@ class Client:
async def subscribe(
self,
characteristic: CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
subscriber: Optional[Callable[[Any], Any]] = None,
prefer_notify: bool = True,
) -> None:
# If we haven't already discovered the descriptors for this characteristic,
@@ -868,7 +875,7 @@ class Client:
async def unsubscribe(
self,
characteristic: CharacteristicProxy,
subscriber: Optional[Callable[[bytes], Any]] = None,
subscriber: Optional[Callable[[Any], Any]] = None,
force: bool = False,
) -> None:
'''

View File

@@ -36,7 +36,6 @@ from typing import (
Tuple,
TypeVar,
Type,
Union,
TYPE_CHECKING,
)
from pyee import EventEmitter
@@ -78,7 +77,6 @@ from bumble.gatt import (
GATT_REQUEST_TIMEOUT,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
Characteristic,
CharacteristicAdapter,
CharacteristicDeclaration,
CharacteristicValue,
IncludedServiceDeclaration,
@@ -469,7 +467,7 @@ class Server(EventEmitter):
finally:
self.pending_confirmations[connection.handle] = None
async def notify_or_indicate_subscribers(
async def _notify_or_indicate_subscribers(
self,
indicate: bool,
attribute: Attribute,
@@ -503,7 +501,9 @@ class Server(EventEmitter):
value: Optional[bytes] = None,
force: bool = False,
):
return await self.notify_or_indicate_subscribers(False, attribute, value, force)
return await self._notify_or_indicate_subscribers(
False, attribute, value, force
)
async def indicate_subscribers(
self,
@@ -511,7 +511,7 @@ class Server(EventEmitter):
value: Optional[bytes] = None,
force: bool = False,
):
return await self.notify_or_indicate_subscribers(True, attribute, value, force)
return await self._notify_or_indicate_subscribers(True, attribute, value, force)
def on_disconnection(self, connection: Connection) -> None:
if connection.handle in self.subscribers:

View File

@@ -24,16 +24,13 @@ import struct
from dataclasses import dataclass
from typing import Optional
from bumble import gatt
from bumble.device import Connection
from bumble.att import ATT_Error
from bumble.gatt import (
Attribute,
Characteristic,
SerializableCharacteristicAdapter,
PackedCharacteristicAdapter,
TemplateService,
CharacteristicValue,
UTF8CharacteristicAdapter,
GATT_AUDIO_INPUT_CONTROL_SERVICE,
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC,
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC,
@@ -42,6 +39,14 @@ from bumble.gatt import (
GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC,
GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC,
)
from bumble.gatt_adapters import (
CharacteristicProxy,
PackedCharacteristicProxyAdapter,
SerializableCharacteristicAdapter,
SerializableCharacteristicProxyAdapter,
UTF8CharacteristicAdapter,
UTF8CharacteristicProxyAdapter,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from bumble.utils import OpenIntEnum
@@ -124,7 +129,7 @@ class AudioInputState:
mute: Mute = Mute.NOT_MUTED
gain_mode: GainMode = GainMode.MANUAL
change_counter: int = 0
attribute_value: Optional[CharacteristicValue] = None
attribute: Optional[Attribute] = None
def __bytes__(self) -> bytes:
return bytes(
@@ -151,10 +156,8 @@ class AudioInputState:
self.change_counter = (self.change_counter + 1) % (CHANGE_COUNTER_MAX_VALUE + 1)
async def notify_subscribers_via_connection(self, connection: Connection) -> None:
assert self.attribute_value is not None
await connection.device.notify_subscribers(
attribute=self.attribute_value, value=bytes(self)
)
assert self.attribute is not None
await connection.device.notify_subscribers(attribute=self.attribute)
@dataclass
@@ -315,24 +318,28 @@ class AudioInputDescription:
'''
audio_input_description: str = "Bluetooth"
attribute_value: Optional[CharacteristicValue] = None
attribute: Optional[Attribute] = None
def on_read(self, _connection: Optional[Connection]) -> str:
return self.audio_input_description
async def on_write(self, connection: Optional[Connection], value: str) -> None:
assert connection
assert self.attribute_value
assert self.attribute
self.audio_input_description = value
await connection.device.notify_subscribers(
attribute=self.attribute_value, value=value
)
await connection.device.notify_subscribers(attribute=self.attribute)
class AICSService(TemplateService):
UUID = GATT_AUDIO_INPUT_CONTROL_SERVICE
audio_input_state_characteristic: Characteristic[AudioInputState]
audio_input_type_characteristic: Characteristic[bytes]
audio_input_status_characteristic: Characteristic[bytes]
audio_input_control_point_characteristic: Characteristic[bytes]
gain_settings_properties_characteristic: Characteristic[GainSettingsProperties]
def __init__(
self,
audio_input_state: Optional[AudioInputState] = None,
@@ -374,9 +381,7 @@ class AICSService(TemplateService):
),
AudioInputState,
)
self.audio_input_state.attribute_value = (
self.audio_input_state_characteristic.value
)
self.audio_input_state.attribute = self.audio_input_state_characteristic
self.gain_settings_properties_characteristic = (
SerializableCharacteristicAdapter(
@@ -425,8 +430,8 @@ class AICSService(TemplateService):
),
)
)
self.audio_input_description.attribute_value = (
self.audio_input_control_point_characteristic.value
self.audio_input_description.attribute = (
self.audio_input_control_point_characteristic
)
super().__init__(
@@ -448,24 +453,29 @@ class AICSService(TemplateService):
class AICSServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = AICSService
audio_input_state: CharacteristicProxy[AudioInputState]
gain_settings_properties: CharacteristicProxy[GainSettingsProperties]
audio_input_status: CharacteristicProxy[int]
audio_input_control_point: CharacteristicProxy[bytes]
def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy
self.audio_input_state = SerializableCharacteristicAdapter(
self.audio_input_state = SerializableCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC
),
AudioInputState,
)
self.gain_settings_properties = SerializableCharacteristicAdapter(
self.gain_settings_properties = SerializableCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC
),
GainSettingsProperties,
)
self.audio_input_status = PackedCharacteristicAdapter(
self.audio_input_status = PackedCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC
),
@@ -478,7 +488,7 @@ class AICSServiceProxy(ProfileServiceProxy):
)
)
self.audio_input_description = UTF8CharacteristicAdapter(
self.audio_input_description = UTF8CharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC
)

View File

@@ -134,12 +134,14 @@ class AshaService(gatt.TemplateService):
),
)
self.audio_control_point_characteristic = gatt.Characteristic(
gatt.GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(write=self._on_audio_control_point_write),
self.audio_control_point_characteristic: gatt.Characteristic[bytes] = (
gatt.Characteristic(
gatt.GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(write=self._on_audio_control_point_write),
)
)
self.audio_status_characteristic = gatt.Characteristic(
gatt.GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
@@ -147,7 +149,7 @@ class AshaService(gatt.TemplateService):
gatt.Characteristic.READABLE,
bytes([AudioStatus.OK]),
)
self.volume_characteristic = gatt.Characteristic(
self.volume_characteristic: gatt.Characteristic[bytes] = gatt.Characteristic(
gatt.GATT_ASHA_VOLUME_CHARACTERISTIC,
gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
gatt.Characteristic.WRITEABLE,
@@ -166,13 +168,13 @@ class AshaService(gatt.TemplateService):
struct.pack('<H', self.psm),
)
characteristics = [
characteristics = (
self.read_only_properties_characteristic,
self.audio_control_point_characteristic,
self.audio_status_characteristic,
self.volume_characteristic,
self.le_psm_out_characteristic,
]
)
super().__init__(characteristics)

View File

@@ -20,11 +20,12 @@ from __future__ import annotations
import dataclasses
import logging
import struct
from typing import ClassVar, List, Optional, Sequence
from typing import ClassVar, Optional, Sequence
from bumble import core
from bumble import device
from bumble import gatt
from bumble import gatt_adapters
from bumble import gatt_client
from bumble import hci
from bumble import utils
@@ -52,7 +53,7 @@ def encode_subgroups(subgroups: Sequence[SubgroupInfo]) -> bytes:
)
def decode_subgroups(data: bytes) -> List[SubgroupInfo]:
def decode_subgroups(data: bytes) -> list[SubgroupInfo]:
num_subgroups = data[0]
offset = 1
subgroups = []
@@ -273,7 +274,7 @@ class BroadcastReceiveState:
pa_sync_state: PeriodicAdvertisingSyncState
big_encryption: BigEncryption
bad_code: bytes
subgroups: List[SubgroupInfo]
subgroups: list[SubgroupInfo]
@classmethod
def from_bytes(cls, data: bytes) -> BroadcastReceiveState:
@@ -354,7 +355,9 @@ class BroadcastAudioScanServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = BroadcastAudioScanService
broadcast_audio_scan_control_point: gatt_client.CharacteristicProxy
broadcast_receive_states: List[gatt.DelegatedCharacteristicAdapter]
broadcast_receive_states: list[
gatt_client.CharacteristicProxy[Optional[BroadcastReceiveState]]
]
def __init__(self, service_proxy: gatt_client.ServiceProxy):
self.service_proxy = service_proxy
@@ -366,7 +369,7 @@ class BroadcastAudioScanServiceProxy(gatt_client.ProfileServiceProxy):
)
self.broadcast_receive_states = [
gatt.DelegatedCharacteristicAdapter(
gatt_adapters.DelegatedCharacteristicProxyAdapter(
characteristic,
decode=lambda x: BroadcastReceiveState.from_bytes(x) if x else None,
)

View File

@@ -16,14 +16,20 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from ..gatt_client import ProfileServiceProxy
from ..gatt import (
from typing import Optional
from bumble.gatt_client import ProfileServiceProxy
from bumble.gatt import (
GATT_BATTERY_SERVICE,
GATT_BATTERY_LEVEL_CHARACTERISTIC,
TemplateService,
Characteristic,
CharacteristicValue,
)
from bumble.gatt_client import CharacteristicProxy
from bumble.gatt_adapters import (
PackedCharacteristicAdapter,
PackedCharacteristicProxyAdapter,
)
@@ -32,6 +38,8 @@ class BatteryService(TemplateService):
UUID = GATT_BATTERY_SERVICE
BATTERY_LEVEL_FORMAT = 'B'
battery_level_characteristic: Characteristic[int]
def __init__(self, read_battery_level):
self.battery_level_characteristic = PackedCharacteristicAdapter(
Characteristic(
@@ -49,13 +57,15 @@ class BatteryService(TemplateService):
class BatteryServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = BatteryService
battery_level: Optional[CharacteristicProxy[int]]
def __init__(self, service_proxy):
self.service_proxy = service_proxy
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_BATTERY_LEVEL_CHARACTERISTIC
):
self.battery_level = PackedCharacteristicAdapter(
self.battery_level = PackedCharacteristicProxyAdapter(
characteristics[0], pack_format=BatteryService.BATTERY_LEVEL_FORMAT
)
else:

View File

@@ -19,7 +19,6 @@
import struct
from typing import Optional, Tuple
from bumble.gatt_client import ServiceProxy, ProfileServiceProxy, CharacteristicProxy
from bumble.gatt import (
GATT_DEVICE_INFORMATION_SERVICE,
GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC,
@@ -32,9 +31,12 @@ from bumble.gatt import (
GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC,
TemplateService,
Characteristic,
DelegatedCharacteristicAdapter,
UTF8CharacteristicAdapter,
)
from bumble.gatt_adapters import (
DelegatedCharacteristicProxyAdapter,
UTF8CharacteristicProxyAdapter,
)
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy, ServiceProxy
# -----------------------------------------------------------------------------
@@ -62,7 +64,7 @@ class DeviceInformationService(TemplateService):
ieee_regulatory_certification_data_list: Optional[bytes] = None,
# TODO: pnp_id
):
characteristics = [
characteristics: list[Characteristic[bytes]] = [
Characteristic(
uuid,
Characteristic.Properties.READ,
@@ -107,14 +109,14 @@ class DeviceInformationService(TemplateService):
class DeviceInformationServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = DeviceInformationService
manufacturer_name: Optional[UTF8CharacteristicAdapter]
model_number: Optional[UTF8CharacteristicAdapter]
serial_number: Optional[UTF8CharacteristicAdapter]
hardware_revision: Optional[UTF8CharacteristicAdapter]
firmware_revision: Optional[UTF8CharacteristicAdapter]
software_revision: Optional[UTF8CharacteristicAdapter]
system_id: Optional[DelegatedCharacteristicAdapter]
ieee_regulatory_certification_data_list: Optional[CharacteristicProxy]
manufacturer_name: Optional[CharacteristicProxy[str]]
model_number: Optional[CharacteristicProxy[str]]
serial_number: Optional[CharacteristicProxy[str]]
hardware_revision: Optional[CharacteristicProxy[str]]
firmware_revision: Optional[CharacteristicProxy[str]]
software_revision: Optional[CharacteristicProxy[str]]
system_id: Optional[CharacteristicProxy[tuple[int, int]]]
ieee_regulatory_certification_data_list: Optional[CharacteristicProxy[bytes]]
def __init__(self, service_proxy: ServiceProxy):
self.service_proxy = service_proxy
@@ -128,7 +130,7 @@ class DeviceInformationServiceProxy(ProfileServiceProxy):
('software_revision', GATT_SOFTWARE_REVISION_STRING_CHARACTERISTIC),
):
if characteristics := service_proxy.get_characteristics_by_uuid(uuid):
characteristic = UTF8CharacteristicAdapter(characteristics[0])
characteristic = UTF8CharacteristicProxyAdapter(characteristics[0])
else:
characteristic = None
self.__setattr__(field, characteristic)
@@ -136,7 +138,7 @@ class DeviceInformationServiceProxy(ProfileServiceProxy):
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_SYSTEM_ID_CHARACTERISTIC
):
self.system_id = DelegatedCharacteristicAdapter(
self.system_id = DelegatedCharacteristicProxyAdapter(
characteristics[0],
encode=lambda v: DeviceInformationService.pack_system_id(*v),
decode=DeviceInformationService.unpack_system_id,

View File

@@ -25,14 +25,15 @@ from bumble.core import Appearance
from bumble.gatt import (
TemplateService,
Characteristic,
CharacteristicAdapter,
DelegatedCharacteristicAdapter,
UTF8CharacteristicAdapter,
GATT_GENERIC_ACCESS_SERVICE,
GATT_DEVICE_NAME_CHARACTERISTIC,
GATT_APPEARANCE_CHARACTERISTIC,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from bumble.gatt_adapters import (
DelegatedCharacteristicProxyAdapter,
UTF8CharacteristicProxyAdapter,
)
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy, ServiceProxy
# -----------------------------------------------------------------------------
# Logging
@@ -49,6 +50,9 @@ logger = logging.getLogger(__name__)
class GenericAccessService(TemplateService):
UUID = GATT_GENERIC_ACCESS_SERVICE
device_name_characteristic: Characteristic[bytes]
appearance_characteristic: Characteristic[bytes]
def __init__(
self, device_name: str, appearance: Union[Appearance, Tuple[int, int], int] = 0
):
@@ -84,8 +88,8 @@ class GenericAccessService(TemplateService):
class GenericAccessServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = GenericAccessService
device_name: Optional[CharacteristicAdapter]
appearance: Optional[DelegatedCharacteristicAdapter]
device_name: Optional[CharacteristicProxy[str]]
appearance: Optional[CharacteristicProxy[Appearance]]
def __init__(self, service_proxy: ServiceProxy):
self.service_proxy = service_proxy
@@ -93,14 +97,14 @@ class GenericAccessServiceProxy(ProfileServiceProxy):
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_DEVICE_NAME_CHARACTERISTIC
):
self.device_name = UTF8CharacteristicAdapter(characteristics[0])
self.device_name = UTF8CharacteristicProxyAdapter(characteristics[0])
else:
self.device_name = None
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_APPEARANCE_CHARACTERISTIC
):
self.appearance = DelegatedCharacteristicAdapter(
self.appearance = DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=lambda value: Appearance.from_int(
struct.unpack_from('<H', value, 0)[0],

View File

@@ -110,6 +110,7 @@ class GenericAttributeProfileService(gatt.TemplateService):
gatt.GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
gatt.GATT_CHARACTERISTIC_EXTENDED_PROPERTIES_DESCRIPTOR,
):
assert isinstance(attribute.value, bytes)
return (
struct.pack("<H", attribute.handle)
+ attribute.type.to_bytes()

View File

@@ -22,7 +22,6 @@ from typing import Optional
from bumble.gatt import (
TemplateService,
DelegatedCharacteristicAdapter,
Characteristic,
GATT_GAMING_AUDIO_SERVICE,
GATT_GMAP_ROLE_CHARACTERISTIC,
@@ -31,7 +30,8 @@ from bumble.gatt import (
GATT_BGS_FEATURES_CHARACTERISTIC,
GATT_BGR_FEATURES_CHARACTERISTIC,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from bumble.gatt_adapters import DelegatedCharacteristicProxyAdapter
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy, ServiceProxy
from enum import IntFlag
@@ -150,10 +150,15 @@ class GamingAudioService(TemplateService):
class GamingAudioServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = GamingAudioService
ugg_features: Optional[CharacteristicProxy[UggFeatures]] = None
ugt_features: Optional[CharacteristicProxy[UgtFeatures]] = None
bgs_features: Optional[CharacteristicProxy[BgsFeatures]] = None
bgr_features: Optional[CharacteristicProxy[BgrFeatures]] = None
def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy
self.gmap_role = DelegatedCharacteristicAdapter(
self.gmap_role = DelegatedCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_GMAP_ROLE_CHARACTERISTIC
),
@@ -163,31 +168,31 @@ class GamingAudioServiceProxy(ProfileServiceProxy):
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_UGG_FEATURES_CHARACTERISTIC
):
self.ugg_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
self.ugg_features = DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=lambda value: UggFeatures(value[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_UGT_FEATURES_CHARACTERISTIC
):
self.ugt_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
self.ugt_features = DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=lambda value: UgtFeatures(value[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_BGS_FEATURES_CHARACTERISTIC
):
self.bgs_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
self.bgs_features = DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=lambda value: BgsFeatures(value[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_BGR_FEATURES_CHARACTERISTIC
):
self.bgr_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
self.bgr_features = DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=lambda value: BgrFeatures(value[0]),
)

View File

@@ -18,14 +18,15 @@
from __future__ import annotations
import asyncio
import functools
from bumble import att, gatt, gatt_client
from dataclasses import dataclass, field
import logging
from typing import Any, Dict, List, Optional, Set, Union
from bumble import att, gatt, gatt_adapters, gatt_client
from bumble.core import InvalidArgumentError, InvalidStateError
from bumble.device import Device, Connection
from bumble.utils import AsyncRunner, OpenIntEnum
from bumble.hci import Address
from dataclasses import dataclass, field
import logging
from typing import Any, Dict, List, Optional, Set, Union
# -----------------------------------------------------------------------------
@@ -631,11 +632,12 @@ class HearingAccessServiceProxy(gatt_client.ProfileServiceProxy):
hearing_aid_preset_control_point: gatt_client.CharacteristicProxy
preset_control_point_indications: asyncio.Queue
active_preset_index_notification: asyncio.Queue
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
self.server_features = gatt.PackedCharacteristicAdapter(
self.server_features = gatt_adapters.PackedCharacteristicProxyAdapter(
service_proxy.get_characteristics_by_uuid(
gatt.GATT_HEARING_AID_FEATURES_CHARACTERISTIC
)[0],
@@ -648,7 +650,7 @@ class HearingAccessServiceProxy(gatt_client.ProfileServiceProxy):
)[0]
)
self.active_preset_index = gatt.PackedCharacteristicAdapter(
self.active_preset_index = gatt_adapters.PackedCharacteristicProxyAdapter(
service_proxy.get_characteristics_by_uuid(
gatt.GATT_ACTIVE_PRESET_INDEX_CHARACTERISTIC
)[0],

View File

@@ -16,13 +16,14 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
from enum import IntEnum
import struct
from typing import Optional
from bumble import core
from ..gatt_client import ProfileServiceProxy
from ..att import ATT_Error
from ..gatt import (
from bumble.att import ATT_Error
from bumble.gatt import (
GATT_HEART_RATE_SERVICE,
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
@@ -30,10 +31,13 @@ from ..gatt import (
TemplateService,
Characteristic,
CharacteristicValue,
SerializableCharacteristicAdapter,
)
from bumble.gatt_adapters import (
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter,
SerializableCharacteristicAdapter,
)
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy
# -----------------------------------------------------------------------------
@@ -43,6 +47,10 @@ class HeartRateService(TemplateService):
CONTROL_POINT_NOT_SUPPORTED = 0x80
RESET_ENERGY_EXPENDED = 0x01
heart_rate_measurement_characteristic: Characteristic[HeartRateMeasurement]
body_sensor_location_characteristic: Characteristic[BodySensorLocation]
heart_rate_control_point_characteristic: Characteristic[int]
class BodySensorLocation(IntEnum):
OTHER = 0
CHEST = 1
@@ -198,6 +206,14 @@ class HeartRateService(TemplateService):
class HeartRateServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = HeartRateService
heart_rate_measurement: Optional[
CharacteristicProxy[HeartRateService.HeartRateMeasurement]
]
body_sensor_location: Optional[
CharacteristicProxy[HeartRateService.BodySensorLocation]
]
heart_rate_control_point: Optional[CharacteristicProxy[int]]
def __init__(self, service_proxy):
self.service_proxy = service_proxy

View File

@@ -208,7 +208,7 @@ class MediaControlService(gatt.TemplateService):
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=media_player_name or 'Bumble Player',
value=(media_player_name or 'Bumble Player').encode(),
)
self.track_changed_characteristic = gatt.Characteristic(
uuid=gatt.GATT_TRACK_CHANGED_CHARACTERISTIC,
@@ -247,14 +247,16 @@ class MediaControlService(gatt.TemplateService):
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=b'',
)
self.media_control_point_characteristic = gatt.Characteristic(
uuid=gatt.GATT_MEDIA_CONTROL_POINT_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
value=gatt.CharacteristicValue(write=self.on_media_control_point),
self.media_control_point_characteristic: gatt.Characteristic[bytes] = (
gatt.Characteristic(
uuid=gatt.GATT_MEDIA_CONTROL_POINT_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
value=gatt.CharacteristicValue(write=self.on_media_control_point),
)
)
self.media_control_point_opcodes_supported_characteristic = gatt.Characteristic(
uuid=gatt.GATT_MEDIA_CONTROL_POINT_OPCODES_SUPPORTED_CHARACTERISTIC,

View File

@@ -25,6 +25,7 @@ from typing import Optional, Sequence, Union
from bumble.profiles.bap import AudioLocation, CodecSpecificCapabilities, ContextType
from bumble.profiles import le_audio
from bumble import gatt
from bumble import gatt_adapters
from bumble import gatt_client
from bumble import hci
@@ -185,34 +186,42 @@ class PublishedAudioCapabilitiesService(gatt.TemplateService):
class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = PublishedAudioCapabilitiesService
sink_pac: Optional[gatt.DelegatedCharacteristicAdapter] = None
sink_audio_locations: Optional[gatt.DelegatedCharacteristicAdapter] = None
source_pac: Optional[gatt.DelegatedCharacteristicAdapter] = None
source_audio_locations: Optional[gatt.DelegatedCharacteristicAdapter] = None
available_audio_contexts: gatt.DelegatedCharacteristicAdapter
supported_audio_contexts: gatt.DelegatedCharacteristicAdapter
sink_pac: Optional[gatt_client.CharacteristicProxy[list[PacRecord]]] = None
sink_audio_locations: Optional[gatt_client.CharacteristicProxy[AudioLocation]] = (
None
)
source_pac: Optional[gatt_client.CharacteristicProxy[list[PacRecord]]] = None
source_audio_locations: Optional[gatt_client.CharacteristicProxy[AudioLocation]] = (
None
)
available_audio_contexts: gatt_client.CharacteristicProxy[tuple[ContextType, ...]]
supported_audio_contexts: gatt_client.CharacteristicProxy[tuple[ContextType, ...]]
def __init__(self, service_proxy: gatt_client.ServiceProxy):
self.service_proxy = service_proxy
self.available_audio_contexts = gatt.DelegatedCharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC
),
decode=lambda x: tuple(map(ContextType, struct.unpack('<HH', x))),
self.available_audio_contexts = (
gatt_adapters.DelegatedCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC
),
decode=lambda x: tuple(map(ContextType, struct.unpack('<HH', x))),
)
)
self.supported_audio_contexts = gatt.DelegatedCharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC
),
decode=lambda x: tuple(map(ContextType, struct.unpack('<HH', x))),
self.supported_audio_contexts = (
gatt_adapters.DelegatedCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC
),
decode=lambda x: tuple(map(ContextType, struct.unpack('<HH', x))),
)
)
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_PAC_CHARACTERISTIC
):
self.sink_pac = gatt.DelegatedCharacteristicAdapter(
self.sink_pac = gatt_adapters.DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=PacRecord.list_from_bytes,
)
@@ -220,7 +229,7 @@ class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_PAC_CHARACTERISTIC
):
self.source_pac = gatt.DelegatedCharacteristicAdapter(
self.source_pac = gatt_adapters.DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=PacRecord.list_from_bytes,
)
@@ -228,15 +237,19 @@ class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC
):
self.sink_audio_locations = gatt.DelegatedCharacteristicAdapter(
characteristics[0],
decode=lambda x: AudioLocation(struct.unpack('<I', x)[0]),
self.sink_audio_locations = (
gatt_adapters.DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=lambda x: AudioLocation(struct.unpack('<I', x)[0]),
)
)
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC
):
self.source_audio_locations = gatt.DelegatedCharacteristicAdapter(
characteristics[0],
decode=lambda x: AudioLocation(struct.unpack('<I', x)[0]),
self.source_audio_locations = (
gatt_adapters.DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=lambda x: AudioLocation(struct.unpack('<I', x)[0]),
)
)

View File

@@ -24,11 +24,11 @@ import struct
from bumble.gatt import (
TemplateService,
Characteristic,
DelegatedCharacteristicAdapter,
GATT_TELEPHONY_AND_MEDIA_AUDIO_SERVICE,
GATT_TMAP_ROLE_CHARACTERISTIC,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from bumble.gatt_adapters import DelegatedCharacteristicProxyAdapter
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy, ServiceProxy
# -----------------------------------------------------------------------------
@@ -53,6 +53,8 @@ class Role(enum.IntFlag):
class TelephonyAndMediaAudioService(TemplateService):
UUID = GATT_TELEPHONY_AND_MEDIA_AUDIO_SERVICE
role_characteristic: Characteristic[bytes]
def __init__(self, role: Role):
self.role_characteristic = Characteristic(
GATT_TMAP_ROLE_CHARACTERISTIC,
@@ -68,12 +70,12 @@ class TelephonyAndMediaAudioService(TemplateService):
class TelephonyAndMediaAudioServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = TelephonyAndMediaAudioService
role: DelegatedCharacteristicAdapter
role: CharacteristicProxy[Role]
def __init__(self, service_proxy: ServiceProxy):
self.service_proxy = service_proxy
self.role = DelegatedCharacteristicAdapter(
self.role = DelegatedCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_TMAP_ROLE_CHARACTERISTIC
),

View File

@@ -25,6 +25,7 @@ from typing import Optional, Sequence
from bumble import att
from bumble import device
from bumble import gatt
from bumble import gatt_adapters
from bumble import gatt_client
@@ -209,14 +210,14 @@ class VolumeControlService(gatt.TemplateService):
class VolumeControlServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = VolumeControlService
volume_control_point: gatt_client.CharacteristicProxy
volume_state: gatt.SerializableCharacteristicAdapter
volume_flags: gatt.DelegatedCharacteristicAdapter
volume_control_point: gatt_client.CharacteristicProxy[bytes]
volume_state: gatt_client.CharacteristicProxy[VolumeState]
volume_flags: gatt_client.CharacteristicProxy[VolumeFlags]
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
self.volume_state = gatt.SerializableCharacteristicAdapter(
self.volume_state = gatt_adapters.SerializableCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_VOLUME_STATE_CHARACTERISTIC
),
@@ -227,7 +228,7 @@ class VolumeControlServiceProxy(gatt_client.ProfileServiceProxy):
gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC
)
self.volume_flags = gatt.DelegatedCharacteristicAdapter(
self.volume_flags = gatt_adapters.DelegatedCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC
),

View File

@@ -24,17 +24,19 @@ from bumble.device import Connection
from bumble.att import ATT_Error
from bumble.gatt import (
Characteristic,
DelegatedCharacteristicAdapter,
TemplateService,
CharacteristicValue,
SerializableCharacteristicAdapter,
UTF8CharacteristicAdapter,
GATT_VOLUME_OFFSET_CONTROL_SERVICE,
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC,
GATT_AUDIO_LOCATION_CHARACTERISTIC,
GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC,
GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC,
)
from bumble.gatt_adapters import (
DelegatedCharacteristicProxyAdapter,
SerializableCharacteristicProxyAdapter,
UTF8CharacteristicProxyAdapter,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from bumble.utils import OpenIntEnum
from bumble.profiles.bap import AudioLocation
@@ -67,7 +69,7 @@ class ErrorCode(OpenIntEnum):
class VolumeOffsetState:
volume_offset: int = 0
change_counter: int = 0
attribute_value: Optional[CharacteristicValue] = None
attribute: Optional[Characteristic] = None
def __bytes__(self) -> bytes:
return struct.pack('<hB', self.volume_offset, self.change_counter)
@@ -81,8 +83,8 @@ class VolumeOffsetState:
self.change_counter = (self.change_counter + 1) % (CHANGE_COUNTER_MAX_VALUE + 1)
async def notify_subscribers_via_connection(self, connection: Connection) -> None:
assert self.attribute_value is not None
await connection.device.notify_subscribers(attribute=self.attribute_value)
assert self.attribute is not None
await connection.device.notify_subscribers(attribute=self.attribute)
def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self)
@@ -91,7 +93,7 @@ class VolumeOffsetState:
@dataclass
class VocsAudioLocation:
audio_location: AudioLocation = AudioLocation.NOT_ALLOWED
attribute_value: Optional[CharacteristicValue] = None
attribute: Optional[Characteristic] = None
def __bytes__(self) -> bytes:
return struct.pack('<I', self.audio_location)
@@ -106,10 +108,10 @@ class VocsAudioLocation:
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
assert connection
assert self.attribute_value
assert self.attribute
self.audio_location = AudioLocation(int.from_bytes(value, 'little'))
await connection.device.notify_subscribers(attribute=self.attribute_value)
await connection.device.notify_subscribers(attribute=self.attribute)
@dataclass
@@ -148,7 +150,7 @@ class VolumeOffsetControlPoint:
@dataclass
class AudioOutputDescription:
audio_output_description: str = ''
attribute_value: Optional[CharacteristicValue] = None
attribute: Optional[Characteristic] = None
@classmethod
def from_bytes(cls, data: bytes):
@@ -162,10 +164,10 @@ class AudioOutputDescription:
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
assert connection
assert self.attribute_value
assert self.attribute
self.audio_output_description = value.decode('utf-8')
await connection.device.notify_subscribers(attribute=self.attribute_value)
await connection.device.notify_subscribers(attribute=self.attribute)
# -----------------------------------------------------------------------------
@@ -197,7 +199,7 @@ class VolumeOffsetControlService(TemplateService):
VolumeOffsetControlPoint(self.volume_offset_state)
)
self.volume_offset_state_characteristic = Characteristic(
self.volume_offset_state_characteristic: Characteristic[bytes] = Characteristic(
uuid=GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY
@@ -206,7 +208,7 @@ class VolumeOffsetControlService(TemplateService):
value=CharacteristicValue(read=self.volume_offset_state.on_read),
)
self.audio_location_characteristic = Characteristic(
self.audio_location_characteristic: Characteristic[bytes] = Characteristic(
uuid=GATT_AUDIO_LOCATION_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ
@@ -222,33 +224,39 @@ class VolumeOffsetControlService(TemplateService):
write=self.audio_location.on_write,
),
)
self.audio_location.attribute_value = self.audio_location_characteristic.value
self.audio_location.attribute = self.audio_location_characteristic
self.volume_offset_control_point_characteristic = Characteristic(
uuid=GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC,
properties=Characteristic.Properties.WRITE,
permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
value=CharacteristicValue(write=self.volume_offset_control_point.on_write),
self.volume_offset_control_point_characteristic: Characteristic[bytes] = (
Characteristic(
uuid=GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC,
properties=Characteristic.Properties.WRITE,
permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
value=CharacteristicValue(
write=self.volume_offset_control_point.on_write
),
)
)
self.audio_output_description_characteristic = Characteristic(
uuid=GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE
),
permissions=(
Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION
),
value=CharacteristicValue(
read=self.audio_output_description.on_read,
write=self.audio_output_description.on_write,
),
self.audio_output_description_characteristic: Characteristic[bytes] = (
Characteristic(
uuid=GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE
),
permissions=(
Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION
),
value=CharacteristicValue(
read=self.audio_output_description.on_read,
write=self.audio_output_description.on_write,
),
)
)
self.audio_output_description.attribute_value = (
self.audio_output_description_characteristic.value
self.audio_output_description.attribute = (
self.audio_output_description_characteristic
)
super().__init__(
@@ -271,14 +279,14 @@ class VolumeOffsetControlServiceProxy(ProfileServiceProxy):
def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy
self.volume_offset_state = SerializableCharacteristicAdapter(
self.volume_offset_state = SerializableCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC
),
VolumeOffsetState,
)
self.audio_location = DelegatedCharacteristicAdapter(
self.audio_location = DelegatedCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_LOCATION_CHARACTERISTIC
),
@@ -292,7 +300,7 @@ class VolumeOffsetControlServiceProxy(ProfileServiceProxy):
)
)
self.audio_output_description = UTF8CharacteristicAdapter(
self.audio_output_description = UTF8CharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC
)

View File

@@ -502,3 +502,13 @@ class ByteSerializable(Protocol):
def from_bytes(cls, data: bytes) -> Self: ...
def __bytes__(self) -> bytes: ...
# -----------------------------------------------------------------------------
class IntConvertible(Protocol):
"""
Type protocol for classes that can be instantiated from int and converted to int.
"""
def __init__(self, value: int) -> None: ...
def __int__(self) -> int: ...

View File

@@ -102,7 +102,6 @@ async def main() -> None:
)
# Notify subscribers of the current value as soon as they subscribe
@heart_rate_service.heart_rate_measurement_characteristic.on('subscription')
def on_subscription(connection, notify_enabled, indicate_enabled):
if notify_enabled or indicate_enabled:
AsyncRunner.spawn(
@@ -112,6 +111,10 @@ async def main() -> None:
)
)
heart_rate_service.heart_rate_measurement_characteristic.on(
'subscription', on_subscription
)
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)

View File

@@ -70,13 +70,13 @@ async def main() -> None:
descriptor = Descriptor(
GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
Descriptor.READABLE,
'My Description',
'My Description'.encode(),
)
manufacturer_name_characteristic = Characteristic(
manufacturer_name_characteristic = Characteristic[bytes](
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.Properties.READ,
Characteristic.READABLE,
"Fitbit",
"Fitbit".encode(),
[descriptor],
)
device_info_service = Service(

View File

@@ -94,13 +94,13 @@ async def main() -> None:
descriptor = Descriptor(
GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
Descriptor.READABLE,
'My Description',
'My Description'.encode(),
)
manufacturer_name_characteristic = Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.Properties.READ,
Characteristic.READABLE,
'Fitbit',
'Fitbit'.encode(),
[descriptor],
)
device_info_service = Service(

View File

@@ -1,4 +1,4 @@
# Copyright 2024 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,6 +18,8 @@
from __future__ import annotations
import asyncio
import dataclasses
import functools
import enum
import logging
import os
import random
@@ -28,6 +30,8 @@ from typing import Any, List, Union
from bumble.device import Device, Peer
from bumble import transport
from bumble import gatt
from bumble import gatt_adapters
from bumble import gatt_client
from bumble import hci
from bumble import core
@@ -36,6 +40,9 @@ from bumble import core
SERVICE_UUID = core.UUID("50DB505C-8AC4-4738-8448-3B1D9CC09CC5")
CHARACTERISTIC_UUID_BASE = "D901B45B-4916-412E-ACCA-0000000000"
DEFAULT_CLIENT_ADDRESS = "F0:F1:F2:F3:F4:F5"
DEFAULT_SERVER_ADDRESS = "F1:F2:F3:F4:F5:F6"
# -----------------------------------------------------------------------------
@dataclasses.dataclass
@@ -65,6 +72,12 @@ class CustomClass:
return struct.pack(">II", self.a, self.b)
# -----------------------------------------------------------------------------
class CustomEnum(enum.IntEnum):
FOO = 1234
BAR = 5678
# -----------------------------------------------------------------------------
async def client(device: Device, address: hci.Address) -> None:
print(f'=== Connecting to {address}...')
@@ -78,8 +91,8 @@ async def client(device: Device, address: hci.Address) -> None:
print("*** Discovery complete")
service = peer.get_services_by_uuid(SERVICE_UUID)[0]
characteristics = []
for index in range(1, 9):
characteristics: list[gatt_client.CharacteristicProxy] = []
for index in range(1, 10):
characteristics.append(
service.get_characteristics_by_uuid(
core.UUID(CHARACTERISTIC_UUID_BASE + f"{index:02X}")
@@ -91,59 +104,92 @@ async def client(device: Device, address: hci.Address) -> None:
value = await characteristic.read_value()
print(f"### {characteristic} = {value!r} ({value.hex()})")
# Subscribe to all characteristics as a raw bytes listener.
def on_raw_characteristic_update(characteristic, value):
print(f"^^^ Update[RAW] {characteristic.uuid} value = {value.hex()}")
for characteristic in characteristics:
await characteristic.subscribe(
functools.partial(on_raw_characteristic_update, characteristic)
)
# Function to subscribe to adapted characteristics
def on_adapted_characteristic_update(characteristic, value):
print(
f"^^^ Update[ADAPTED] {characteristic.uuid} value = {value!r}, "
f"type={type(value)}"
)
# Static characteristic with a bytes value.
c1 = characteristics[0]
c1_value = await c1.read_value()
print(f"@@@ C1 {c1} value = {c1_value!r} (type={type(c1_value)})")
await c1.write_value("happy π day".encode("utf-8"))
await c1.subscribe(functools.partial(on_adapted_characteristic_update, c1))
# Static characteristic with a string value.
c2 = gatt.UTF8CharacteristicAdapter(characteristics[1])
c2 = gatt_adapters.UTF8CharacteristicProxyAdapter(characteristics[1])
c2_value = await c2.read_value()
print(f"@@@ C2 {c2} value = {c2_value} (type={type(c2_value)})")
await c2.write_value("happy π day")
await c2.subscribe(functools.partial(on_adapted_characteristic_update, c2))
# Static characteristic with a tuple value.
c3 = gatt.PackedCharacteristicAdapter(characteristics[2], ">III")
c3 = gatt_adapters.PackedCharacteristicProxyAdapter(characteristics[2], ">III")
c3_value = await c3.read_value()
print(f"@@@ C3 {c3} value = {c3_value} (type={type(c3_value)})")
await c3.write_value((2001, 2002, 2003))
await c3.subscribe(functools.partial(on_adapted_characteristic_update, c3))
# Static characteristic with a named tuple value.
c4 = gatt.MappedCharacteristicAdapter(
c4 = gatt_adapters.MappedCharacteristicProxyAdapter(
characteristics[3], ">III", ["f1", "f2", "f3"]
)
c4_value = await c4.read_value()
print(f"@@@ C4 {c4} value = {c4_value} (type={type(c4_value)})")
await c4.write_value({"f1": 4001, "f2": 4002, "f3": 4003})
await c4.subscribe(functools.partial(on_adapted_characteristic_update, c4))
# Static characteristic with a serializable value.
c5 = gatt.SerializableCharacteristicAdapter(
c5 = gatt_adapters.SerializableCharacteristicProxyAdapter(
characteristics[4], CustomSerializableClass
)
c5_value = await c5.read_value()
print(f"@@@ C5 {c5} value = {c5_value} (type={type(c5_value)})")
await c5.write_value(CustomSerializableClass(56, 57))
await c5.subscribe(functools.partial(on_adapted_characteristic_update, c5))
# Static characteristic with a delegated value.
c6 = gatt.DelegatedCharacteristicAdapter(
c6 = gatt_adapters.DelegatedCharacteristicProxyAdapter(
characteristics[5], encode=CustomClass.encode, decode=CustomClass.decode
)
c6_value = await c6.read_value()
print(f"@@@ C6 {c6} value = {c6_value} (type={type(c6_value)})")
await c6.write_value(CustomClass(6, 7))
await c6.subscribe(functools.partial(on_adapted_characteristic_update, c6))
# Dynamic characteristic with a bytes value.
c7 = characteristics[6]
c7_value = await c7.read_value()
print(f"@@@ C7 {c7} value = {c7_value!r} (type={type(c7_value)})")
await c7.write_value(bytes.fromhex("01020304"))
await c7.subscribe(functools.partial(on_adapted_characteristic_update, c7))
# Dynamic characteristic with a string value.
c8 = gatt.UTF8CharacteristicAdapter(characteristics[7])
c8 = gatt_adapters.UTF8CharacteristicProxyAdapter(characteristics[7])
c8_value = await c8.read_value()
print(f"@@@ C8 {c8} value = {c8_value} (type={type(c8_value)})")
await c8.write_value("howdy")
await c8.subscribe(functools.partial(on_adapted_characteristic_update, c8))
# Static characteristic with an enum value
c9 = gatt_adapters.EnumCharacteristicProxyAdapter(
characteristics[8], CustomEnum, 3, 'big'
)
c9_value = await c9.read_value()
print(f"@@@ C9 {c9} value = {c9_value.name} (type={type(c9_value)})")
await c9.write_value(CustomEnum.BAR)
await c9.subscribe(functools.partial(on_adapted_characteristic_update, c9))
# -----------------------------------------------------------------------------
@@ -175,142 +221,213 @@ def on_characteristic_write(characteristic: gatt.Characteristic, value: Any) ->
print(f"<<< WRITE: {characteristic} <- {value} ({type(value)})")
# -----------------------------------------------------------------------------
async def server(device: Device) -> None:
# Static characteristic with a bytes value.
c1 = gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "01",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
b'hello',
)
# Static characteristic with a string value.
c2 = gatt_adapters.UTF8CharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "02",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
'hello',
)
)
# Static characteristic with a tuple value.
c3 = gatt_adapters.PackedCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "03",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
(1007, 1008, 1009),
),
">III",
)
# Static characteristic with a named tuple value.
c4 = gatt_adapters.MappedCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "04",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
{"f1": 3007, "f2": 3008, "f3": 3009},
),
">III",
["f1", "f2", "f3"],
)
# Static characteristic with a serializable value.
c5 = gatt_adapters.SerializableCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "05",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
CustomSerializableClass(11, 12),
),
CustomSerializableClass,
)
# Static characteristic with a delegated value.
c6 = gatt_adapters.DelegatedCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "06",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
CustomClass(1, 2),
),
encode=CustomClass.encode,
decode=CustomClass.decode,
)
# Dynamic characteristic with a bytes value.
c7 = gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "07",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(
read=lambda connection: dynamic_read("bytes"),
write=lambda connection, value: dynamic_write("bytes", value),
),
)
# Dynamic characteristic with a string value.
c8 = gatt_adapters.UTF8CharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "08",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(
read=lambda connection: dynamic_read("string"),
write=lambda connection, value: dynamic_write("string", value),
),
)
)
# Static characteristic with an enum value
c9 = gatt_adapters.EnumCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "09",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
CustomEnum.FOO,
),
cls=CustomEnum,
length=3,
byteorder='big',
)
characteristics: List[gatt.Characteristic] = [
c1,
c2,
c3,
c4,
c5,
c6,
c7,
c8,
c9,
]
# Listen for read and write events.
for characteristic in characteristics:
characteristic.on(
"read",
lambda _, value, c=characteristic: on_characteristic_read(c, value),
)
characteristic.on(
"write",
lambda _, value, c=characteristic: on_characteristic_write(c, value),
)
device.add_service(gatt.Service(SERVICE_UUID, characteristics))
# Notify every 3 seconds
i = 0
while True:
await asyncio.sleep(3)
# Notifying can be done with the characteristic's current value, or
# by explicitly passing a value to notify with. Both variants are used
# here: for c1..c4 we set the value and then notify, for c4..c9 we notify
# with an explicit value.
c1.value = f'hello c1 {i}'.encode()
await device.notify_subscribers(c1)
c2.value = f'hello c2 {i}'
await device.notify_subscribers(c2)
c3.value = (1000 + i, 2000 + i, 3000 + i)
await device.notify_subscribers(c3)
c4.value = {"f1": 4000 + i, "f2": 5000 + i, "f3": 6000 + i}
await device.notify_subscribers(c4)
await device.notify_subscribers(c5, CustomSerializableClass(1000 + i, 2000 + i))
await device.notify_subscribers(c6, CustomClass(3000 + i, 4000 + i))
await device.notify_subscribers(c7, bytes([1, 2, 3, i % 256]))
await device.notify_subscribers(c8, f'hello c8 {i}')
await device.notify_subscribers(
c9, CustomEnum.FOO if i % 2 == 0 else CustomEnum.BAR
)
i += 1
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 2:
print("Usage: run_gatt_with_adapters.py <transport-spec> [<bluetooth-address>]")
print("example: run_gatt_with_adapters.py usb:0 E1:CA:72:48:C4:E8")
print("Usage: run_gatt_with_adapters.py <transport-spec> client|server")
print("example: run_gatt_with_adapters.py usb:0 F0:F1:F2:F3:F4:F5")
return
async with await transport.open_transport(sys.argv[1]) as hci_transport:
is_client = sys.argv[2] == "client"
# Create a device to manage the host
device = Device.with_hci(
"Bumble",
hci.Address("F0:F1:F2:F3:F4:F5"),
hci.Address(
DEFAULT_CLIENT_ADDRESS if is_client else DEFAULT_SERVER_ADDRESS
),
hci_transport.source,
hci_transport.sink,
)
# Static characteristic with a bytes value.
c1 = gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "01",
gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
b'hello',
)
# Static characteristic with a string value.
c2 = gatt.UTF8CharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "02",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
'hello',
)
)
# Static characteristic with a tuple value.
c3 = gatt.PackedCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "03",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
(1007, 1008, 1009),
),
">III",
)
# Static characteristic with a named tuple value.
c4 = gatt.MappedCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "04",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
{"f1": 3007, "f2": 3008, "f3": 3009},
),
">III",
["f1", "f2", "f3"],
)
# Static characteristic with a serializable value.
c5 = gatt.SerializableCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "05",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
CustomSerializableClass(11, 12),
),
CustomSerializableClass,
)
# Static characteristic with a delegated value.
c6 = gatt.DelegatedCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "06",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
CustomClass(1, 2),
),
encode=CustomClass.encode,
decode=CustomClass.decode,
)
# Dynamic characteristic with a bytes value.
c7 = gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "07",
gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(
read=lambda connection: dynamic_read("bytes"),
write=lambda connection, value: dynamic_write("bytes", value),
),
)
# Dynamic characteristic with a string value.
c8 = gatt.UTF8CharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "08",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(
read=lambda connection: dynamic_read("string"),
write=lambda connection, value: dynamic_write("string", value),
),
)
)
characteristics: List[
Union[gatt.Characteristic, gatt.CharacteristicAdapter]
] = [c1, c2, c3, c4, c5, c6, c7, c8]
# Listen for read and write events.
for characteristic in characteristics:
characteristic.on(
"read",
lambda _, value, c=characteristic: on_characteristic_read(c, value),
)
characteristic.on(
"write",
lambda _, value, c=characteristic: on_characteristic_write(c, value),
)
device.add_service(gatt.Service(SERVICE_UUID, characteristics)) # type: ignore
# Get things going
await device.power_on()
# Connect to a peer
if len(sys.argv) > 2:
await client(device, hci.Address(sys.argv[2]))
if is_client:
# Connect a client to a peer
await client(device, hci.Address(DEFAULT_SERVER_ADDRESS))
else:
# Advertise so a peer can connect
await device.start_advertising(auto_restart=True)
# Setup a server
await server(device)
await hci_transport.source.wait_for_termination()

View File

@@ -16,6 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import pytest
from . import test_utils
@@ -25,6 +26,7 @@ from bumble.profiles import gatt_service
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_database_hash():
devices = await test_utils.TwoDevices.create_with_connection()
devices[0].gatt_server.services.clear()
@@ -118,6 +120,7 @@ async def test_database_hash():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_service_changed():
devices = await test_utils.TwoDevices.create_with_connection()
assert (service := devices[0].gatt_service)

View File

@@ -17,32 +17,43 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import enum
import logging
import os
import struct
import pytest
from typing import Any
from typing_extensions import Self
from unittest.mock import AsyncMock, Mock, ANY
from bumble.controller import Controller
from bumble.gatt_client import CharacteristicProxy
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
from bumble.gatt import (
GATT_BATTERY_LEVEL_CHARACTERISTIC,
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
CharacteristicAdapter,
SerializableCharacteristicAdapter,
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter,
MappedCharacteristicAdapter,
UTF8CharacteristicAdapter,
Service,
Characteristic,
CharacteristicValue,
Descriptor,
)
from bumble.gatt_client import CharacteristicProxy
from bumble.gatt_adapters import (
CharacteristicProxyAdapter,
SerializableCharacteristicAdapter,
SerializableCharacteristicProxyAdapter,
DelegatedCharacteristicAdapter,
DelegatedCharacteristicProxyAdapter,
PackedCharacteristicAdapter,
PackedCharacteristicProxyAdapter,
MappedCharacteristicAdapter,
MappedCharacteristicProxyAdapter,
UTF8CharacteristicAdapter,
UTF8CharacteristicProxyAdapter,
EnumCharacteristicAdapter,
EnumCharacteristicProxyAdapter,
)
from bumble.transport import AsyncPipeSink
from bumble.core import UUID
from bumble.att import (
@@ -199,7 +210,7 @@ async def test_characteristic_encoding():
await async_barrier()
assert characteristic.value == bytes([125])
cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2]))
cd = DelegatedCharacteristicProxyAdapter(c, encode=lambda x: bytes([x // 2]))
await cd.write_value(100, with_response=True)
await async_barrier()
assert characteristic.value == bytes([50])
@@ -207,7 +218,7 @@ async def test_characteristic_encoding():
c2 = peer.get_characteristics_by_uuid(async_characteristic.uuid)
assert len(c2) == 1
c2 = c2[0]
cd2 = PackedCharacteristicAdapter(c2, ">I")
cd2 = PackedCharacteristicProxyAdapter(c2, ">I")
cd2v = await cd2.read_value()
assert cd2v == 0x05060708
@@ -249,7 +260,7 @@ async def test_characteristic_encoding():
await async_barrier()
assert last_change is None
cd = DelegatedCharacteristicAdapter(c, decode=lambda x: x[0])
cd = DelegatedCharacteristicProxyAdapter(c, decode=lambda x: x[0])
await cd.subscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
@@ -314,21 +325,16 @@ async def test_attribute_getters():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_CharacteristicAdapter() -> None:
# Check that the CharacteristicAdapter base class is transparent
v = bytes([1, 2, 3])
c = Characteristic(
c: Characteristic[Any] = Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.Properties.READ,
Characteristic.READABLE,
v,
)
a = CharacteristicAdapter(c)
value = await a.read_value(None)
assert value == v
v = bytes([3, 4, 5])
await a.write_value(None, v)
await c.write_value(None, v)
assert c.value == v
# Simple delegated adapter
@@ -415,11 +421,171 @@ async def test_CharacteristicAdapter() -> None:
class_read_value = await class_c.read_value(None)
assert class_read_value == class_value_bytes
c.value = b''
class_c.value = b''
await class_c.write_value(None, class_value_bytes)
assert isinstance(c.value, BlaBla)
assert c.value.a == 3
assert c.value.b == 4
assert isinstance(class_c.value, BlaBla)
assert class_c.value.a == class_value.a
assert class_c.value.b == class_value.b
# Enum adapter
class MyEnum(enum.IntEnum):
ENUM_1 = 1234
ENUM_2 = 5678
enum_value = MyEnum.ENUM_2
enum_value_bytes = int(enum_value).to_bytes(3, 'big')
c.value = enum_value
enum_c = EnumCharacteristicAdapter(c, MyEnum, 3, 'big')
enum_read_value = await enum_c.read_value(None)
assert enum_read_value == enum_value_bytes
enum_c.value = b''
await enum_c.write_value(None, enum_value_bytes)
assert isinstance(enum_c.value, MyEnum)
assert enum_c.value == enum_value
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_CharacteristicProxyAdapter() -> None:
class Client:
def __init__(self, value):
self.value = value
async def read_value(self, handle, no_long_read=False) -> bytes:
return self.value
async def write_value(self, handle, value, with_response=False):
self.value = value
class TestAttributeProxy(CharacteristicProxy):
def __init__(self, value) -> None:
super().__init__(Client(value), 0, 0, None, 0) # type: ignore
@property
def value(self):
return self.client.value
@value.setter
def value(self, value):
self.client.value = value
v = bytes([1, 2, 3])
c = TestAttributeProxy(v)
a: CharacteristicProxyAdapter = CharacteristicProxyAdapter(c)
value = await a.read_value()
assert value == v
v = bytes([3, 4, 5])
await a.write_value(v)
assert c.value == v
# Simple delegated adapter
delegated = DelegatedCharacteristicProxyAdapter(
c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x))
)
delegated_value = await delegated.read_value()
assert delegated_value == bytes(reversed(v))
delegated_value2 = bytes([3, 4, 5])
await delegated.write_value(delegated_value2)
assert c.value == bytes(reversed(delegated_value2))
# Packed adapter with single element format
packed_value_ref = 1234
packed_value_bytes = struct.pack('>H', packed_value_ref)
c.value = packed_value_bytes
packed = PackedCharacteristicProxyAdapter(c, '>H')
packed_value_read = await packed.read_value()
assert packed_value_read == packed_value_ref
c.value = None
await packed.write_value(packed_value_ref)
assert c.value == packed_value_bytes
# Packed adapter with multi-element format
v1 = 1234
v2 = 5678
packed_multi_value_bytes = struct.pack('>HH', v1, v2)
c.value = packed_multi_value_bytes
packed_multi = PackedCharacteristicProxyAdapter(c, '>HH')
packed_multi_read_value = await packed_multi.read_value()
assert packed_multi_read_value == (v1, v2)
c.value = b''
await packed_multi.write_value((v1, v2))
assert c.value == packed_multi_value_bytes
# Mapped adapter
v1 = 1234
v2 = 5678
packed_mapped_value_bytes = struct.pack('>HH', v1, v2)
mapped = {'v1': v1, 'v2': v2}
c.value = packed_mapped_value_bytes
packed_mapped = MappedCharacteristicProxyAdapter(c, '>HH', ('v1', 'v2'))
packed_mapped_read_value = await packed_mapped.read_value()
assert packed_mapped_read_value == mapped
c.value = b''
await packed_mapped.write_value(mapped)
assert c.value == packed_mapped_value_bytes
# UTF-8 adapter
string_value = 'Hello π'
string_value_bytes = string_value.encode('utf-8')
c.value = string_value_bytes
string_c = UTF8CharacteristicProxyAdapter(c)
string_read_value = await string_c.read_value()
assert string_read_value == string_value
c.value = b''
await string_c.write_value(string_value)
assert c.value == string_value_bytes
# Class adapter
class BlaBla:
def __init__(self, a: int, b: int) -> None:
self.a = a
self.b = b
@classmethod
def from_bytes(cls, data: bytes) -> Self:
a, b = struct.unpack(">II", data)
return cls(a, b)
def __bytes__(self) -> bytes:
return struct.pack(">II", self.a, self.b)
class_value = BlaBla(3, 4)
class_value_bytes = struct.pack(">II", 3, 4)
c.value = class_value_bytes
class_c = SerializableCharacteristicProxyAdapter(c, BlaBla)
class_read_value = await class_c.read_value()
assert isinstance(class_read_value, BlaBla)
assert class_read_value.a == class_value.a
assert class_read_value.b == class_value.b
c.value = b''
await class_c.write_value(class_value)
assert c.value == class_value_bytes
# Enum adapter
class MyEnum(enum.IntEnum):
ENUM_1 = 1234
ENUM_2 = 5678
enum_value = MyEnum.ENUM_1
enum_value_bytes = int(enum_value).to_bytes(3, 'little')
c.value = enum_value_bytes
enum_c = EnumCharacteristicProxyAdapter(c, MyEnum, 3)
enum_read_value = await enum_c.read_value()
assert isinstance(enum_read_value, MyEnum)
assert enum_read_value == enum_value
c.value = b''
await enum_c.write_value(enum_value)
assert c.value == enum_value_bytes
# -----------------------------------------------------------------------------
@@ -601,7 +767,7 @@ async def test_read_write2():
v1 = await c1.read_value()
assert v1 == v
a1 = PackedCharacteristicAdapter(c1, '>I')
a1 = PackedCharacteristicProxyAdapter(c1, '>I')
v1 = await a1.read_value()
assert v1 == struct.unpack('>I', v)[0]
@@ -1114,6 +1280,7 @@ async def async_main():
await test_CharacteristicValue()
await test_CharacteristicValue_async()
await test_CharacteristicAdapter()
await test_CharacteristicProxyAdapter()
# -----------------------------------------------------------------------------

View File

@@ -78,7 +78,11 @@ async def test_init_service(gmap_client: GamingAudioServiceProxy):
| GmapRole.BROADCAST_GAME_RECEIVER
| GmapRole.BROADCAST_GAME_SENDER
)
assert gmap_client.ugg_features is not None
assert await gmap_client.ugg_features.read_value() == UggFeatures.UGG_MULTISINK
assert gmap_client.ugt_features is not None
assert await gmap_client.ugt_features.read_value() == UgtFeatures.UGT_SOURCE
assert gmap_client.bgr_features is not None
assert await gmap_client.bgr_features.read_value() == BgrFeatures.BGR_MULTISINK
assert gmap_client.bgs_features is not None
assert await gmap_client.bgs_features.read_value() == BgsFeatures.BGS_96_KBPS