mirror of
https://github.com/google/bumble.git
synced 2026-05-09 04:08:02 +00:00
Add show local-values
This PR adds a way to display the local gatt characteristics/descriptors values If no connections, it shows the value of every characteristic/descriptor. When there's a connection, it shows the value for each specific connection - CCCDs are connection specific This screen auto-updates every second
This commit is contained in:
committed by
Alan Rosenthal
parent
fb68fa6a33
commit
e026de295f
@@ -28,6 +28,7 @@ from typing import Optional
|
||||
from collections import OrderedDict
|
||||
|
||||
import click
|
||||
from prettytable import PrettyTable
|
||||
|
||||
from prompt_toolkit import Application
|
||||
from prompt_toolkit.history import FileHistory
|
||||
@@ -162,6 +163,7 @@ class ConsoleApp:
|
||||
'device': None,
|
||||
'local-services': None,
|
||||
'remote-services': None,
|
||||
'local-values': None,
|
||||
},
|
||||
'filter': {
|
||||
'address': None,
|
||||
@@ -207,6 +209,7 @@ class ConsoleApp:
|
||||
self.log_text = FormattedTextControl(
|
||||
get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1))
|
||||
)
|
||||
self.local_values_text = FormattedTextControl()
|
||||
self.log_height = Dimension(min=7, weight=4)
|
||||
self.log_max_lines = 100
|
||||
self.log_lines = []
|
||||
@@ -221,6 +224,10 @@ class ConsoleApp:
|
||||
Frame(Window(self.local_services_text), title='Local Services'),
|
||||
filter=Condition(lambda: self.top_tab == 'local-services'),
|
||||
),
|
||||
ConditionalContainer(
|
||||
Frame(Window(self.local_values_text), title='Local Values'),
|
||||
filter=Condition(lambda: self.top_tab == 'local-values'),
|
||||
),
|
||||
ConditionalContainer(
|
||||
Frame(Window(self.remote_services_text), title='Remote Services'),
|
||||
filter=Condition(lambda: self.top_tab == 'remote-services'),
|
||||
@@ -674,10 +681,70 @@ class ConsoleApp:
|
||||
'device',
|
||||
'local-services',
|
||||
'remote-services',
|
||||
'local-values',
|
||||
}:
|
||||
self.top_tab = params[0]
|
||||
self.ui.invalidate()
|
||||
|
||||
while self.top_tab == 'local-values':
|
||||
await self.do_show_local_values()
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def do_show_local_values(self):
|
||||
prettytable = PrettyTable()
|
||||
field_names = ["Service", "Characteristic", "Descriptor"]
|
||||
|
||||
# if there's no connections, add a column just for value
|
||||
if not self.device.connections:
|
||||
field_names.append("Value")
|
||||
|
||||
# if there are connections, add a column for each connection's value
|
||||
for connection in self.device.connections.values():
|
||||
field_names.append(f"Connection {connection.handle}")
|
||||
|
||||
for attribute in self.device.gatt_server.attributes:
|
||||
if isinstance(attribute, Characteristic):
|
||||
service = self.device.gatt_server.get_attribute_group(
|
||||
attribute.handle, Service
|
||||
)
|
||||
if not service:
|
||||
continue
|
||||
values = [
|
||||
attribute.read_value(connection)
|
||||
for connection in self.device.connections.values()
|
||||
]
|
||||
if not values:
|
||||
values = [attribute.read_value(None)]
|
||||
prettytable.add_row([f"{service.uuid}", attribute.uuid, ""] + values)
|
||||
|
||||
elif isinstance(attribute, Descriptor):
|
||||
service = self.device.gatt_server.get_attribute_group(
|
||||
attribute.handle, Service
|
||||
)
|
||||
if not service:
|
||||
continue
|
||||
characteristic = self.device.gatt_server.get_attribute_group(
|
||||
attribute.handle, Characteristic
|
||||
)
|
||||
if not characteristic:
|
||||
continue
|
||||
values = [
|
||||
attribute.read_value(connection)
|
||||
for connection in self.device.connections.values()
|
||||
]
|
||||
if not values:
|
||||
values = [attribute.read_value(None)]
|
||||
|
||||
# TODO: future optimization: convert CCCD value to human readable string
|
||||
|
||||
prettytable.add_row(
|
||||
[service.uuid, characteristic.uuid, attribute.type] + values
|
||||
)
|
||||
|
||||
prettytable.field_names = field_names
|
||||
self.local_values_text.text = prettytable.get_string()
|
||||
self.ui.invalidate()
|
||||
|
||||
async def do_get_phy(self, _):
|
||||
if not self.connected_peer:
|
||||
self.show_error('not connected')
|
||||
|
||||
@@ -27,7 +27,7 @@ import asyncio
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
import struct
|
||||
from typing import List, Tuple, Optional
|
||||
from typing import List, Tuple, Optional, TypeVar, Type
|
||||
from pyee import EventEmitter
|
||||
|
||||
from .colors import color
|
||||
@@ -135,6 +135,21 @@ class Server(EventEmitter):
|
||||
return attribute
|
||||
return None
|
||||
|
||||
AttributeGroupType = TypeVar('AttributeGroupType', Service, Characteristic)
|
||||
|
||||
def get_attribute_group(
|
||||
self, handle: int, group_type: Type[AttributeGroupType]
|
||||
) -> Optional[AttributeGroupType]:
|
||||
return next(
|
||||
(
|
||||
attribute
|
||||
for attribute in self.attributes
|
||||
if isinstance(attribute, group_type)
|
||||
and attribute.handle <= handle <= attribute.end_group_handle
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
def get_service_attribute(self, service_uuid: UUID) -> Optional[Service]:
|
||||
return next(
|
||||
(
|
||||
|
||||
@@ -43,6 +43,7 @@ install_requires =
|
||||
pyserial >= 3.5; platform_system!='Emscripten'
|
||||
pyusb >= 1.2; platform_system!='Emscripten'
|
||||
websockets >= 8.1; platform_system!='Emscripten'
|
||||
prettytable >= 3.6.0
|
||||
|
||||
[options.entry_points]
|
||||
console_scripts =
|
||||
|
||||
@@ -23,6 +23,7 @@ import pytest
|
||||
|
||||
from bumble.controller import Controller
|
||||
from bumble.gatt_client import CharacteristicProxy
|
||||
from bumble.gatt_server import Server
|
||||
from bumble.link import LocalLink
|
||||
from bumble.device import Device, Peer
|
||||
from bumble.host import Host
|
||||
@@ -886,6 +887,93 @@ def test_descriptor_permissions():
|
||||
assert descriptor.permissions == 3
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_get_attribute_group():
|
||||
device = Device()
|
||||
|
||||
# add some services / characteristics to the gatt server
|
||||
characteristic1 = Characteristic(
|
||||
'1111',
|
||||
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
bytes([123]),
|
||||
)
|
||||
characteristic2 = Characteristic(
|
||||
'2222',
|
||||
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
bytes([123]),
|
||||
)
|
||||
services = [Service('1212', [characteristic1]), Service('3233', [characteristic2])]
|
||||
device.gatt_server.add_services(services)
|
||||
|
||||
# get the handles from gatt server
|
||||
characteristic_attributes1 = device.gatt_server.get_characteristic_attributes(
|
||||
UUID('1212'), UUID('1111')
|
||||
)
|
||||
assert characteristic_attributes1 is not None
|
||||
characteristic_attributes2 = device.gatt_server.get_characteristic_attributes(
|
||||
UUID('3233'), UUID('2222')
|
||||
)
|
||||
assert characteristic_attributes2 is not None
|
||||
descriptor1 = device.gatt_server.get_descriptor_attribute(
|
||||
UUID('1212'), UUID('1111'), UUID('2902')
|
||||
)
|
||||
assert descriptor1 is not None
|
||||
descriptor2 = device.gatt_server.get_descriptor_attribute(
|
||||
UUID('3233'), UUID('2222'), UUID('2902')
|
||||
)
|
||||
assert descriptor2 is not None
|
||||
|
||||
# confirm the handles map back to the service
|
||||
assert (
|
||||
UUID('1212')
|
||||
== device.gatt_server.get_attribute_group(
|
||||
characteristic_attributes1[0].handle, Service
|
||||
).uuid
|
||||
)
|
||||
assert (
|
||||
UUID('1212')
|
||||
== device.gatt_server.get_attribute_group(
|
||||
characteristic_attributes1[1].handle, Service
|
||||
).uuid
|
||||
)
|
||||
assert (
|
||||
UUID('1212')
|
||||
== device.gatt_server.get_attribute_group(descriptor1.handle, Service).uuid
|
||||
)
|
||||
assert (
|
||||
UUID('3233')
|
||||
== device.gatt_server.get_attribute_group(
|
||||
characteristic_attributes2[0].handle, Service
|
||||
).uuid
|
||||
)
|
||||
assert (
|
||||
UUID('3233')
|
||||
== device.gatt_server.get_attribute_group(
|
||||
characteristic_attributes2[1].handle, Service
|
||||
).uuid
|
||||
)
|
||||
assert (
|
||||
UUID('3233')
|
||||
== device.gatt_server.get_attribute_group(descriptor2.handle, Service).uuid
|
||||
)
|
||||
|
||||
# confirm the handles map back to the characteristic
|
||||
assert (
|
||||
UUID('1111')
|
||||
== device.gatt_server.get_attribute_group(
|
||||
descriptor1.handle, Characteristic
|
||||
).uuid
|
||||
)
|
||||
assert (
|
||||
UUID('2222')
|
||||
== device.gatt_server.get_attribute_group(
|
||||
descriptor2.handle, Characteristic
|
||||
).uuid
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
|
||||
|
||||
Reference in New Issue
Block a user