Merge pull request #17 from mogenson/console_py_do_write

Implement 'write' command for console.py
This commit is contained in:
Gilles Boccon-Gibod
2022-07-30 16:02:47 -07:00
committed by GitHub
+54 -18
View File
@@ -32,6 +32,7 @@ from bumble.core import UUID, AdvertisingData
from bumble.device import Device, Connection, Peer from bumble.device import Device, Connection, Peer
from bumble.utils import AsyncRunner from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic
from prompt_toolkit import Application from prompt_toolkit import Application
from prompt_toolkit.history import FileHistory from prompt_toolkit.history import FileHistory
@@ -330,9 +331,24 @@ class ConsoleApp:
await self.show_attributes(attributes) await self.show_attributes(attributes)
def find_attribute(self, param):
parts = param.split('.')
if len(parts) == 2:
service_uuid = UUID(parts[0]) if parts[0] != '*' else None
characteristic_uuid = UUID(parts[1])
for service in self.connected_peer.services:
if service_uuid is None or service.uuid == service_uuid:
for characteristic in service.characteristics:
if characteristic.uuid == characteristic_uuid:
return characteristic
elif len(parts) == 1:
if parts[0].startswith('#'):
attribute_handle = int(f'{parts[0][1:]}', 16)
return attribute_handle
async def command(self, command): async def command(self, command):
try: try:
(keyword, *params) = command.strip().split(' ', 1) (keyword, *params) = command.strip().split(' ')
keyword = keyword.replace('-', '_').lower() keyword = keyword.replace('-', '_').lower()
handler = getattr(self, f'do_{keyword}', None) handler = getattr(self, f'do_{keyword}', None)
if handler: if handler:
@@ -441,26 +457,46 @@ class ConsoleApp:
self.show_error('invalid syntax', 'expected read <attribute>') self.show_error('invalid syntax', 'expected read <attribute>')
return return
parts = params[0].split('.') attribute = self.find_attribute(params[0])
if len(parts) == 2: if attribute is None:
service_uuid = UUID(parts[0]) if parts[0] != '*' else None
characteristic_uuid = UUID(parts[1])
for service in self.connected_peer.services:
if service_uuid is None or service.uuid == service_uuid:
for characteristic in service.characteristics:
if characteristic.uuid == characteristic_uuid:
value = await self.connected_peer.read_value(characteristic)
self.append_to_output(f'VALUE: {value}')
return
self.show_error('no such characteristic') self.show_error('no such characteristic')
elif len(parts) == 1: return
if parts[0].startswith('#'):
attribute_handle = int(f'{parts[0][1:]}', 16) value = await self.connected_peer.read_value(attribute)
value = await self.connected_peer.read_value(attribute_handle) self.append_to_output(f'VALUE: {value}')
self.append_to_output(f'VALUE: {value}')
return async def do_write(self, params):
if not self.connected_peer:
self.show_error('not connected')
return
if len(params) != 2:
self.show_error('invalid syntax', 'expected write <attribute> <value>')
return
if params[1].upper().startswith("0X"):
value = bytes.fromhex(params[1][2:]) # parse as hex string
else: else:
try:
value = int(params[1]) # try as integer
except ValueError:
value = str.encode(params[1]) # must be a string
attribute = self.find_attribute(params[0])
if attribute is None:
self.show_error('no such characteristic') self.show_error('no such characteristic')
return
# use write with response if supported
with_response = (
(attribute.properties & Characteristic.WRITE)
if hasattr(attribute, "properties")
else False
)
await self.connected_peer.write_value(
attribute, value, with_response=with_response
)
async def do_exit(self, params): async def do_exit(self, params):
self.ui.exit() self.ui.exit()