diff --git a/bumble/gatt.py b/bumble/gatt.py index df760c3..902fa07 100644 --- a/bumble/gatt.py +++ b/bumble/gatt.py @@ -134,13 +134,21 @@ GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC = UUID.from_16_bits(0x2A2 GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC = UUID.from_16_bits(0x2A2A, 'IEEE 11073-20601 Regulatory Certification Data List') GATT_PNP_ID_CHARACTERISTIC = UUID.from_16_bits(0x2A50, 'PnP ID') -# Human Interface Device +# Human Interface Device Service GATT_HID_INFORMATION_CHARACTERISTIC = UUID.from_16_bits(0x2A4A, 'HID Information') GATT_REPORT_MAP_CHARACTERISTIC = UUID.from_16_bits(0x2A4B, 'Report Map') GATT_HID_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A4C, 'HID Control Point') GATT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A4D, 'Report') GATT_PROTOCOL_MODE_CHARACTERISTIC = UUID.from_16_bits(0x2A4E, 'Protocol Mode') +# Heart Rate Service +GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC = UUID.from_16_bits(0x2A37, 'Heart Rate Measurement') +GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2A38, 'Body Sensor Location') +GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A39, 'Heart Rate Control Point') + +# Battery Service +GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level') + # Misc GATT_DEVICE_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2A00, 'Device Name') GATT_APPEARANCE_CHARACTERISTIC = UUID.from_16_bits(0x2A01, 'Appearance') @@ -150,7 +158,6 @@ GATT_PERIPHERAL_PREFERRED_CONNECTION_PARAMETERS_CHARACTERISTIC = UUID.from_16_bi GATT_SERVICE_CHANGED_CHARACTERISTIC = UUID.from_16_bits(0x2A05, 'Service Changed') GATT_ALERT_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A06, 'Alert Level') GATT_TX_POWER_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A07, 'Tx Power Level') -GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level') GATT_BOOT_KEYBOARD_INPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A22, 'Boot Keyboard Input Report') GATT_CURRENT_TIME_CHARACTERISTIC = UUID.from_16_bits(0x2A2B, 'Current Time') GATT_BOOT_KEYBOARD_OUTPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A32, 'Boot Keyboard Output Report') @@ -339,16 +346,19 @@ class CharacteristicAdapter: # ----------------------------------------------------------------------------- class DelegatedCharacteristicAdapter(CharacteristicAdapter): - def __init__(self, characteristic, encode, decode): + ''' + Adapter that converts bytes values using an encode and a decode function. + ''' + def __init__(self, characteristic, encode=None, decode=None): super().__init__(characteristic) self.encode = encode self.decode = decode def encode_value(self, value): - return self.encode(value) + return self.encode(value) if self.encode else value def decode_value(self, value): - return self.decode(value) + return self.decode(value) if self.decode else value # ----------------------------------------------------------------------------- diff --git a/bumble/profiles/heart_rate_service.py b/bumble/profiles/heart_rate_service.py new file mode 100644 index 0000000..b3cd875 --- /dev/null +++ b/bumble/profiles/heart_rate_service.py @@ -0,0 +1,221 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from enum import IntEnum +import struct + +from ..gatt_client import ProfileServiceProxy +from ..att import ATT_Error +from ..gatt import ( + GATT_HEART_RATE_SERVICE, + GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC, + GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC, + GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC, + TemplateService, + Characteristic, + CharacteristicValue, + DelegatedCharacteristicAdapter, + PackedCharacteristicAdapter +) + + +# ----------------------------------------------------------------------------- +class HeartRateService(TemplateService): + UUID = GATT_HEART_RATE_SERVICE + HEART_RATE_CONTROL_POINT_FORMAT = 'B' + CONTROL_POINT_NOT_SUPPORTED = 0x80 + RESET_ENERGY_EXPENDED = 0x01 + + class BodySensorLocation(IntEnum): + OTHER = 0, + CHEST = 1, + WRIST = 2, + FINGER = 3, + HAND = 4, + EAR_LOBE = 5, + FOOT = 6 + + class HeartRateMeasurement: + def __init__( + self, + heart_rate, + sensor_contact_detected=None, + energy_expended=None, + rr_intervals=None + ): + if heart_rate < 0 or heart_rate > 0xFFFF: + raise ValueError('heart_rate out of range') + + if energy_expended is not None and (energy_expended < 0 or energy_expended > 0xFFFF): + raise ValueError('energy_expended out of range') + + if rr_intervals: + for rr_interval in rr_intervals: + if rr_interval < 0 or rr_interval * 1024 > 0xFFFF: + raise ValueError('rr_intervals out of range') + + self.heart_rate = heart_rate + self.sensor_contact_detected = sensor_contact_detected + self.energy_expended = energy_expended + self.rr_intervals = rr_intervals + + @classmethod + def from_bytes(cls, data): + flags = data[0] + offset = 1 + + if flags & 1: + hr = struct.unpack_from(' ') + print('example: heart_rate_client.py usb:0 E1:CA:72:48:C4:E8') + return + + print('<<< connecting to HCI...') + async with await open_transport(sys.argv[1]) as (hci_source, hci_sink): + print('<<< connected') + + # Create and start a device + device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) + await device.power_on() + + # Connect to the peer + target_address = sys.argv[2] + print(f'=== Connecting to {target_address}...') + connection = await device.connect(target_address) + print(f'=== Connected to {connection}') + + # Discover the Heart Rate Service + peer = Peer(connection) + print('=== Discovering Heart Rate Service') + heart_rate_service = await peer.discover_service_and_create_proxy(HeartRateServiceProxy) + + # Check that the service was found + if not heart_rate_service: + print('!!! Service not found') + return + + # Read the body sensor location + if heart_rate_service.body_sensor_location: + location = await heart_rate_service.body_sensor_location.read_value() + print(color('Sensor Location:', 'green'), location) + + # Subscribe to the heart rate measurement + if heart_rate_service.heart_rate_measurement: + await heart_rate_service.heart_rate_measurement.subscribe( + lambda value: print(f'{color("Heart Rate Measurement:", "green")} {value}') + ) + + await hci_source.wait_for_termination() + + +# ----------------------------------------------------------------------------- +logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) +asyncio.run(main()) diff --git a/examples/heart_rate_server.py b/examples/heart_rate_server.py new file mode 100644 index 0000000..e6c008c --- /dev/null +++ b/examples/heart_rate_server.py @@ -0,0 +1,95 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import sys +import time +import math +import random +import struct +import logging +import asyncio +import os + +from bumble.core import AdvertisingData +from bumble.device import Device +from bumble.transport import open_transport_or_link +from bumble.profiles.device_information_service import DeviceInformationService +from bumble.profiles.heart_rate_service import HeartRateService + + +# ----------------------------------------------------------------------------- +async def main(): + if len(sys.argv) != 3: + print('Usage: python heart_rate_server.py ') + print('example: python heart_rate_server.py device1.json usb:0') + return + + async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + + # Keep track of accumulated expended energy + energy_start_time = time.time() + + def reset_energy_expended(): + nonlocal energy_start_time + energy_start_time = time.time() + + # Add a Device Information Service and Heart Rate Service to the GATT sever + device_information_service = DeviceInformationService( + manufacturer_name = 'ACME', + model_number = 'HR-102', + serial_number = '7654321', + hardware_revision = '1.1.3', + software_revision = '2.5.6', + system_id = (0x123456, 0x8877665544) + ) + + heart_rate_service = HeartRateService( + read_heart_rate_measurement = lambda _: HeartRateService.HeartRateMeasurement( + heart_rate = 100 + int(50 * math.sin(time.time() * math.pi / 60)), + sensor_contact_detected = random.choice((True, False, None)), + energy_expended = random.choice((int((time.time() - energy_start_time) * 100), None)), + rr_intervals = random.choice(((random.randint(900, 1100) / 1000, random.randint(900, 1100) / 1000), None)) + ), + body_sensor_location=HeartRateService.BodySensorLocation.WRIST, + reset_energy_expended=lambda _: reset_energy_expended() + ) + + device.add_services([device_information_service, heart_rate_service]) + + # Set the advertising data + device.advertising_data = bytes( + AdvertisingData([ + (AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Heart', 'utf-8')), + (AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, bytes(heart_rate_service.uuid)), + (AdvertisingData.APPEARANCE, struct.pack('