mirror of
https://github.com/google/bumble.git
synced 2026-04-16 00:25:31 +00:00
Merge pull request #856 from zxzxwu/typing
Add annotation for Heart Rate and Battery Service
This commit is contained in:
@@ -16,35 +16,31 @@
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
from collections.abc import Callable
|
||||
|
||||
from bumble.gatt import (
|
||||
GATT_BATTERY_LEVEL_CHARACTERISTIC,
|
||||
GATT_BATTERY_SERVICE,
|
||||
Characteristic,
|
||||
CharacteristicValue,
|
||||
TemplateService,
|
||||
)
|
||||
from bumble.gatt_adapters import (
|
||||
PackedCharacteristicAdapter,
|
||||
PackedCharacteristicProxyAdapter,
|
||||
)
|
||||
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy
|
||||
from bumble import device
|
||||
from bumble import gatt
|
||||
from bumble import gatt_adapters
|
||||
from bumble import gatt_client
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class BatteryService(TemplateService):
|
||||
UUID = GATT_BATTERY_SERVICE
|
||||
class BatteryService(gatt.TemplateService):
|
||||
UUID = gatt.GATT_BATTERY_SERVICE
|
||||
BATTERY_LEVEL_FORMAT = 'B'
|
||||
|
||||
battery_level_characteristic: Characteristic[int]
|
||||
battery_level_characteristic: gatt.Characteristic[int]
|
||||
|
||||
def __init__(self, read_battery_level):
|
||||
self.battery_level_characteristic = PackedCharacteristicAdapter(
|
||||
Characteristic(
|
||||
GATT_BATTERY_LEVEL_CHARACTERISTIC,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
|
||||
Characteristic.READABLE,
|
||||
CharacteristicValue(read=read_battery_level),
|
||||
def __init__(self, read_battery_level: Callable[[device.Connection], int]) -> None:
|
||||
self.battery_level_characteristic = gatt_adapters.PackedCharacteristicAdapter(
|
||||
gatt.Characteristic(
|
||||
gatt.GATT_BATTERY_LEVEL_CHARACTERISTIC,
|
||||
properties=(
|
||||
gatt.Characteristic.Properties.READ
|
||||
| gatt.Characteristic.Properties.NOTIFY
|
||||
),
|
||||
permissions=gatt.Characteristic.READABLE,
|
||||
value=gatt.CharacteristicValue(read=read_battery_level),
|
||||
),
|
||||
pack_format=BatteryService.BATTERY_LEVEL_FORMAT,
|
||||
)
|
||||
@@ -52,18 +48,18 @@ class BatteryService(TemplateService):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class BatteryServiceProxy(ProfileServiceProxy):
|
||||
class BatteryServiceProxy(gatt_client.ProfileServiceProxy):
|
||||
SERVICE_CLASS = BatteryService
|
||||
|
||||
battery_level: CharacteristicProxy[int] | None
|
||||
battery_level: gatt_client.CharacteristicProxy[int] | None
|
||||
|
||||
def __init__(self, service_proxy):
|
||||
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
|
||||
self.service_proxy = service_proxy
|
||||
|
||||
if characteristics := service_proxy.get_characteristics_by_uuid(
|
||||
GATT_BATTERY_LEVEL_CHARACTERISTIC
|
||||
gatt.GATT_BATTERY_LEVEL_CHARACTERISTIC
|
||||
):
|
||||
self.battery_level = PackedCharacteristicProxyAdapter(
|
||||
self.battery_level = gatt_adapters.PackedCharacteristicProxyAdapter(
|
||||
characteristics[0], pack_format=BatteryService.BATTERY_LEVEL_FORMAT
|
||||
)
|
||||
else:
|
||||
|
||||
@@ -18,40 +18,35 @@
|
||||
# -----------------------------------------------------------------------------
|
||||
from __future__ import annotations
|
||||
|
||||
import dataclasses
|
||||
from typing import Any
|
||||
from typing_extensions import Self
|
||||
from collections.abc import Sequence, Callable
|
||||
import struct
|
||||
from enum import IntEnum
|
||||
import enum
|
||||
|
||||
from bumble import core
|
||||
from bumble.att import ATT_Error
|
||||
from bumble.gatt import (
|
||||
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
|
||||
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
|
||||
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
|
||||
GATT_HEART_RATE_SERVICE,
|
||||
Characteristic,
|
||||
CharacteristicValue,
|
||||
TemplateService,
|
||||
)
|
||||
from bumble.gatt_adapters import (
|
||||
DelegatedCharacteristicAdapter,
|
||||
PackedCharacteristicAdapter,
|
||||
SerializableCharacteristicAdapter,
|
||||
)
|
||||
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy
|
||||
from bumble import device
|
||||
from bumble import utils
|
||||
from bumble import att
|
||||
from bumble import gatt
|
||||
from bumble import gatt_adapters
|
||||
from bumble import gatt_client
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class HeartRateService(TemplateService):
|
||||
UUID = GATT_HEART_RATE_SERVICE
|
||||
class HeartRateService(gatt.TemplateService):
|
||||
UUID = gatt.GATT_HEART_RATE_SERVICE
|
||||
|
||||
HEART_RATE_CONTROL_POINT_FORMAT = 'B'
|
||||
CONTROL_POINT_NOT_SUPPORTED = 0x80
|
||||
RESET_ENERGY_EXPENDED = 0x01
|
||||
|
||||
heart_rate_measurement_characteristic: Characteristic[HeartRateMeasurement]
|
||||
body_sensor_location_characteristic: Characteristic[BodySensorLocation]
|
||||
heart_rate_control_point_characteristic: Characteristic[int]
|
||||
heart_rate_measurement_characteristic: gatt.Characteristic[HeartRateMeasurement]
|
||||
body_sensor_location_characteristic: gatt.Characteristic[BodySensorLocation]
|
||||
heart_rate_control_point_characteristic: gatt.Characteristic[int]
|
||||
|
||||
class BodySensorLocation(IntEnum):
|
||||
class BodySensorLocation(utils.OpenIntEnum):
|
||||
OTHER = 0
|
||||
CHEST = 1
|
||||
WRIST = 2
|
||||
@@ -60,56 +55,58 @@ class HeartRateService(TemplateService):
|
||||
EAR_LOBE = 5
|
||||
FOOT = 6
|
||||
|
||||
@dataclasses.dataclass
|
||||
class HeartRateMeasurement:
|
||||
def __init__(
|
||||
self,
|
||||
heart_rate,
|
||||
sensor_contact_detected=None,
|
||||
energy_expended=None,
|
||||
rr_intervals=None,
|
||||
):
|
||||
if heart_rate < 0 or heart_rate > 0xFFFF:
|
||||
heart_rate: int
|
||||
sensor_contact_detected: bool | None = None
|
||||
energy_expended: int | None = None
|
||||
rr_intervals: Sequence[int] | None = None
|
||||
|
||||
class Flag(enum.IntFlag):
|
||||
INT16_HEART_RATE = 1 << 0
|
||||
SENSOR_CONTACT_DETECTED = 1 << 1
|
||||
SENSOR_CONTACT_SUPPORTED = 1 << 2
|
||||
ENERGY_EXPENDED_STATUS = 1 << 3
|
||||
RR_INTERVAL = 1 << 3
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.heart_rate < 0 or self.heart_rate > 0xFFFF:
|
||||
raise core.InvalidArgumentError('heart_rate out of range')
|
||||
|
||||
if energy_expended is not None and (
|
||||
energy_expended < 0 or energy_expended > 0xFFFF
|
||||
if self.energy_expended is not None and (
|
||||
self.energy_expended < 0 or self.energy_expended > 0xFFFF
|
||||
):
|
||||
raise core.InvalidArgumentError('energy_expended out of range')
|
||||
|
||||
if rr_intervals:
|
||||
for rr_interval in rr_intervals:
|
||||
if self.rr_intervals:
|
||||
for rr_interval in self.rr_intervals:
|
||||
if rr_interval < 0 or rr_interval * 1024 > 0xFFFF:
|
||||
raise core.InvalidArgumentError('rr_intervals out of range')
|
||||
|
||||
self.heart_rate = heart_rate
|
||||
self.sensor_contact_detected = sensor_contact_detected
|
||||
self.energy_expended = energy_expended
|
||||
self.rr_intervals = rr_intervals
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data):
|
||||
def from_bytes(cls, data: bytes) -> Self:
|
||||
flags = data[0]
|
||||
offset = 1
|
||||
|
||||
if flags & 1:
|
||||
hr = struct.unpack_from('<H', data, offset)[0]
|
||||
if flags & cls.Flag.INT16_HEART_RATE:
|
||||
heart_rate = struct.unpack_from('<H', data, offset)[0]
|
||||
offset += 2
|
||||
else:
|
||||
hr = struct.unpack_from('B', data, offset)[0]
|
||||
heart_rate = struct.unpack_from('B', data, offset)[0]
|
||||
offset += 1
|
||||
|
||||
if flags & (1 << 2):
|
||||
sensor_contact_detected = flags & (1 << 1) != 0
|
||||
if flags & cls.Flag.SENSOR_CONTACT_SUPPORTED:
|
||||
sensor_contact_detected = flags & cls.Flag.SENSOR_CONTACT_DETECTED != 0
|
||||
else:
|
||||
sensor_contact_detected = None
|
||||
|
||||
if flags & (1 << 3):
|
||||
if flags & cls.Flag.ENERGY_EXPENDED_STATUS:
|
||||
energy_expended = struct.unpack_from('<H', data, offset)[0]
|
||||
offset += 2
|
||||
else:
|
||||
energy_expended = None
|
||||
|
||||
if flags & (1 << 4):
|
||||
if flags & cls.Flag.RR_INTERVAL:
|
||||
rr_intervals = tuple(
|
||||
struct.unpack_from('<H', data, offset + i * 2)[0] / 1024
|
||||
for i in range((len(data) - offset) // 2)
|
||||
@@ -117,25 +114,32 @@ class HeartRateService(TemplateService):
|
||||
else:
|
||||
rr_intervals = ()
|
||||
|
||||
return cls(hr, sensor_contact_detected, energy_expended, rr_intervals)
|
||||
return cls(
|
||||
heart_rate=heart_rate,
|
||||
sensor_contact_detected=sensor_contact_detected,
|
||||
energy_expended=energy_expended,
|
||||
rr_intervals=rr_intervals,
|
||||
)
|
||||
|
||||
def __bytes__(self):
|
||||
def __bytes__(self) -> bytes:
|
||||
flags = 0
|
||||
if self.heart_rate < 256:
|
||||
flags = 0
|
||||
data = struct.pack('B', self.heart_rate)
|
||||
else:
|
||||
flags = 1
|
||||
flags |= self.Flag.INT16_HEART_RATE
|
||||
data = struct.pack('<H', self.heart_rate)
|
||||
|
||||
if self.sensor_contact_detected is not None:
|
||||
flags |= ((1 if self.sensor_contact_detected else 0) << 1) | (1 << 2)
|
||||
flags |= self.Flag.SENSOR_CONTACT_SUPPORTED
|
||||
if self.sensor_contact_detected:
|
||||
flags |= self.Flag.SENSOR_CONTACT_DETECTED
|
||||
|
||||
if self.energy_expended is not None:
|
||||
flags |= 1 << 3
|
||||
flags |= self.Flag.ENERGY_EXPENDED_STATUS
|
||||
data += struct.pack('<H', self.energy_expended)
|
||||
|
||||
if self.rr_intervals:
|
||||
flags |= 1 << 4
|
||||
if self.rr_intervals is not None:
|
||||
flags |= self.Flag.RR_INTERVAL
|
||||
data += b''.join(
|
||||
[
|
||||
struct.pack('<H', int(rr_interval * 1024))
|
||||
@@ -145,37 +149,41 @@ class HeartRateService(TemplateService):
|
||||
|
||||
return bytes([flags]) + data
|
||||
|
||||
def __str__(self):
|
||||
return (
|
||||
f'HeartRateMeasurement(heart_rate={self.heart_rate},'
|
||||
f' sensor_contact_detected={self.sensor_contact_detected},'
|
||||
f' energy_expended={self.energy_expended},'
|
||||
f' rr_intervals={self.rr_intervals})'
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
read_heart_rate_measurement,
|
||||
body_sensor_location=None,
|
||||
reset_energy_expended=None,
|
||||
read_heart_rate_measurement: Callable[
|
||||
[device.Connection], HeartRateMeasurement
|
||||
],
|
||||
body_sensor_location: HeartRateService.BodySensorLocation | None = None,
|
||||
reset_energy_expended: Callable[[device.Connection], Any] | None = None,
|
||||
):
|
||||
self.heart_rate_measurement_characteristic = SerializableCharacteristicAdapter(
|
||||
Characteristic(
|
||||
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
|
||||
Characteristic.Properties.NOTIFY,
|
||||
0,
|
||||
CharacteristicValue(read=read_heart_rate_measurement),
|
||||
),
|
||||
HeartRateService.HeartRateMeasurement,
|
||||
self.heart_rate_measurement_characteristic = (
|
||||
gatt_adapters.SerializableCharacteristicAdapter(
|
||||
gatt.Characteristic(
|
||||
uuid=gatt.GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
|
||||
properties=gatt.Characteristic.Properties.NOTIFY,
|
||||
permissions=gatt.Characteristic.Permissions(0),
|
||||
value=gatt.CharacteristicValue(read=read_heart_rate_measurement),
|
||||
),
|
||||
HeartRateService.HeartRateMeasurement,
|
||||
)
|
||||
)
|
||||
characteristics = [self.heart_rate_measurement_characteristic]
|
||||
characteristics: list[gatt.Characteristic] = [
|
||||
self.heart_rate_measurement_characteristic
|
||||
]
|
||||
|
||||
if body_sensor_location is not None:
|
||||
self.body_sensor_location_characteristic = Characteristic(
|
||||
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
bytes([int(body_sensor_location)]),
|
||||
self.body_sensor_location_characteristic = (
|
||||
gatt_adapters.EnumCharacteristicAdapter(
|
||||
gatt.Characteristic(
|
||||
uuid=gatt.GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
|
||||
properties=gatt.Characteristic.Properties.READ,
|
||||
permissions=gatt.Characteristic.READABLE,
|
||||
value=body_sensor_location,
|
||||
),
|
||||
cls=self.BodySensorLocation,
|
||||
length=1,
|
||||
)
|
||||
)
|
||||
characteristics.append(self.body_sensor_location_characteristic)
|
||||
|
||||
@@ -186,16 +194,20 @@ class HeartRateService(TemplateService):
|
||||
if reset_energy_expended is not None:
|
||||
reset_energy_expended(connection)
|
||||
else:
|
||||
raise ATT_Error(self.CONTROL_POINT_NOT_SUPPORTED)
|
||||
raise att.ATT_Error(self.CONTROL_POINT_NOT_SUPPORTED)
|
||||
|
||||
self.heart_rate_control_point_characteristic = PackedCharacteristicAdapter(
|
||||
Characteristic(
|
||||
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
|
||||
Characteristic.Properties.WRITE,
|
||||
Characteristic.WRITEABLE,
|
||||
CharacteristicValue(write=write_heart_rate_control_point_value),
|
||||
),
|
||||
pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT,
|
||||
self.heart_rate_control_point_characteristic = (
|
||||
gatt_adapters.PackedCharacteristicAdapter(
|
||||
gatt.Characteristic(
|
||||
uuid=gatt.GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
|
||||
properties=gatt.Characteristic.Properties.WRITE,
|
||||
permissions=gatt.Characteristic.WRITEABLE,
|
||||
value=gatt.CharacteristicValue(
|
||||
write=write_heart_rate_control_point_value
|
||||
),
|
||||
),
|
||||
pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT,
|
||||
)
|
||||
)
|
||||
characteristics.append(self.heart_rate_control_point_characteristic)
|
||||
|
||||
@@ -203,50 +215,56 @@ class HeartRateService(TemplateService):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class HeartRateServiceProxy(ProfileServiceProxy):
|
||||
class HeartRateServiceProxy(gatt_client.ProfileServiceProxy):
|
||||
SERVICE_CLASS = HeartRateService
|
||||
|
||||
heart_rate_measurement: (
|
||||
CharacteristicProxy[HeartRateService.HeartRateMeasurement] | None
|
||||
gatt_client.CharacteristicProxy[HeartRateService.HeartRateMeasurement] | None
|
||||
)
|
||||
body_sensor_location: (
|
||||
CharacteristicProxy[HeartRateService.BodySensorLocation] | None
|
||||
gatt_client.CharacteristicProxy[HeartRateService.BodySensorLocation] | None
|
||||
)
|
||||
heart_rate_control_point: CharacteristicProxy[int] | None
|
||||
heart_rate_control_point: gatt_client.CharacteristicProxy[int] | None
|
||||
|
||||
def __init__(self, service_proxy):
|
||||
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
|
||||
self.service_proxy = service_proxy
|
||||
|
||||
if characteristics := service_proxy.get_characteristics_by_uuid(
|
||||
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC
|
||||
gatt.GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC
|
||||
):
|
||||
self.heart_rate_measurement = SerializableCharacteristicAdapter(
|
||||
characteristics[0], HeartRateService.HeartRateMeasurement
|
||||
self.heart_rate_measurement = (
|
||||
gatt_adapters.SerializableCharacteristicProxyAdapter(
|
||||
characteristics[0], HeartRateService.HeartRateMeasurement
|
||||
)
|
||||
)
|
||||
else:
|
||||
self.heart_rate_measurement = None
|
||||
|
||||
if characteristics := service_proxy.get_characteristics_by_uuid(
|
||||
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC
|
||||
gatt.GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC
|
||||
):
|
||||
self.body_sensor_location = DelegatedCharacteristicAdapter(
|
||||
characteristics[0],
|
||||
decode=lambda value: HeartRateService.BodySensorLocation(value[0]),
|
||||
self.body_sensor_location = (
|
||||
gatt_adapters.DelegatedCharacteristicProxyAdapter(
|
||||
characteristics[0],
|
||||
decode=lambda value: HeartRateService.BodySensorLocation(value[0]),
|
||||
)
|
||||
)
|
||||
else:
|
||||
self.body_sensor_location = None
|
||||
|
||||
if characteristics := service_proxy.get_characteristics_by_uuid(
|
||||
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC
|
||||
gatt.GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC
|
||||
):
|
||||
self.heart_rate_control_point = PackedCharacteristicAdapter(
|
||||
characteristics[0],
|
||||
pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT,
|
||||
self.heart_rate_control_point = (
|
||||
gatt_adapters.PackedCharacteristicProxyAdapter(
|
||||
characteristics[0],
|
||||
pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT,
|
||||
)
|
||||
)
|
||||
else:
|
||||
self.heart_rate_control_point = None
|
||||
|
||||
async def reset_energy_expended(self):
|
||||
async def reset_energy_expended(self) -> None:
|
||||
if self.heart_rate_control_point is not None:
|
||||
return await self.heart_rate_control_point.write_value(
|
||||
HeartRateService.RESET_ENERGY_EXPENDED
|
||||
|
||||
@@ -71,8 +71,8 @@ async def main() -> None:
|
||||
rr_intervals=random.choice(
|
||||
(
|
||||
(
|
||||
random.randint(900, 1100) / 1000,
|
||||
random.randint(900, 1100) / 1000,
|
||||
random.randint(900, 1100) // 1000,
|
||||
random.randint(900, 1100) // 1000,
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user