forked from auracaster/bumble_mirror
add in-context uuids and service proxy factories
This commit is contained in:
@@ -137,6 +137,17 @@ class Peer:
|
||||
def get_characteristics_by_uuid(self, uuid, service = None):
|
||||
return self.gatt_client.get_characteristics_by_uuid(uuid, service)
|
||||
|
||||
def create_service_proxy(self, proxy_class):
|
||||
return proxy_class.from_client(self.gatt_client)
|
||||
|
||||
async def discover_service_and_create_proxy(self, proxy_class):
|
||||
# Discover the first matching service and its characteristics
|
||||
services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID)
|
||||
if services:
|
||||
service = services[0]
|
||||
await service.discover_characteristics()
|
||||
return self.create_service_proxy(proxy_class)
|
||||
|
||||
# [Classic only]
|
||||
async def request_name(self):
|
||||
return await self.connection.request_remote_name()
|
||||
|
||||
@@ -197,6 +197,18 @@ class Service(Attribute):
|
||||
return f'Service(handle=0x{self.handle:04X}, end=0x{self.end_group_handle:04X}, uuid={self.uuid}){"" if self.primary else "*"}'
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class TemplateService(Service):
|
||||
'''
|
||||
Convenience abstract class that can be used by profile-specific subclasses that want
|
||||
to expose their UUID as a class property
|
||||
'''
|
||||
UUID = None
|
||||
|
||||
def __init__(self, characteristics, primary=True):
|
||||
super().__init__(self.UUID, characteristics, primary)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class Characteristic(Attribute):
|
||||
'''
|
||||
|
||||
@@ -35,6 +35,7 @@ from .gatt import (
|
||||
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
|
||||
GATT_REQUEST_TIMEOUT,
|
||||
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
|
||||
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
|
||||
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
|
||||
Characteristic
|
||||
)
|
||||
@@ -49,12 +50,12 @@ logger = logging.getLogger(__name__)
|
||||
# Proxies
|
||||
# -----------------------------------------------------------------------------
|
||||
class AttributeProxy(EventEmitter):
|
||||
def __init__(self, client, handle, end_group_handle, uuid):
|
||||
def __init__(self, client, handle, end_group_handle, attribute_type):
|
||||
EventEmitter.__init__(self)
|
||||
self.client = client
|
||||
self.handle = handle
|
||||
self.end_group_handle = end_group_handle
|
||||
self.uuid = uuid
|
||||
self.type = attribute_type
|
||||
|
||||
async def read_value(self, no_long_read=False):
|
||||
return await self.client.read_value(self.handle, no_long_read)
|
||||
@@ -63,13 +64,22 @@ class AttributeProxy(EventEmitter):
|
||||
return await self.client.write_value(self.handle, value, with_response)
|
||||
|
||||
def __str__(self):
|
||||
return f'Attribute(handle=0x{self.handle:04X}, uuid={self.uuid})'
|
||||
return f'Attribute(handle=0x{self.handle:04X}, type={self.uuid})'
|
||||
|
||||
|
||||
class ServiceProxy(AttributeProxy):
|
||||
def __init__(self, client, handle, end_group_handle, uuid):
|
||||
super().__init__(client, handle, end_group_handle, uuid)
|
||||
self.characteristics = []
|
||||
@staticmethod
|
||||
def from_client(cls, client, service_uuid):
|
||||
# The service and its characteristics are considered to have already been discovered
|
||||
services = client.get_services_by_uuid(service_uuid)
|
||||
service = services[0] if services else None
|
||||
return cls(service) if service else None
|
||||
|
||||
def __init__(self, client, handle, end_group_handle, uuid, primary=True):
|
||||
attribute_type = GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE if primary else GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE
|
||||
super().__init__(client, handle, end_group_handle, attribute_type)
|
||||
self.uuid = uuid
|
||||
self.characteristics = []
|
||||
|
||||
async def discover_characteristics(self, uuids=[]):
|
||||
return await self.client.discover_characteristics(uuids, self)
|
||||
@@ -84,13 +94,14 @@ class ServiceProxy(AttributeProxy):
|
||||
class CharacteristicProxy(AttributeProxy):
|
||||
def __init__(self, client, handle, end_group_handle, uuid, properties):
|
||||
super().__init__(client, handle, end_group_handle, uuid)
|
||||
self.uuid = uuid
|
||||
self.properties = properties
|
||||
self.descriptors = []
|
||||
self.descriptors_discovered = False
|
||||
|
||||
def get_descriptor(self, descriptor_type):
|
||||
for descriptor in self.descriptors:
|
||||
if descriptor.uuid == descriptor_type:
|
||||
if descriptor.type == descriptor_type:
|
||||
return descriptor
|
||||
|
||||
async def discover_descriptors(self):
|
||||
@@ -104,11 +115,20 @@ class CharacteristicProxy(AttributeProxy):
|
||||
|
||||
|
||||
class DescriptorProxy(AttributeProxy):
|
||||
def __init__(self, client, handle, uuid):
|
||||
super().__init__(client, handle, 0, uuid)
|
||||
def __init__(self, client, handle, descriptor_type):
|
||||
super().__init__(client, handle, 0, descriptor_type)
|
||||
|
||||
def __str__(self):
|
||||
return f'Descriptor(handle=0x{self.handle:04X}, uuid={self.uuid})'
|
||||
return f'Descriptor(handle=0x{self.handle:04X}, type={self.type})'
|
||||
|
||||
|
||||
class ProfileServiceProxy:
|
||||
'''
|
||||
Base class for profile-specific service proxies
|
||||
'''
|
||||
@classmethod
|
||||
def from_client(cls, client):
|
||||
return ServiceProxy.from_client(cls, client, cls.SERVICE_CLASS.UUID)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -238,7 +258,13 @@ class Client:
|
||||
return
|
||||
|
||||
# Create a service proxy for this service
|
||||
service = ServiceProxy(self, attribute_handle, end_group_handle, UUID.from_bytes(attribute_value))
|
||||
service = ServiceProxy(
|
||||
self,
|
||||
attribute_handle,
|
||||
end_group_handle,
|
||||
UUID.from_bytes(attribute_value),
|
||||
True
|
||||
)
|
||||
|
||||
# Filter out returned services based on the given uuids list
|
||||
if (not uuids) or (service.uuid in uuids):
|
||||
@@ -296,7 +322,7 @@ class Client:
|
||||
return
|
||||
|
||||
# Create a service proxy for this service
|
||||
service = ServiceProxy(self, attribute_handle, end_group_handle, uuid)
|
||||
service = ServiceProxy(self, attribute_handle, end_group_handle, uuid, True)
|
||||
|
||||
# Add the service to the peer's service list
|
||||
services.append(service)
|
||||
|
||||
@@ -16,10 +16,11 @@
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
from ..gatt_client import ProfileServiceProxy
|
||||
from ..gatt import (
|
||||
GATT_BATTERY_SERVICE,
|
||||
GATT_BATTERY_LEVEL_CHARACTERISTIC,
|
||||
Service,
|
||||
TemplateService,
|
||||
Characteristic,
|
||||
CharacteristicValue,
|
||||
PackedCharacteristicAdapter
|
||||
@@ -27,7 +28,8 @@ from ..gatt import (
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class BatteryService(Service):
|
||||
class BatteryService(TemplateService):
|
||||
UUID = GATT_BATTERY_SERVICE
|
||||
BATTERY_LEVEL_FORMAT = 'B'
|
||||
|
||||
def __init__(self, read_battery_level):
|
||||
@@ -40,11 +42,13 @@ class BatteryService(Service):
|
||||
),
|
||||
format=BatteryService.BATTERY_LEVEL_FORMAT
|
||||
)
|
||||
super().__init__(GATT_BATTERY_SERVICE, [self.battery_level_characteristic])
|
||||
super().__init__([self.battery_level_characteristic])
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class BatteryServiceProxy:
|
||||
class BatteryServiceProxy(ProfileServiceProxy):
|
||||
SERVICE_CLASS = BatteryService
|
||||
|
||||
def __init__(self, service_proxy):
|
||||
self.service_proxy = service_proxy
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
import struct
|
||||
from typing import Tuple
|
||||
|
||||
from ..gatt_client import ProfileServiceProxy
|
||||
from ..gatt import (
|
||||
GATT_DEVICE_INFORMATION_SERVICE,
|
||||
GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC,
|
||||
@@ -29,7 +30,7 @@ from ..gatt import (
|
||||
GATT_SOFTWARE_REVISION_STRING_CHARACTERISTIC,
|
||||
GATT_SYSTEM_ID_CHARACTERISTIC,
|
||||
GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC,
|
||||
Service,
|
||||
TemplateService,
|
||||
Characteristic,
|
||||
DelegatedCharacteristicAdapter,
|
||||
UTF8CharacteristicAdapter
|
||||
@@ -37,7 +38,9 @@ from ..gatt import (
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class DeviceInformationService(Service):
|
||||
class DeviceInformationService(TemplateService):
|
||||
UUID = GATT_DEVICE_INFORMATION_SERVICE
|
||||
|
||||
@staticmethod
|
||||
def pack_system_id(oui, manufacturer_id):
|
||||
return struct.pack('<Q', oui << 40 | manufacturer_id)
|
||||
@@ -93,11 +96,13 @@ class DeviceInformationService(Service):
|
||||
ieee_regulatory_certification_data_list
|
||||
))
|
||||
|
||||
super().__init__(GATT_DEVICE_INFORMATION_SERVICE, characteristics)
|
||||
super().__init__(characteristics)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class DeviceInformationServiceProxy:
|
||||
class DeviceInformationServiceProxy(ProfileServiceProxy):
|
||||
SERVICE_CLASS = DeviceInformationService
|
||||
|
||||
def __init__(self, service_proxy):
|
||||
self.service_proxy = service_proxy
|
||||
|
||||
|
||||
@@ -21,9 +21,7 @@ import os
|
||||
import logging
|
||||
from colors import color
|
||||
from bumble.device import Device, Peer
|
||||
from bumble.host import Host
|
||||
from bumble.transport import open_transport
|
||||
from bumble import gatt
|
||||
from bumble.profiles.battery_service import BatteryServiceProxy
|
||||
|
||||
|
||||
@@ -39,8 +37,7 @@ async def main():
|
||||
print('<<< connected')
|
||||
|
||||
# Create and start a device
|
||||
host = Host(controller_source=hci_source, controller_sink=hci_sink)
|
||||
device = Device('Bumble', address = 'F0:F1:F2:F3:F4:F5', host = host)
|
||||
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
|
||||
await device.power_on()
|
||||
|
||||
# Connect to the peer
|
||||
@@ -52,25 +49,19 @@ async def main():
|
||||
# Discover the Battery Service
|
||||
peer = Peer(connection)
|
||||
print('=== Discovering Battery Service')
|
||||
await peer.discover_services([gatt.GATT_BATTERY_SERVICE])
|
||||
battery_service = await peer.discover_and_create_service_proxy(BatteryServiceProxy)
|
||||
|
||||
# Check that the service was found
|
||||
battery_services = peer.get_services_by_uuid(gatt.GATT_BATTERY_SERVICE)
|
||||
if not battery_services:
|
||||
if not battery_service:
|
||||
print('!!! Service not found')
|
||||
return
|
||||
battery_service = battery_services[0]
|
||||
await battery_service.discover_characteristics()
|
||||
|
||||
# Create a service-specific proxy to read and decode the values
|
||||
battery_client = BatteryServiceProxy(battery_service)
|
||||
|
||||
# Subscribe to and read the battery level
|
||||
if battery_client.battery_level:
|
||||
await battery_client.battery_level.subscribe(
|
||||
if battery_service.battery_level:
|
||||
await battery_service.battery_level.subscribe(
|
||||
lambda value: print(f'{color("Battery Level Update:", "green")} {value}')
|
||||
)
|
||||
value = await battery_client.battery_level.read_value()
|
||||
value = await battery_service.battery_level.read_value()
|
||||
print(f'{color("Initial Battery Level:", "green")} {value}')
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
|
||||
@@ -21,17 +21,15 @@ import os
|
||||
import logging
|
||||
from colors import color
|
||||
from bumble.device import Device, Peer
|
||||
from bumble.host import Host
|
||||
from bumble.profiles.device_information_service import DeviceInformationServiceProxy
|
||||
from bumble.transport import open_transport
|
||||
from bumble import gatt
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: device_info_client.py <transport-spec> <bluetooth-address>')
|
||||
print('example: device_info_client.py usb:0 E1:CA:72:48:C4:E8')
|
||||
print('Usage: device_information_client.py <transport-spec> <bluetooth-address>')
|
||||
print('example: device_information_client.py usb:0 E1:CA:72:48:C4:E8')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
@@ -39,8 +37,7 @@ async def main():
|
||||
print('<<< connected')
|
||||
|
||||
# Create and start a device
|
||||
host = Host(controller_source=hci_source, controller_sink=hci_sink)
|
||||
device = Device('Bumble', address = 'F0:F1:F2:F3:F4:F5', host = host)
|
||||
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
|
||||
await device.power_on()
|
||||
|
||||
# Connect to the peer
|
||||
@@ -52,36 +49,30 @@ async def main():
|
||||
# Discover the Device Information service
|
||||
peer = Peer(connection)
|
||||
print('=== Discovering Device Information Service')
|
||||
await peer.discover_services([gatt.GATT_DEVICE_INFORMATION_SERVICE])
|
||||
device_information_service = await peer.discover_service_and_create_proxy(DeviceInformationServiceProxy)
|
||||
|
||||
# Check that the service was found
|
||||
device_info_services = peer.get_services_by_uuid(gatt.GATT_DEVICE_INFORMATION_SERVICE)
|
||||
if not device_info_services:
|
||||
if device_information_service is None:
|
||||
print('!!! Service not found')
|
||||
return
|
||||
device_info_service = device_info_services[0]
|
||||
await device_info_service.discover_characteristics()
|
||||
|
||||
# Create a service-specific proxy to read and decode the values
|
||||
device_info = DeviceInformationServiceProxy(device_info_service)
|
||||
|
||||
# Read and print the fields
|
||||
if device_info.manufacturer_name is not None:
|
||||
print(color('Manufacturer Name: ', 'green'), await device_info.manufacturer_name.read_value())
|
||||
if device_info.model_number is not None:
|
||||
print(color('Model Number: ', 'green'), await device_info.model_number.read_value())
|
||||
if device_info.serial_number is not None:
|
||||
print(color('Serial Number: ', 'green'), await device_info.serial_number.read_value())
|
||||
if device_info.hardware_revision is not None:
|
||||
print(color('Hardware Revision: ', 'green'), await device_info.hardware_revision.read_value())
|
||||
if device_info.firmware_revision is not None:
|
||||
print(color('Firmware Revision: ', 'green'), await device_info.firmware_revision.read_value())
|
||||
if device_info.software_revision is not None:
|
||||
print(color('Software Revision: ', 'green'), await device_info.software_revision.read_value())
|
||||
if device_info.system_id is not None:
|
||||
print(color('System ID: ', 'green'), await device_info.system_id.read_value())
|
||||
if device_info.ieee_regulatory_certification_data_list is not None:
|
||||
print(color('Regulatory Certification:', 'green'), (await device_info.ieee_regulatory_certification_data_list.read_value()).hex())
|
||||
if device_information_service.manufacturer_name is not None:
|
||||
print(color('Manufacturer Name: ', 'green'), await device_information_service.manufacturer_name.read_value())
|
||||
if device_information_service.model_number is not None:
|
||||
print(color('Model Number: ', 'green'), await device_information_service.model_number.read_value())
|
||||
if device_information_service.serial_number is not None:
|
||||
print(color('Serial Number: ', 'green'), await device_information_service.serial_number.read_value())
|
||||
if device_information_service.hardware_revision is not None:
|
||||
print(color('Hardware Revision: ', 'green'), await device_information_service.hardware_revision.read_value())
|
||||
if device_information_service.firmware_revision is not None:
|
||||
print(color('Firmware Revision: ', 'green'), await device_information_service.firmware_revision.read_value())
|
||||
if device_information_service.software_revision is not None:
|
||||
print(color('Software Revision: ', 'green'), await device_information_service.software_revision.read_value())
|
||||
if device_information_service.system_id is not None:
|
||||
print(color('System ID: ', 'green'), await device_information_service.system_id.read_value())
|
||||
if device_information_service.ieee_regulatory_certification_data_list is not None:
|
||||
print(color('Regulatory Certification:', 'green'), (await device_information_service.ieee_regulatory_certification_data_list.read_value()).hex())
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
Reference in New Issue
Block a user