diff --git a/apps/console.py b/apps/console.py index 26223d7a..37afd22e 100644 --- a/apps/console.py +++ b/apps/console.py @@ -28,6 +28,7 @@ from typing import Optional from collections import OrderedDict import click +from prettytable import PrettyTable from prompt_toolkit import Application from prompt_toolkit.history import FileHistory @@ -162,6 +163,7 @@ class ConsoleApp: 'device': None, 'local-services': None, 'remote-services': None, + 'local-values': None, }, 'filter': { 'address': None, @@ -207,6 +209,7 @@ class ConsoleApp: self.log_text = FormattedTextControl( get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1)) ) + self.local_values_text = FormattedTextControl() self.log_height = Dimension(min=7, weight=4) self.log_max_lines = 100 self.log_lines = [] @@ -221,6 +224,10 @@ class ConsoleApp: Frame(Window(self.local_services_text), title='Local Services'), filter=Condition(lambda: self.top_tab == 'local-services'), ), + ConditionalContainer( + Frame(Window(self.local_values_text), title='Local Values'), + filter=Condition(lambda: self.top_tab == 'local-values'), + ), ConditionalContainer( Frame(Window(self.remote_services_text), title='Remote Services'), filter=Condition(lambda: self.top_tab == 'remote-services'), @@ -674,10 +681,70 @@ class ConsoleApp: 'device', 'local-services', 'remote-services', + 'local-values', }: self.top_tab = params[0] self.ui.invalidate() + while self.top_tab == 'local-values': + await self.do_show_local_values() + await asyncio.sleep(1) + + async def do_show_local_values(self): + prettytable = PrettyTable() + field_names = ["Service", "Characteristic", "Descriptor"] + + # if there's no connections, add a column just for value + if not self.device.connections: + field_names.append("Value") + + # if there are connections, add a column for each connection's value + for connection in self.device.connections.values(): + field_names.append(f"Connection {connection.handle}") + + for attribute in self.device.gatt_server.attributes: + if isinstance(attribute, Characteristic): + service = self.device.gatt_server.get_attribute_group( + attribute.handle, Service + ) + if not service: + continue + values = [ + attribute.read_value(connection) + for connection in self.device.connections.values() + ] + if not values: + values = [attribute.read_value(None)] + prettytable.add_row([f"{service.uuid}", attribute.uuid, ""] + values) + + elif isinstance(attribute, Descriptor): + service = self.device.gatt_server.get_attribute_group( + attribute.handle, Service + ) + if not service: + continue + characteristic = self.device.gatt_server.get_attribute_group( + attribute.handle, Characteristic + ) + if not characteristic: + continue + values = [ + attribute.read_value(connection) + for connection in self.device.connections.values() + ] + if not values: + values = [attribute.read_value(None)] + + # TODO: future optimization: convert CCCD value to human readable string + + prettytable.add_row( + [service.uuid, characteristic.uuid, attribute.type] + values + ) + + prettytable.field_names = field_names + self.local_values_text.text = prettytable.get_string() + self.ui.invalidate() + async def do_get_phy(self, _): if not self.connected_peer: self.show_error('not connected') diff --git a/bumble/gatt_server.py b/bumble/gatt_server.py index 3a5953a3..e3529c8a 100644 --- a/bumble/gatt_server.py +++ b/bumble/gatt_server.py @@ -27,7 +27,7 @@ import asyncio import logging from collections import defaultdict import struct -from typing import List, Tuple, Optional +from typing import List, Tuple, Optional, TypeVar, Type from pyee import EventEmitter from .colors import color @@ -135,6 +135,21 @@ class Server(EventEmitter): return attribute return None + AttributeGroupType = TypeVar('AttributeGroupType', Service, Characteristic) + + def get_attribute_group( + self, handle: int, group_type: Type[AttributeGroupType] + ) -> Optional[AttributeGroupType]: + return next( + ( + attribute + for attribute in self.attributes + if isinstance(attribute, group_type) + and attribute.handle <= handle <= attribute.end_group_handle + ), + None, + ) + def get_service_attribute(self, service_uuid: UUID) -> Optional[Service]: return next( ( diff --git a/setup.cfg b/setup.cfg index 0a4aae34..57d39753 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,6 +43,7 @@ install_requires = pyserial >= 3.5; platform_system!='Emscripten' pyusb >= 1.2; platform_system!='Emscripten' websockets >= 8.1; platform_system!='Emscripten' + prettytable >= 3.6.0 [options.entry_points] console_scripts = diff --git a/tests/gatt_test.py b/tests/gatt_test.py index 70bbdb86..478f2999 100644 --- a/tests/gatt_test.py +++ b/tests/gatt_test.py @@ -23,6 +23,7 @@ import pytest from bumble.controller import Controller from bumble.gatt_client import CharacteristicProxy +from bumble.gatt_server import Server from bumble.link import LocalLink from bumble.device import Device, Peer from bumble.host import Host @@ -886,6 +887,93 @@ def test_descriptor_permissions(): assert descriptor.permissions == 3 +# ----------------------------------------------------------------------------- +def test_get_attribute_group(): + device = Device() + + # add some services / characteristics to the gatt server + characteristic1 = Characteristic( + '1111', + Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, + Characteristic.READABLE | Characteristic.WRITEABLE, + bytes([123]), + ) + characteristic2 = Characteristic( + '2222', + Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, + Characteristic.READABLE | Characteristic.WRITEABLE, + bytes([123]), + ) + services = [Service('1212', [characteristic1]), Service('3233', [characteristic2])] + device.gatt_server.add_services(services) + + # get the handles from gatt server + characteristic_attributes1 = device.gatt_server.get_characteristic_attributes( + UUID('1212'), UUID('1111') + ) + assert characteristic_attributes1 is not None + characteristic_attributes2 = device.gatt_server.get_characteristic_attributes( + UUID('3233'), UUID('2222') + ) + assert characteristic_attributes2 is not None + descriptor1 = device.gatt_server.get_descriptor_attribute( + UUID('1212'), UUID('1111'), UUID('2902') + ) + assert descriptor1 is not None + descriptor2 = device.gatt_server.get_descriptor_attribute( + UUID('3233'), UUID('2222'), UUID('2902') + ) + assert descriptor2 is not None + + # confirm the handles map back to the service + assert ( + UUID('1212') + == device.gatt_server.get_attribute_group( + characteristic_attributes1[0].handle, Service + ).uuid + ) + assert ( + UUID('1212') + == device.gatt_server.get_attribute_group( + characteristic_attributes1[1].handle, Service + ).uuid + ) + assert ( + UUID('1212') + == device.gatt_server.get_attribute_group(descriptor1.handle, Service).uuid + ) + assert ( + UUID('3233') + == device.gatt_server.get_attribute_group( + characteristic_attributes2[0].handle, Service + ).uuid + ) + assert ( + UUID('3233') + == device.gatt_server.get_attribute_group( + characteristic_attributes2[1].handle, Service + ).uuid + ) + assert ( + UUID('3233') + == device.gatt_server.get_attribute_group(descriptor2.handle, Service).uuid + ) + + # confirm the handles map back to the characteristic + assert ( + UUID('1111') + == device.gatt_server.get_attribute_group( + descriptor1.handle, Characteristic + ).uuid + ) + assert ( + UUID('2222') + == device.gatt_server.get_attribute_group( + descriptor2.handle, Characteristic + ).uuid + ) + + # ----------------------------------------------------------------------------- if __name__ == '__main__': logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())