add support for the heart rate service

This commit is contained in:
Gilles Boccon-Gibod
2022-07-23 09:38:44 -07:00
parent c66b357de6
commit 3040df3179
6 changed files with 408 additions and 7 deletions

View File

@@ -134,13 +134,21 @@ GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC = UUID.from_16_bits(0x2A2
GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC = UUID.from_16_bits(0x2A2A, 'IEEE 11073-20601 Regulatory Certification Data List') GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC = UUID.from_16_bits(0x2A2A, 'IEEE 11073-20601 Regulatory Certification Data List')
GATT_PNP_ID_CHARACTERISTIC = UUID.from_16_bits(0x2A50, 'PnP ID') GATT_PNP_ID_CHARACTERISTIC = UUID.from_16_bits(0x2A50, 'PnP ID')
# Human Interface Device # Human Interface Device Service
GATT_HID_INFORMATION_CHARACTERISTIC = UUID.from_16_bits(0x2A4A, 'HID Information') GATT_HID_INFORMATION_CHARACTERISTIC = UUID.from_16_bits(0x2A4A, 'HID Information')
GATT_REPORT_MAP_CHARACTERISTIC = UUID.from_16_bits(0x2A4B, 'Report Map') GATT_REPORT_MAP_CHARACTERISTIC = UUID.from_16_bits(0x2A4B, 'Report Map')
GATT_HID_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A4C, 'HID Control Point') GATT_HID_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A4C, 'HID Control Point')
GATT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A4D, 'Report') GATT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A4D, 'Report')
GATT_PROTOCOL_MODE_CHARACTERISTIC = UUID.from_16_bits(0x2A4E, 'Protocol Mode') GATT_PROTOCOL_MODE_CHARACTERISTIC = UUID.from_16_bits(0x2A4E, 'Protocol Mode')
# Heart Rate Service
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC = UUID.from_16_bits(0x2A37, 'Heart Rate Measurement')
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2A38, 'Body Sensor Location')
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A39, 'Heart Rate Control Point')
# Battery Service
GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level')
# Misc # Misc
GATT_DEVICE_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2A00, 'Device Name') GATT_DEVICE_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2A00, 'Device Name')
GATT_APPEARANCE_CHARACTERISTIC = UUID.from_16_bits(0x2A01, 'Appearance') GATT_APPEARANCE_CHARACTERISTIC = UUID.from_16_bits(0x2A01, 'Appearance')
@@ -150,7 +158,6 @@ GATT_PERIPHERAL_PREFERRED_CONNECTION_PARAMETERS_CHARACTERISTIC = UUID.from_16_bi
GATT_SERVICE_CHANGED_CHARACTERISTIC = UUID.from_16_bits(0x2A05, 'Service Changed') GATT_SERVICE_CHANGED_CHARACTERISTIC = UUID.from_16_bits(0x2A05, 'Service Changed')
GATT_ALERT_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A06, 'Alert Level') GATT_ALERT_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A06, 'Alert Level')
GATT_TX_POWER_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A07, 'Tx Power Level') GATT_TX_POWER_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A07, 'Tx Power Level')
GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level')
GATT_BOOT_KEYBOARD_INPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A22, 'Boot Keyboard Input Report') GATT_BOOT_KEYBOARD_INPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A22, 'Boot Keyboard Input Report')
GATT_CURRENT_TIME_CHARACTERISTIC = UUID.from_16_bits(0x2A2B, 'Current Time') GATT_CURRENT_TIME_CHARACTERISTIC = UUID.from_16_bits(0x2A2B, 'Current Time')
GATT_BOOT_KEYBOARD_OUTPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A32, 'Boot Keyboard Output Report') GATT_BOOT_KEYBOARD_OUTPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A32, 'Boot Keyboard Output Report')
@@ -339,16 +346,19 @@ class CharacteristicAdapter:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class DelegatedCharacteristicAdapter(CharacteristicAdapter): class DelegatedCharacteristicAdapter(CharacteristicAdapter):
def __init__(self, characteristic, encode, decode): '''
Adapter that converts bytes values using an encode and a decode function.
'''
def __init__(self, characteristic, encode=None, decode=None):
super().__init__(characteristic) super().__init__(characteristic)
self.encode = encode self.encode = encode
self.decode = decode self.decode = decode
def encode_value(self, value): def encode_value(self, value):
return self.encode(value) return self.encode(value) if self.encode else value
def decode_value(self, value): def decode_value(self, value):
return self.decode(value) return self.decode(value) if self.decode else value
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -0,0 +1,221 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from enum import IntEnum
import struct
from ..gatt_client import ProfileServiceProxy
from ..att import ATT_Error
from ..gatt import (
GATT_HEART_RATE_SERVICE,
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
TemplateService,
Characteristic,
CharacteristicValue,
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter
)
# -----------------------------------------------------------------------------
class HeartRateService(TemplateService):
UUID = GATT_HEART_RATE_SERVICE
HEART_RATE_CONTROL_POINT_FORMAT = 'B'
CONTROL_POINT_NOT_SUPPORTED = 0x80
RESET_ENERGY_EXPENDED = 0x01
class BodySensorLocation(IntEnum):
OTHER = 0,
CHEST = 1,
WRIST = 2,
FINGER = 3,
HAND = 4,
EAR_LOBE = 5,
FOOT = 6
class HeartRateMeasurement:
def __init__(
self,
heart_rate,
sensor_contact_detected=None,
energy_expended=None,
rr_intervals=None
):
if heart_rate < 0 or heart_rate > 0xFFFF:
raise ValueError('heart_rate out of range')
if energy_expended is not None and (energy_expended < 0 or energy_expended > 0xFFFF):
raise ValueError('energy_expended out of range')
if rr_intervals:
for rr_interval in rr_intervals:
if rr_interval < 0 or rr_interval * 1024 > 0xFFFF:
raise ValueError('rr_intervals out of range')
self.heart_rate = heart_rate
self.sensor_contact_detected = sensor_contact_detected
self.energy_expended = energy_expended
self.rr_intervals = rr_intervals
@classmethod
def from_bytes(cls, data):
flags = data[0]
offset = 1
if flags & 1:
hr = struct.unpack_from('<H', data, offset)[0]
offset += 2
else:
hr = struct.unpack_from('B', data, offset)[0]
offset += 1
if flags & (1 << 2):
sensor_contact_detected = (flags & (1 << 1) != 0)
else:
sensor_contact_detected = None
if flags & (1 << 3):
energy_expended = struct.unpack_from('<H', data, offset)[0]
offset += 2
else:
energy_expended = None
if flags & (1 << 4):
rr_intervals = tuple(
struct.unpack_from('<H', data, offset + i * 2)[0] / 1024
for i in range((len(data) - offset) // 2)
)
else:
rr_intervals = ()
return cls(hr, sensor_contact_detected, energy_expended, rr_intervals)
def __bytes__(self):
if self.heart_rate < 256:
flags = 0
data = struct.pack('B', self.heart_rate)
else:
flags = 1
data = struct.pack('<H', self.heart_rate)
if self.sensor_contact_detected is not None:
flags |= ((1 if self.sensor_contact_detected else 0) << 1) | (1 << 2)
if self.energy_expended is not None:
flags |= (1 << 3)
data += struct.pack('<H', self.energy_expended)
if self.rr_intervals:
flags |= (1 << 4)
data += b''.join([
struct.pack('<H', int(rr_interval * 1024))
for rr_interval in self.rr_intervals
])
return bytes([flags]) + data
def __str__(self):
return f'HeartRateMeasurement(heart_rate={self.heart_rate},'\
f' sensor_contact_detected={self.sensor_contact_detected},'\
f' energy_expended={self.energy_expended},'\
f' rr_intervals={self.rr_intervals})'
def __init__(
self,
read_heart_rate_measurement,
body_sensor_location=None,
reset_energy_expended=None
):
self.heart_rate_measurement_characteristic = DelegatedCharacteristicAdapter(
Characteristic(
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
Characteristic.NOTIFY,
0,
CharacteristicValue(read=read_heart_rate_measurement)
),
encode=lambda value: bytes(value)
)
characteristics = [self.heart_rate_measurement_characteristic]
if body_sensor_location is not None:
self.body_sensor_location_characteristic = Characteristic(
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
bytes([int(body_sensor_location)])
)
characteristics.append(self.body_sensor_location_characteristic)
if reset_energy_expended:
def write_heart_rate_control_point_value(connection, value):
if value == self.RESET_ENERGY_EXPENDED:
if reset_energy_expended is not None:
reset_energy_expended(connection)
else:
raise ATT_Error(self.CONTROL_POINT_NOT_SUPPORTED)
self.heart_rate_control_point_characteristic = PackedCharacteristicAdapter(
Characteristic(
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE,
Characteristic.WRITEABLE,
CharacteristicValue(write=write_heart_rate_control_point_value)
),
format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT
)
characteristics.append(self.heart_rate_control_point_characteristic)
super().__init__(characteristics)
# -----------------------------------------------------------------------------
class HeartRateServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = HeartRateService
def __init__(self, service_proxy):
self.service_proxy = service_proxy
if characteristics := service_proxy.get_characteristics_by_uuid(GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC):
self.heart_rate_measurement = DelegatedCharacteristicAdapter(
characteristics[0],
decode=HeartRateService.HeartRateMeasurement.from_bytes
)
else:
self.heart_rate_measurement = None
if characteristics := service_proxy.get_characteristics_by_uuid(GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC):
self.body_sensor_location = DelegatedCharacteristicAdapter(
characteristics[0],
decode=lambda value: HeartRateService.BodySensorLocation(value[0])
)
else:
self.body_sensor_location = None
if characteristics := service_proxy.get_characteristics_by_uuid(GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC):
self.heart_rate_control_point = PackedCharacteristicAdapter(
characteristics[0],
format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT
)
else:
self.heart_rate_control_point = None
async def reset_energy_expended(self):
if self.heart_rate_control_point is not None:
return await self.heart_rate_control_point.write_value(HeartRateService.RESET_ENERGY_EXPENDED)

View File

@@ -49,7 +49,7 @@ async def main():
# Discover the Battery Service # Discover the Battery Service
peer = Peer(connection) peer = Peer(connection)
print('=== Discovering Battery Service') print('=== Discovering Battery Service')
battery_service = await peer.discover_and_create_service_proxy(BatteryServiceProxy) battery_service = await peer.discover_service_and_create_proxy(BatteryServiceProxy)
# Check that the service was found # Check that the service was found
if not battery_service: if not battery_service:

View File

@@ -38,7 +38,7 @@ async def main():
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
# Add a Device Information Service and Battery Service to the GATT sever # Add a Battery Service to the GATT sever
battery_service = BatteryService(lambda _: random.randint(0, 100)) battery_service = BatteryService(lambda _: random.randint(0, 100))
device.add_service(battery_service) device.add_service(battery_service)

View File

@@ -0,0 +1,75 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from colors import color
from bumble.device import Device, Peer
from bumble.transport import open_transport
from bumble.profiles.heart_rate_service import HeartRateServiceProxy
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: heart_rate_client.py <transport-spec> <bluetooth-address>')
print('example: heart_rate_client.py usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
await device.power_on()
# Connect to the peer
target_address = sys.argv[2]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address)
print(f'=== Connected to {connection}')
# Discover the Heart Rate Service
peer = Peer(connection)
print('=== Discovering Heart Rate Service')
heart_rate_service = await peer.discover_service_and_create_proxy(HeartRateServiceProxy)
# Check that the service was found
if not heart_rate_service:
print('!!! Service not found')
return
# Read the body sensor location
if heart_rate_service.body_sensor_location:
location = await heart_rate_service.body_sensor_location.read_value()
print(color('Sensor Location:', 'green'), location)
# Subscribe to the heart rate measurement
if heart_rate_service.heart_rate_measurement:
await heart_rate_service.heart_rate_measurement.subscribe(
lambda value: print(f'{color("Heart Rate Measurement:", "green")} {value}')
)
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -0,0 +1,95 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import sys
import time
import math
import random
import struct
import logging
import asyncio
import os
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.profiles.device_information_service import DeviceInformationService
from bumble.profiles.heart_rate_service import HeartRateService
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: python heart_rate_server.py <device-config> <transport-spec>')
print('example: python heart_rate_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
# Keep track of accumulated expended energy
energy_start_time = time.time()
def reset_energy_expended():
nonlocal energy_start_time
energy_start_time = time.time()
# Add a Device Information Service and Heart Rate Service to the GATT sever
device_information_service = DeviceInformationService(
manufacturer_name = 'ACME',
model_number = 'HR-102',
serial_number = '7654321',
hardware_revision = '1.1.3',
software_revision = '2.5.6',
system_id = (0x123456, 0x8877665544)
)
heart_rate_service = HeartRateService(
read_heart_rate_measurement = lambda _: HeartRateService.HeartRateMeasurement(
heart_rate = 100 + int(50 * math.sin(time.time() * math.pi / 60)),
sensor_contact_detected = random.choice((True, False, None)),
energy_expended = random.choice((int((time.time() - energy_start_time) * 100), None)),
rr_intervals = random.choice(((random.randint(900, 1100) / 1000, random.randint(900, 1100) / 1000), None))
),
body_sensor_location=HeartRateService.BodySensorLocation.WRIST,
reset_energy_expended=lambda _: reset_energy_expended()
)
device.add_services([device_information_service, heart_rate_service])
# Set the advertising data
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Heart', 'utf-8')),
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, bytes(heart_rate_service.uuid)),
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x0340))
])
)
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
# Notify every 3 seconds
while True:
await asyncio.sleep(3.0)
await device.notify_subscribers(heart_rate_service.heart_rate_measurement_characteristic)
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())