From 91a2b4f67651edcd7c656c1483e40213c956074a Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Thu, 8 Jan 2026 14:40:02 +0800 Subject: [PATCH] Add annotation for Heart Rate and Battery Service --- bumble/profiles/battery_service.py | 50 +++--- bumble/profiles/heart_rate_service.py | 232 ++++++++++++++------------ examples/heart_rate_server.py | 4 +- 3 files changed, 150 insertions(+), 136 deletions(-) diff --git a/bumble/profiles/battery_service.py b/bumble/profiles/battery_service.py index f0318d6..b2d69fa 100644 --- a/bumble/profiles/battery_service.py +++ b/bumble/profiles/battery_service.py @@ -16,35 +16,31 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from collections.abc import Callable -from bumble.gatt import ( - GATT_BATTERY_LEVEL_CHARACTERISTIC, - GATT_BATTERY_SERVICE, - Characteristic, - CharacteristicValue, - TemplateService, -) -from bumble.gatt_adapters import ( - PackedCharacteristicAdapter, - PackedCharacteristicProxyAdapter, -) -from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy +from bumble import device +from bumble import gatt +from bumble import gatt_adapters +from bumble import gatt_client # ----------------------------------------------------------------------------- -class BatteryService(TemplateService): - UUID = GATT_BATTERY_SERVICE +class BatteryService(gatt.TemplateService): + UUID = gatt.GATT_BATTERY_SERVICE BATTERY_LEVEL_FORMAT = 'B' - battery_level_characteristic: Characteristic[int] + battery_level_characteristic: gatt.Characteristic[int] - def __init__(self, read_battery_level): - self.battery_level_characteristic = PackedCharacteristicAdapter( - Characteristic( - GATT_BATTERY_LEVEL_CHARACTERISTIC, - Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, - Characteristic.READABLE, - CharacteristicValue(read=read_battery_level), + def __init__(self, read_battery_level: Callable[[device.Connection], int]) -> None: + self.battery_level_characteristic = gatt_adapters.PackedCharacteristicAdapter( + gatt.Characteristic( + gatt.GATT_BATTERY_LEVEL_CHARACTERISTIC, + properties=( + gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.NOTIFY + ), + permissions=gatt.Characteristic.READABLE, + value=gatt.CharacteristicValue(read=read_battery_level), ), pack_format=BatteryService.BATTERY_LEVEL_FORMAT, ) @@ -52,18 +48,18 @@ class BatteryService(TemplateService): # ----------------------------------------------------------------------------- -class BatteryServiceProxy(ProfileServiceProxy): +class BatteryServiceProxy(gatt_client.ProfileServiceProxy): SERVICE_CLASS = BatteryService - battery_level: CharacteristicProxy[int] | None + battery_level: gatt_client.CharacteristicProxy[int] | None - def __init__(self, service_proxy): + def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None: self.service_proxy = service_proxy if characteristics := service_proxy.get_characteristics_by_uuid( - GATT_BATTERY_LEVEL_CHARACTERISTIC + gatt.GATT_BATTERY_LEVEL_CHARACTERISTIC ): - self.battery_level = PackedCharacteristicProxyAdapter( + self.battery_level = gatt_adapters.PackedCharacteristicProxyAdapter( characteristics[0], pack_format=BatteryService.BATTERY_LEVEL_FORMAT ) else: diff --git a/bumble/profiles/heart_rate_service.py b/bumble/profiles/heart_rate_service.py index b31e726..83198ef 100644 --- a/bumble/profiles/heart_rate_service.py +++ b/bumble/profiles/heart_rate_service.py @@ -18,40 +18,35 @@ # ----------------------------------------------------------------------------- from __future__ import annotations +import dataclasses +from typing import Any +from typing_extensions import Self +from collections.abc import Sequence, Callable import struct -from enum import IntEnum +import enum from bumble import core -from bumble.att import ATT_Error -from bumble.gatt import ( - GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC, - GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC, - GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC, - GATT_HEART_RATE_SERVICE, - Characteristic, - CharacteristicValue, - TemplateService, -) -from bumble.gatt_adapters import ( - DelegatedCharacteristicAdapter, - PackedCharacteristicAdapter, - SerializableCharacteristicAdapter, -) -from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy +from bumble import device +from bumble import utils +from bumble import att +from bumble import gatt +from bumble import gatt_adapters +from bumble import gatt_client # ----------------------------------------------------------------------------- -class HeartRateService(TemplateService): - UUID = GATT_HEART_RATE_SERVICE +class HeartRateService(gatt.TemplateService): + UUID = gatt.GATT_HEART_RATE_SERVICE + HEART_RATE_CONTROL_POINT_FORMAT = 'B' CONTROL_POINT_NOT_SUPPORTED = 0x80 RESET_ENERGY_EXPENDED = 0x01 - heart_rate_measurement_characteristic: Characteristic[HeartRateMeasurement] - body_sensor_location_characteristic: Characteristic[BodySensorLocation] - heart_rate_control_point_characteristic: Characteristic[int] + heart_rate_measurement_characteristic: gatt.Characteristic[HeartRateMeasurement] + body_sensor_location_characteristic: gatt.Characteristic[BodySensorLocation] + heart_rate_control_point_characteristic: gatt.Characteristic[int] - class BodySensorLocation(IntEnum): + class BodySensorLocation(utils.OpenIntEnum): OTHER = 0 CHEST = 1 WRIST = 2 @@ -60,56 +55,58 @@ class HeartRateService(TemplateService): EAR_LOBE = 5 FOOT = 6 + @dataclasses.dataclass class HeartRateMeasurement: - def __init__( - self, - heart_rate, - sensor_contact_detected=None, - energy_expended=None, - rr_intervals=None, - ): - if heart_rate < 0 or heart_rate > 0xFFFF: + heart_rate: int + sensor_contact_detected: bool | None = None + energy_expended: int | None = None + rr_intervals: Sequence[int] | None = None + + class Flag(enum.IntFlag): + INT16_HEART_RATE = 1 << 0 + SENSOR_CONTACT_DETECTED = 1 << 1 + SENSOR_CONTACT_SUPPORTED = 1 << 2 + ENERGY_EXPENDED_STATUS = 1 << 3 + RR_INTERVAL = 1 << 3 + + def __post_init__(self) -> None: + if self.heart_rate < 0 or self.heart_rate > 0xFFFF: raise core.InvalidArgumentError('heart_rate out of range') - if energy_expended is not None and ( - energy_expended < 0 or energy_expended > 0xFFFF + if self.energy_expended is not None and ( + self.energy_expended < 0 or self.energy_expended > 0xFFFF ): raise core.InvalidArgumentError('energy_expended out of range') - if rr_intervals: - for rr_interval in rr_intervals: + if self.rr_intervals: + for rr_interval in self.rr_intervals: if rr_interval < 0 or rr_interval * 1024 > 0xFFFF: raise core.InvalidArgumentError('rr_intervals out of range') - self.heart_rate = heart_rate - self.sensor_contact_detected = sensor_contact_detected - self.energy_expended = energy_expended - self.rr_intervals = rr_intervals - @classmethod - def from_bytes(cls, data): + def from_bytes(cls, data: bytes) -> Self: flags = data[0] offset = 1 - if flags & 1: - hr = struct.unpack_from(' bytes: + flags = 0 if self.heart_rate < 256: - flags = 0 data = struct.pack('B', self.heart_rate) else: - flags = 1 + flags |= self.Flag.INT16_HEART_RATE data = struct.pack(' None: self.service_proxy = service_proxy if characteristics := service_proxy.get_characteristics_by_uuid( - GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC + gatt.GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC ): - self.heart_rate_measurement = SerializableCharacteristicAdapter( - characteristics[0], HeartRateService.HeartRateMeasurement + self.heart_rate_measurement = ( + gatt_adapters.SerializableCharacteristicProxyAdapter( + characteristics[0], HeartRateService.HeartRateMeasurement + ) ) else: self.heart_rate_measurement = None if characteristics := service_proxy.get_characteristics_by_uuid( - GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC + gatt.GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC ): - self.body_sensor_location = DelegatedCharacteristicAdapter( - characteristics[0], - decode=lambda value: HeartRateService.BodySensorLocation(value[0]), + self.body_sensor_location = ( + gatt_adapters.DelegatedCharacteristicProxyAdapter( + characteristics[0], + decode=lambda value: HeartRateService.BodySensorLocation(value[0]), + ) ) else: self.body_sensor_location = None if characteristics := service_proxy.get_characteristics_by_uuid( - GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC + gatt.GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC ): - self.heart_rate_control_point = PackedCharacteristicAdapter( - characteristics[0], - pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT, + self.heart_rate_control_point = ( + gatt_adapters.PackedCharacteristicProxyAdapter( + characteristics[0], + pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT, + ) ) else: self.heart_rate_control_point = None - async def reset_energy_expended(self): + async def reset_energy_expended(self) -> None: if self.heart_rate_control_point is not None: return await self.heart_rate_control_point.write_value( HeartRateService.RESET_ENERGY_EXPENDED diff --git a/examples/heart_rate_server.py b/examples/heart_rate_server.py index 67494df..a24ef92 100644 --- a/examples/heart_rate_server.py +++ b/examples/heart_rate_server.py @@ -71,8 +71,8 @@ async def main() -> None: rr_intervals=random.choice( ( ( - random.randint(900, 1100) / 1000, - random.randint(900, 1100) / 1000, + random.randint(900, 1100) // 1000, + random.randint(900, 1100) // 1000, ), None, )