Merge pull request #163 from AlanRosenthal/alan/local-values

Add `show local-values`
This commit is contained in:
Alan Rosenthal
2023-03-31 16:03:52 -04:00
committed by GitHub
4 changed files with 172 additions and 1 deletions

View File

@@ -28,6 +28,7 @@ from typing import Optional
from collections import OrderedDict from collections import OrderedDict
import click import click
from prettytable import PrettyTable
from prompt_toolkit import Application from prompt_toolkit import Application
from prompt_toolkit.history import FileHistory from prompt_toolkit.history import FileHistory
@@ -162,6 +163,7 @@ class ConsoleApp:
'device': None, 'device': None,
'local-services': None, 'local-services': None,
'remote-services': None, 'remote-services': None,
'local-values': None,
}, },
'filter': { 'filter': {
'address': None, 'address': None,
@@ -207,6 +209,7 @@ class ConsoleApp:
self.log_text = FormattedTextControl( self.log_text = FormattedTextControl(
get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1)) get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1))
) )
self.local_values_text = FormattedTextControl()
self.log_height = Dimension(min=7, weight=4) self.log_height = Dimension(min=7, weight=4)
self.log_max_lines = 100 self.log_max_lines = 100
self.log_lines = [] self.log_lines = []
@@ -221,6 +224,10 @@ class ConsoleApp:
Frame(Window(self.local_services_text), title='Local Services'), Frame(Window(self.local_services_text), title='Local Services'),
filter=Condition(lambda: self.top_tab == 'local-services'), filter=Condition(lambda: self.top_tab == 'local-services'),
), ),
ConditionalContainer(
Frame(Window(self.local_values_text), title='Local Values'),
filter=Condition(lambda: self.top_tab == 'local-values'),
),
ConditionalContainer( ConditionalContainer(
Frame(Window(self.remote_services_text), title='Remote Services'), Frame(Window(self.remote_services_text), title='Remote Services'),
filter=Condition(lambda: self.top_tab == 'remote-services'), filter=Condition(lambda: self.top_tab == 'remote-services'),
@@ -674,10 +681,70 @@ class ConsoleApp:
'device', 'device',
'local-services', 'local-services',
'remote-services', 'remote-services',
'local-values',
}: }:
self.top_tab = params[0] self.top_tab = params[0]
self.ui.invalidate() self.ui.invalidate()
while self.top_tab == 'local-values':
await self.do_show_local_values()
await asyncio.sleep(1)
async def do_show_local_values(self):
prettytable = PrettyTable()
field_names = ["Service", "Characteristic", "Descriptor"]
# if there's no connections, add a column just for value
if not self.device.connections:
field_names.append("Value")
# if there are connections, add a column for each connection's value
for connection in self.device.connections.values():
field_names.append(f"Connection {connection.handle}")
for attribute in self.device.gatt_server.attributes:
if isinstance(attribute, Characteristic):
service = self.device.gatt_server.get_attribute_group(
attribute.handle, Service
)
if not service:
continue
values = [
attribute.read_value(connection)
for connection in self.device.connections.values()
]
if not values:
values = [attribute.read_value(None)]
prettytable.add_row([f"{service.uuid}", attribute.uuid, ""] + values)
elif isinstance(attribute, Descriptor):
service = self.device.gatt_server.get_attribute_group(
attribute.handle, Service
)
if not service:
continue
characteristic = self.device.gatt_server.get_attribute_group(
attribute.handle, Characteristic
)
if not characteristic:
continue
values = [
attribute.read_value(connection)
for connection in self.device.connections.values()
]
if not values:
values = [attribute.read_value(None)]
# TODO: future optimization: convert CCCD value to human readable string
prettytable.add_row(
[service.uuid, characteristic.uuid, attribute.type] + values
)
prettytable.field_names = field_names
self.local_values_text.text = prettytable.get_string()
self.ui.invalidate()
async def do_get_phy(self, _): async def do_get_phy(self, _):
if not self.connected_peer: if not self.connected_peer:
self.show_error('not connected') self.show_error('not connected')

View File

@@ -27,7 +27,7 @@ import asyncio
import logging import logging
from collections import defaultdict from collections import defaultdict
import struct import struct
from typing import List, Tuple, Optional from typing import List, Tuple, Optional, TypeVar, Type
from pyee import EventEmitter from pyee import EventEmitter
from .colors import color from .colors import color
@@ -135,6 +135,21 @@ class Server(EventEmitter):
return attribute return attribute
return None return None
AttributeGroupType = TypeVar('AttributeGroupType', Service, Characteristic)
def get_attribute_group(
self, handle: int, group_type: Type[AttributeGroupType]
) -> Optional[AttributeGroupType]:
return next(
(
attribute
for attribute in self.attributes
if isinstance(attribute, group_type)
and attribute.handle <= handle <= attribute.end_group_handle
),
None,
)
def get_service_attribute(self, service_uuid: UUID) -> Optional[Service]: def get_service_attribute(self, service_uuid: UUID) -> Optional[Service]:
return next( return next(
( (

View File

@@ -43,6 +43,7 @@ install_requires =
pyserial >= 3.5; platform_system!='Emscripten' pyserial >= 3.5; platform_system!='Emscripten'
pyusb >= 1.2; platform_system!='Emscripten' pyusb >= 1.2; platform_system!='Emscripten'
websockets >= 8.1; platform_system!='Emscripten' websockets >= 8.1; platform_system!='Emscripten'
prettytable >= 3.6.0
[options.entry_points] [options.entry_points]
console_scripts = console_scripts =

View File

@@ -23,6 +23,7 @@ import pytest
from bumble.controller import Controller from bumble.controller import Controller
from bumble.gatt_client import CharacteristicProxy from bumble.gatt_client import CharacteristicProxy
from bumble.gatt_server import Server
from bumble.link import LocalLink from bumble.link import LocalLink
from bumble.device import Device, Peer from bumble.device import Device, Peer
from bumble.host import Host from bumble.host import Host
@@ -886,6 +887,93 @@ def test_descriptor_permissions():
assert descriptor.permissions == 3 assert descriptor.permissions == 3
# -----------------------------------------------------------------------------
def test_get_attribute_group():
device = Device()
# add some services / characteristics to the gatt server
characteristic1 = Characteristic(
'1111',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
characteristic2 = Characteristic(
'2222',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
services = [Service('1212', [characteristic1]), Service('3233', [characteristic2])]
device.gatt_server.add_services(services)
# get the handles from gatt server
characteristic_attributes1 = device.gatt_server.get_characteristic_attributes(
UUID('1212'), UUID('1111')
)
assert characteristic_attributes1 is not None
characteristic_attributes2 = device.gatt_server.get_characteristic_attributes(
UUID('3233'), UUID('2222')
)
assert characteristic_attributes2 is not None
descriptor1 = device.gatt_server.get_descriptor_attribute(
UUID('1212'), UUID('1111'), UUID('2902')
)
assert descriptor1 is not None
descriptor2 = device.gatt_server.get_descriptor_attribute(
UUID('3233'), UUID('2222'), UUID('2902')
)
assert descriptor2 is not None
# confirm the handles map back to the service
assert (
UUID('1212')
== device.gatt_server.get_attribute_group(
characteristic_attributes1[0].handle, Service
).uuid
)
assert (
UUID('1212')
== device.gatt_server.get_attribute_group(
characteristic_attributes1[1].handle, Service
).uuid
)
assert (
UUID('1212')
== device.gatt_server.get_attribute_group(descriptor1.handle, Service).uuid
)
assert (
UUID('3233')
== device.gatt_server.get_attribute_group(
characteristic_attributes2[0].handle, Service
).uuid
)
assert (
UUID('3233')
== device.gatt_server.get_attribute_group(
characteristic_attributes2[1].handle, Service
).uuid
)
assert (
UUID('3233')
== device.gatt_server.get_attribute_group(descriptor2.handle, Service).uuid
)
# confirm the handles map back to the characteristic
assert (
UUID('1111')
== device.gatt_server.get_attribute_group(
descriptor1.handle, Characteristic
).uuid
)
assert (
UUID('2222')
== device.gatt_server.get_attribute_group(
descriptor2.handle, Characteristic
).uuid
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())