Add test for Heart Rate and Battery Service

This commit is contained in:
Josh Wu
2026-01-08 16:19:06 +08:00
parent 884b1c20e4
commit d2df76f6f4
5 changed files with 151 additions and 34 deletions

View File

@@ -1406,7 +1406,7 @@ class Peer:
async def request_name(self) -> str:
return await self.connection.request_remote_name()
async def __aenter__(self):
async def __aenter__(self) -> Self:
await self.discover_services()
for service in self.services:
await service.discover_characteristics()

View File

@@ -51,16 +51,14 @@ class BatteryService(gatt.TemplateService):
class BatteryServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = BatteryService
battery_level: gatt_client.CharacteristicProxy[int] | None
battery_level: gatt_client.CharacteristicProxy[int]
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_BATTERY_LEVEL_CHARACTERISTIC
):
self.battery_level = gatt_adapters.PackedCharacteristicProxyAdapter(
characteristics[0], pack_format=BatteryService.BATTERY_LEVEL_FORMAT
)
else:
self.battery_level = None
self.battery_level = gatt_adapters.PackedCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_BATTERY_LEVEL_CHARACTERISTIC
),
pack_format=BatteryService.BATTERY_LEVEL_FORMAT,
)

View File

@@ -60,14 +60,14 @@ class HeartRateService(gatt.TemplateService):
heart_rate: int
sensor_contact_detected: bool | None = None
energy_expended: int | None = None
rr_intervals: Sequence[int] | None = None
rr_intervals: Sequence[float] | None = None
class Flag(enum.IntFlag):
INT16_HEART_RATE = 1 << 0
SENSOR_CONTACT_DETECTED = 1 << 1
SENSOR_CONTACT_SUPPORTED = 1 << 2
ENERGY_EXPENDED_STATUS = 1 << 3
RR_INTERVAL = 1 << 3
RR_INTERVAL = 1 << 4
def __post_init__(self) -> None:
if self.heart_rate < 0 or self.heart_rate > 0xFFFF:
@@ -106,13 +106,12 @@ class HeartRateService(gatt.TemplateService):
else:
energy_expended = None
rr_intervals: Sequence[float] | None = None
if flags & cls.Flag.RR_INTERVAL:
rr_intervals = tuple(
struct.unpack_from('<H', data, offset + i * 2)[0] / 1024
for i in range((len(data) - offset) // 2)
struct.unpack_from('<H', data, i)[0] / 1024
for i in range(offset, len(data), 2)
)
else:
rr_intervals = ()
return cls(
heart_rate=heart_rate,
@@ -189,7 +188,9 @@ class HeartRateService(gatt.TemplateService):
if reset_energy_expended:
def write_heart_rate_control_point_value(connection, value):
def write_heart_rate_control_point_value(
connection: device.Connection, value: bytes
) -> None:
if value == self.RESET_ENERGY_EXPENDED:
if reset_energy_expended is not None:
reset_energy_expended(connection)
@@ -218,9 +219,9 @@ class HeartRateService(gatt.TemplateService):
class HeartRateServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = HeartRateService
heart_rate_measurement: (
gatt_client.CharacteristicProxy[HeartRateService.HeartRateMeasurement] | None
)
heart_rate_measurement: gatt_client.CharacteristicProxy[
HeartRateService.HeartRateMeasurement
]
body_sensor_location: (
gatt_client.CharacteristicProxy[HeartRateService.BodySensorLocation] | None
)
@@ -229,25 +230,20 @@ class HeartRateServiceProxy(gatt_client.ProfileServiceProxy):
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC
):
self.heart_rate_measurement = (
gatt_adapters.SerializableCharacteristicProxyAdapter(
characteristics[0], HeartRateService.HeartRateMeasurement
)
self.heart_rate_measurement = (
gatt_adapters.SerializableCharacteristicProxyAdapter(
service_proxy.get_required_characteristic_by_uuid(
gatt.GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC
),
HeartRateService.HeartRateMeasurement,
)
else:
self.heart_rate_measurement = None
)
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC
):
self.body_sensor_location = (
gatt_adapters.DelegatedCharacteristicProxyAdapter(
characteristics[0],
decode=lambda value: HeartRateService.BodySensorLocation(value[0]),
)
self.body_sensor_location = gatt_adapters.EnumCharacteristicProxyAdapter(
characteristics[0], cls=HeartRateService.BodySensorLocation, length=1
)
else:
self.body_sensor_location = None