diff --git a/bumble/device.py b/bumble/device.py index 7789198..a0e41b9 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -1406,7 +1406,7 @@ class Peer: async def request_name(self) -> str: return await self.connection.request_remote_name() - async def __aenter__(self): + async def __aenter__(self) -> Self: await self.discover_services() for service in self.services: await service.discover_characteristics() diff --git a/bumble/profiles/battery_service.py b/bumble/profiles/battery_service.py index b2d69fa..765f4a0 100644 --- a/bumble/profiles/battery_service.py +++ b/bumble/profiles/battery_service.py @@ -51,16 +51,14 @@ class BatteryService(gatt.TemplateService): class BatteryServiceProxy(gatt_client.ProfileServiceProxy): SERVICE_CLASS = BatteryService - battery_level: gatt_client.CharacteristicProxy[int] | None + battery_level: gatt_client.CharacteristicProxy[int] def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None: self.service_proxy = service_proxy - if characteristics := service_proxy.get_characteristics_by_uuid( - gatt.GATT_BATTERY_LEVEL_CHARACTERISTIC - ): - self.battery_level = gatt_adapters.PackedCharacteristicProxyAdapter( - characteristics[0], pack_format=BatteryService.BATTERY_LEVEL_FORMAT - ) - else: - self.battery_level = None + self.battery_level = gatt_adapters.PackedCharacteristicProxyAdapter( + service_proxy.get_required_characteristic_by_uuid( + gatt.GATT_BATTERY_LEVEL_CHARACTERISTIC + ), + pack_format=BatteryService.BATTERY_LEVEL_FORMAT, + ) diff --git a/bumble/profiles/heart_rate_service.py b/bumble/profiles/heart_rate_service.py index 83198ef..ca02894 100644 --- a/bumble/profiles/heart_rate_service.py +++ b/bumble/profiles/heart_rate_service.py @@ -60,14 +60,14 @@ class HeartRateService(gatt.TemplateService): heart_rate: int sensor_contact_detected: bool | None = None energy_expended: int | None = None - rr_intervals: Sequence[int] | None = None + rr_intervals: Sequence[float] | None = None class Flag(enum.IntFlag): INT16_HEART_RATE = 1 << 0 SENSOR_CONTACT_DETECTED = 1 << 1 SENSOR_CONTACT_SUPPORTED = 1 << 2 ENERGY_EXPENDED_STATUS = 1 << 3 - RR_INTERVAL = 1 << 3 + RR_INTERVAL = 1 << 4 def __post_init__(self) -> None: if self.heart_rate < 0 or self.heart_rate > 0xFFFF: @@ -106,13 +106,12 @@ class HeartRateService(gatt.TemplateService): else: energy_expended = None + rr_intervals: Sequence[float] | None = None if flags & cls.Flag.RR_INTERVAL: rr_intervals = tuple( - struct.unpack_from(' None: if value == self.RESET_ENERGY_EXPENDED: if reset_energy_expended is not None: reset_energy_expended(connection) @@ -218,9 +219,9 @@ class HeartRateService(gatt.TemplateService): class HeartRateServiceProxy(gatt_client.ProfileServiceProxy): SERVICE_CLASS = HeartRateService - heart_rate_measurement: ( - gatt_client.CharacteristicProxy[HeartRateService.HeartRateMeasurement] | None - ) + heart_rate_measurement: gatt_client.CharacteristicProxy[ + HeartRateService.HeartRateMeasurement + ] body_sensor_location: ( gatt_client.CharacteristicProxy[HeartRateService.BodySensorLocation] | None ) @@ -229,25 +230,20 @@ class HeartRateServiceProxy(gatt_client.ProfileServiceProxy): def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None: self.service_proxy = service_proxy - if characteristics := service_proxy.get_characteristics_by_uuid( - gatt.GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC - ): - self.heart_rate_measurement = ( - gatt_adapters.SerializableCharacteristicProxyAdapter( - characteristics[0], HeartRateService.HeartRateMeasurement - ) + self.heart_rate_measurement = ( + gatt_adapters.SerializableCharacteristicProxyAdapter( + service_proxy.get_required_characteristic_by_uuid( + gatt.GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC + ), + HeartRateService.HeartRateMeasurement, ) - else: - self.heart_rate_measurement = None + ) if characteristics := service_proxy.get_characteristics_by_uuid( gatt.GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC ): - self.body_sensor_location = ( - gatt_adapters.DelegatedCharacteristicProxyAdapter( - characteristics[0], - decode=lambda value: HeartRateService.BodySensorLocation(value[0]), - ) + self.body_sensor_location = gatt_adapters.EnumCharacteristicProxyAdapter( + characteristics[0], cls=HeartRateService.BodySensorLocation, length=1 ) else: self.body_sensor_location = None diff --git a/tests/battery_service_test.py b/tests/battery_service_test.py new file mode 100644 index 0000000..ed3246b --- /dev/null +++ b/tests/battery_service_test.py @@ -0,0 +1,34 @@ +# Copyright 2021-2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import pytest + +from bumble import device as device_module +from bumble.profiles import battery_service + +from . import test_utils + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_read_battery_level(): + devices = await test_utils.TwoDevices.create_with_connection() + service = battery_service.BatteryService(lambda _: 1) + devices[0].add_service(service) + + async with device_module.Peer(devices.connections[1]) as peer: + client = peer.create_service_proxy(battery_service.BatteryServiceProxy) + assert client + assert await client.battery_level.read_value() == 1 diff --git a/tests/heart_rate_service_test.py b/tests/heart_rate_service_test.py new file mode 100644 index 0000000..bfc0a1a --- /dev/null +++ b/tests/heart_rate_service_test.py @@ -0,0 +1,89 @@ +# Copyright 2021-2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Sequence +import asyncio +import itertools + +import pytest + +from bumble import device as device_module +from bumble.profiles import heart_rate_service + +from . import test_utils + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +@pytest.mark.parametrize( + "heart_rate, sensor_contact_detected, energy_expanded, rr_intervals", + itertools.product( + (1, 1000), (True, False, None), (2, None), ((3.0, 4.0, 5.0), None) + ), +) +async def test_read_measurement( + heart_rate: int, + sensor_contact_detected: bool | None, + energy_expanded: int | None, + rr_intervals: Sequence[int] | None, +): + devices = await test_utils.TwoDevices.create_with_connection() + measurement = heart_rate_service.HeartRateService.HeartRateMeasurement( + heart_rate, sensor_contact_detected, energy_expanded, rr_intervals + ) + service = heart_rate_service.HeartRateService(lambda _: measurement) + devices[0].add_service(service) + + async with device_module.Peer(devices.connections[1]) as peer: + client = peer.create_service_proxy(heart_rate_service.HeartRateServiceProxy) + assert client + assert await client.heart_rate_measurement.read_value() == measurement + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_read_body_sensor_location(): + devices = await test_utils.TwoDevices.create_with_connection() + measurement = heart_rate_service.HeartRateService.HeartRateMeasurement(0) + location = heart_rate_service.HeartRateService.BodySensorLocation.FINGER + service = heart_rate_service.HeartRateService( + lambda _: measurement, + body_sensor_location=location, + ) + devices[0].add_service(service) + + async with device_module.Peer(devices.connections[1]) as peer: + client = peer.create_service_proxy(heart_rate_service.HeartRateServiceProxy) + assert client + assert client.body_sensor_location + assert await client.body_sensor_location.read_value() == location + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_reset_energy_expended() -> None: + devices = await test_utils.TwoDevices.create_with_connection() + measurement = heart_rate_service.HeartRateService.HeartRateMeasurement(1) + reset_energy_expended = asyncio.Queue[None]() + service = heart_rate_service.HeartRateService( + lambda _: measurement, + reset_energy_expended=lambda _: reset_energy_expended.put_nowait(None), + ) + devices[0].add_service(service) + + async with device_module.Peer(devices.connections[1]) as peer: + client = peer.create_service_proxy(heart_rate_service.HeartRateServiceProxy) + assert client + await client.reset_energy_expended() + await reset_energy_expended.get()