Add annotation for Heart Rate and Battery Service

This commit is contained in:
Josh Wu
2026-01-08 14:40:02 +08:00
parent 36f81b798c
commit 91a2b4f676
3 changed files with 150 additions and 136 deletions

View File

@@ -16,35 +16,31 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from collections.abc import Callable
from bumble.gatt import ( from bumble import device
GATT_BATTERY_LEVEL_CHARACTERISTIC, from bumble import gatt
GATT_BATTERY_SERVICE, from bumble import gatt_adapters
Characteristic, from bumble import gatt_client
CharacteristicValue,
TemplateService,
)
from bumble.gatt_adapters import (
PackedCharacteristicAdapter,
PackedCharacteristicProxyAdapter,
)
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class BatteryService(TemplateService): class BatteryService(gatt.TemplateService):
UUID = GATT_BATTERY_SERVICE UUID = gatt.GATT_BATTERY_SERVICE
BATTERY_LEVEL_FORMAT = 'B' BATTERY_LEVEL_FORMAT = 'B'
battery_level_characteristic: Characteristic[int] battery_level_characteristic: gatt.Characteristic[int]
def __init__(self, read_battery_level): def __init__(self, read_battery_level: Callable[[device.Connection], int]) -> None:
self.battery_level_characteristic = PackedCharacteristicAdapter( self.battery_level_characteristic = gatt_adapters.PackedCharacteristicAdapter(
Characteristic( gatt.Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC, gatt.GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, properties=(
Characteristic.READABLE, gatt.Characteristic.Properties.READ
CharacteristicValue(read=read_battery_level), | gatt.Characteristic.Properties.NOTIFY
),
permissions=gatt.Characteristic.READABLE,
value=gatt.CharacteristicValue(read=read_battery_level),
), ),
pack_format=BatteryService.BATTERY_LEVEL_FORMAT, pack_format=BatteryService.BATTERY_LEVEL_FORMAT,
) )
@@ -52,18 +48,18 @@ class BatteryService(TemplateService):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class BatteryServiceProxy(ProfileServiceProxy): class BatteryServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = BatteryService SERVICE_CLASS = BatteryService
battery_level: CharacteristicProxy[int] | None battery_level: gatt_client.CharacteristicProxy[int] | None
def __init__(self, service_proxy): def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy self.service_proxy = service_proxy
if characteristics := service_proxy.get_characteristics_by_uuid( if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_BATTERY_LEVEL_CHARACTERISTIC gatt.GATT_BATTERY_LEVEL_CHARACTERISTIC
): ):
self.battery_level = PackedCharacteristicProxyAdapter( self.battery_level = gatt_adapters.PackedCharacteristicProxyAdapter(
characteristics[0], pack_format=BatteryService.BATTERY_LEVEL_FORMAT characteristics[0], pack_format=BatteryService.BATTERY_LEVEL_FORMAT
) )
else: else:

View File

@@ -18,40 +18,35 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations from __future__ import annotations
import dataclasses
from typing import Any
from typing_extensions import Self
from collections.abc import Sequence, Callable
import struct import struct
from enum import IntEnum import enum
from bumble import core from bumble import core
from bumble.att import ATT_Error from bumble import device
from bumble.gatt import ( from bumble import utils
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC, from bumble import att
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC, from bumble import gatt
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC, from bumble import gatt_adapters
GATT_HEART_RATE_SERVICE, from bumble import gatt_client
Characteristic,
CharacteristicValue,
TemplateService,
)
from bumble.gatt_adapters import (
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter,
SerializableCharacteristicAdapter,
)
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class HeartRateService(TemplateService): class HeartRateService(gatt.TemplateService):
UUID = GATT_HEART_RATE_SERVICE UUID = gatt.GATT_HEART_RATE_SERVICE
HEART_RATE_CONTROL_POINT_FORMAT = 'B' HEART_RATE_CONTROL_POINT_FORMAT = 'B'
CONTROL_POINT_NOT_SUPPORTED = 0x80 CONTROL_POINT_NOT_SUPPORTED = 0x80
RESET_ENERGY_EXPENDED = 0x01 RESET_ENERGY_EXPENDED = 0x01
heart_rate_measurement_characteristic: Characteristic[HeartRateMeasurement] heart_rate_measurement_characteristic: gatt.Characteristic[HeartRateMeasurement]
body_sensor_location_characteristic: Characteristic[BodySensorLocation] body_sensor_location_characteristic: gatt.Characteristic[BodySensorLocation]
heart_rate_control_point_characteristic: Characteristic[int] heart_rate_control_point_characteristic: gatt.Characteristic[int]
class BodySensorLocation(IntEnum): class BodySensorLocation(utils.OpenIntEnum):
OTHER = 0 OTHER = 0
CHEST = 1 CHEST = 1
WRIST = 2 WRIST = 2
@@ -60,56 +55,58 @@ class HeartRateService(TemplateService):
EAR_LOBE = 5 EAR_LOBE = 5
FOOT = 6 FOOT = 6
@dataclasses.dataclass
class HeartRateMeasurement: class HeartRateMeasurement:
def __init__( heart_rate: int
self, sensor_contact_detected: bool | None = None
heart_rate, energy_expended: int | None = None
sensor_contact_detected=None, rr_intervals: Sequence[int] | None = None
energy_expended=None,
rr_intervals=None, class Flag(enum.IntFlag):
): INT16_HEART_RATE = 1 << 0
if heart_rate < 0 or heart_rate > 0xFFFF: SENSOR_CONTACT_DETECTED = 1 << 1
SENSOR_CONTACT_SUPPORTED = 1 << 2
ENERGY_EXPENDED_STATUS = 1 << 3
RR_INTERVAL = 1 << 3
def __post_init__(self) -> None:
if self.heart_rate < 0 or self.heart_rate > 0xFFFF:
raise core.InvalidArgumentError('heart_rate out of range') raise core.InvalidArgumentError('heart_rate out of range')
if energy_expended is not None and ( if self.energy_expended is not None and (
energy_expended < 0 or energy_expended > 0xFFFF self.energy_expended < 0 or self.energy_expended > 0xFFFF
): ):
raise core.InvalidArgumentError('energy_expended out of range') raise core.InvalidArgumentError('energy_expended out of range')
if rr_intervals: if self.rr_intervals:
for rr_interval in rr_intervals: for rr_interval in self.rr_intervals:
if rr_interval < 0 or rr_interval * 1024 > 0xFFFF: if rr_interval < 0 or rr_interval * 1024 > 0xFFFF:
raise core.InvalidArgumentError('rr_intervals out of range') raise core.InvalidArgumentError('rr_intervals out of range')
self.heart_rate = heart_rate
self.sensor_contact_detected = sensor_contact_detected
self.energy_expended = energy_expended
self.rr_intervals = rr_intervals
@classmethod @classmethod
def from_bytes(cls, data): def from_bytes(cls, data: bytes) -> Self:
flags = data[0] flags = data[0]
offset = 1 offset = 1
if flags & 1: if flags & cls.Flag.INT16_HEART_RATE:
hr = struct.unpack_from('<H', data, offset)[0] heart_rate = struct.unpack_from('<H', data, offset)[0]
offset += 2 offset += 2
else: else:
hr = struct.unpack_from('B', data, offset)[0] heart_rate = struct.unpack_from('B', data, offset)[0]
offset += 1 offset += 1
if flags & (1 << 2): if flags & cls.Flag.SENSOR_CONTACT_SUPPORTED:
sensor_contact_detected = flags & (1 << 1) != 0 sensor_contact_detected = flags & cls.Flag.SENSOR_CONTACT_DETECTED != 0
else: else:
sensor_contact_detected = None sensor_contact_detected = None
if flags & (1 << 3): if flags & cls.Flag.ENERGY_EXPENDED_STATUS:
energy_expended = struct.unpack_from('<H', data, offset)[0] energy_expended = struct.unpack_from('<H', data, offset)[0]
offset += 2 offset += 2
else: else:
energy_expended = None energy_expended = None
if flags & (1 << 4): if flags & cls.Flag.RR_INTERVAL:
rr_intervals = tuple( rr_intervals = tuple(
struct.unpack_from('<H', data, offset + i * 2)[0] / 1024 struct.unpack_from('<H', data, offset + i * 2)[0] / 1024
for i in range((len(data) - offset) // 2) for i in range((len(data) - offset) // 2)
@@ -117,25 +114,32 @@ class HeartRateService(TemplateService):
else: else:
rr_intervals = () rr_intervals = ()
return cls(hr, sensor_contact_detected, energy_expended, rr_intervals) return cls(
heart_rate=heart_rate,
sensor_contact_detected=sensor_contact_detected,
energy_expended=energy_expended,
rr_intervals=rr_intervals,
)
def __bytes__(self): def __bytes__(self) -> bytes:
flags = 0
if self.heart_rate < 256: if self.heart_rate < 256:
flags = 0
data = struct.pack('B', self.heart_rate) data = struct.pack('B', self.heart_rate)
else: else:
flags = 1 flags |= self.Flag.INT16_HEART_RATE
data = struct.pack('<H', self.heart_rate) data = struct.pack('<H', self.heart_rate)
if self.sensor_contact_detected is not None: if self.sensor_contact_detected is not None:
flags |= ((1 if self.sensor_contact_detected else 0) << 1) | (1 << 2) flags |= self.Flag.SENSOR_CONTACT_SUPPORTED
if self.sensor_contact_detected:
flags |= self.Flag.SENSOR_CONTACT_DETECTED
if self.energy_expended is not None: if self.energy_expended is not None:
flags |= 1 << 3 flags |= self.Flag.ENERGY_EXPENDED_STATUS
data += struct.pack('<H', self.energy_expended) data += struct.pack('<H', self.energy_expended)
if self.rr_intervals: if self.rr_intervals is not None:
flags |= 1 << 4 flags |= self.Flag.RR_INTERVAL
data += b''.join( data += b''.join(
[ [
struct.pack('<H', int(rr_interval * 1024)) struct.pack('<H', int(rr_interval * 1024))
@@ -145,37 +149,41 @@ class HeartRateService(TemplateService):
return bytes([flags]) + data return bytes([flags]) + data
def __str__(self):
return (
f'HeartRateMeasurement(heart_rate={self.heart_rate},'
f' sensor_contact_detected={self.sensor_contact_detected},'
f' energy_expended={self.energy_expended},'
f' rr_intervals={self.rr_intervals})'
)
def __init__( def __init__(
self, self,
read_heart_rate_measurement, read_heart_rate_measurement: Callable[
body_sensor_location=None, [device.Connection], HeartRateMeasurement
reset_energy_expended=None, ],
body_sensor_location: HeartRateService.BodySensorLocation | None = None,
reset_energy_expended: Callable[[device.Connection], Any] | None = None,
): ):
self.heart_rate_measurement_characteristic = SerializableCharacteristicAdapter( self.heart_rate_measurement_characteristic = (
Characteristic( gatt_adapters.SerializableCharacteristicAdapter(
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC, gatt.Characteristic(
Characteristic.Properties.NOTIFY, uuid=gatt.GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
0, properties=gatt.Characteristic.Properties.NOTIFY,
CharacteristicValue(read=read_heart_rate_measurement), permissions=gatt.Characteristic.Permissions(0),
), value=gatt.CharacteristicValue(read=read_heart_rate_measurement),
HeartRateService.HeartRateMeasurement, ),
HeartRateService.HeartRateMeasurement,
)
) )
characteristics = [self.heart_rate_measurement_characteristic] characteristics: list[gatt.Characteristic] = [
self.heart_rate_measurement_characteristic
]
if body_sensor_location is not None: if body_sensor_location is not None:
self.body_sensor_location_characteristic = Characteristic( self.body_sensor_location_characteristic = (
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC, gatt_adapters.EnumCharacteristicAdapter(
Characteristic.Properties.READ, gatt.Characteristic(
Characteristic.READABLE, uuid=gatt.GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
bytes([int(body_sensor_location)]), properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.READABLE,
value=body_sensor_location,
),
cls=self.BodySensorLocation,
length=1,
)
) )
characteristics.append(self.body_sensor_location_characteristic) characteristics.append(self.body_sensor_location_characteristic)
@@ -186,16 +194,20 @@ class HeartRateService(TemplateService):
if reset_energy_expended is not None: if reset_energy_expended is not None:
reset_energy_expended(connection) reset_energy_expended(connection)
else: else:
raise ATT_Error(self.CONTROL_POINT_NOT_SUPPORTED) raise att.ATT_Error(self.CONTROL_POINT_NOT_SUPPORTED)
self.heart_rate_control_point_characteristic = PackedCharacteristicAdapter( self.heart_rate_control_point_characteristic = (
Characteristic( gatt_adapters.PackedCharacteristicAdapter(
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC, gatt.Characteristic(
Characteristic.Properties.WRITE, uuid=gatt.GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITEABLE, properties=gatt.Characteristic.Properties.WRITE,
CharacteristicValue(write=write_heart_rate_control_point_value), permissions=gatt.Characteristic.WRITEABLE,
), value=gatt.CharacteristicValue(
pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT, write=write_heart_rate_control_point_value
),
),
pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT,
)
) )
characteristics.append(self.heart_rate_control_point_characteristic) characteristics.append(self.heart_rate_control_point_characteristic)
@@ -203,50 +215,56 @@ class HeartRateService(TemplateService):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class HeartRateServiceProxy(ProfileServiceProxy): class HeartRateServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = HeartRateService SERVICE_CLASS = HeartRateService
heart_rate_measurement: ( heart_rate_measurement: (
CharacteristicProxy[HeartRateService.HeartRateMeasurement] | None gatt_client.CharacteristicProxy[HeartRateService.HeartRateMeasurement] | None
) )
body_sensor_location: ( body_sensor_location: (
CharacteristicProxy[HeartRateService.BodySensorLocation] | None gatt_client.CharacteristicProxy[HeartRateService.BodySensorLocation] | None
) )
heart_rate_control_point: CharacteristicProxy[int] | None heart_rate_control_point: gatt_client.CharacteristicProxy[int] | None
def __init__(self, service_proxy): def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy self.service_proxy = service_proxy
if characteristics := service_proxy.get_characteristics_by_uuid( if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC gatt.GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC
): ):
self.heart_rate_measurement = SerializableCharacteristicAdapter( self.heart_rate_measurement = (
characteristics[0], HeartRateService.HeartRateMeasurement gatt_adapters.SerializableCharacteristicProxyAdapter(
characteristics[0], HeartRateService.HeartRateMeasurement
)
) )
else: else:
self.heart_rate_measurement = None self.heart_rate_measurement = None
if characteristics := service_proxy.get_characteristics_by_uuid( if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC gatt.GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC
): ):
self.body_sensor_location = DelegatedCharacteristicAdapter( self.body_sensor_location = (
characteristics[0], gatt_adapters.DelegatedCharacteristicProxyAdapter(
decode=lambda value: HeartRateService.BodySensorLocation(value[0]), characteristics[0],
decode=lambda value: HeartRateService.BodySensorLocation(value[0]),
)
) )
else: else:
self.body_sensor_location = None self.body_sensor_location = None
if characteristics := service_proxy.get_characteristics_by_uuid( if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC gatt.GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC
): ):
self.heart_rate_control_point = PackedCharacteristicAdapter( self.heart_rate_control_point = (
characteristics[0], gatt_adapters.PackedCharacteristicProxyAdapter(
pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT, characteristics[0],
pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT,
)
) )
else: else:
self.heart_rate_control_point = None self.heart_rate_control_point = None
async def reset_energy_expended(self): async def reset_energy_expended(self) -> None:
if self.heart_rate_control_point is not None: if self.heart_rate_control_point is not None:
return await self.heart_rate_control_point.write_value( return await self.heart_rate_control_point.write_value(
HeartRateService.RESET_ENERGY_EXPENDED HeartRateService.RESET_ENERGY_EXPENDED

View File

@@ -71,8 +71,8 @@ async def main() -> None:
rr_intervals=random.choice( rr_intervals=random.choice(
( (
( (
random.randint(900, 1100) / 1000, random.randint(900, 1100) // 1000,
random.randint(900, 1100) / 1000, random.randint(900, 1100) // 1000,
), ),
None, None,
) )