diff --git a/apps/console.py b/apps/console.py index 3ac4957..3ed64fb 100644 --- a/apps/console.py +++ b/apps/console.py @@ -32,6 +32,7 @@ from bumble.core import UUID, AdvertisingData from bumble.device import Device, Connection, Peer from bumble.utils import AsyncRunner from bumble.transport import open_transport_or_link +from bumble.gatt import Characteristic from prompt_toolkit import Application from prompt_toolkit.history import FileHistory @@ -330,9 +331,24 @@ class ConsoleApp: await self.show_attributes(attributes) + def find_attribute(self, param): + parts = param.split('.') + if len(parts) == 2: + service_uuid = UUID(parts[0]) if parts[0] != '*' else None + characteristic_uuid = UUID(parts[1]) + for service in self.connected_peer.services: + if service_uuid is None or service.uuid == service_uuid: + for characteristic in service.characteristics: + if characteristic.uuid == characteristic_uuid: + return characteristic + elif len(parts) == 1: + if parts[0].startswith('#'): + attribute_handle = int(f'{parts[0][1:]}', 16) + return attribute_handle + async def command(self, command): try: - (keyword, *params) = command.strip().split(' ', 1) + (keyword, *params) = command.strip().split(' ') keyword = keyword.replace('-', '_').lower() handler = getattr(self, f'do_{keyword}', None) if handler: @@ -441,26 +457,46 @@ class ConsoleApp: self.show_error('invalid syntax', 'expected read ') return - parts = params[0].split('.') - if len(parts) == 2: - service_uuid = UUID(parts[0]) if parts[0] != '*' else None - characteristic_uuid = UUID(parts[1]) - for service in self.connected_peer.services: - if service_uuid is None or service.uuid == service_uuid: - for characteristic in service.characteristics: - if characteristic.uuid == characteristic_uuid: - value = await self.connected_peer.read_value(characteristic) - self.append_to_output(f'VALUE: {value}') - return + attribute = self.find_attribute(params[0]) + if attribute is None: self.show_error('no such characteristic') - elif len(parts) == 1: - if parts[0].startswith('#'): - attribute_handle = int(f'{parts[0][1:]}', 16) - value = await self.connected_peer.read_value(attribute_handle) - self.append_to_output(f'VALUE: {value}') - return + return + + value = await self.connected_peer.read_value(attribute) + self.append_to_output(f'VALUE: {value}') + + async def do_write(self, params): + if not self.connected_peer: + self.show_error('not connected') + return + + if len(params) != 2: + self.show_error('invalid syntax', 'expected write ') + return + + if params[1].upper().startswith("0X"): + value = bytes.fromhex(params[1][2:]) # parse as hex string else: + try: + value = int(params[1]) # try as integer + except ValueError: + value = str.encode(params[1]) # must be a string + + attribute = self.find_attribute(params[0]) + if attribute is None: self.show_error('no such characteristic') + return + + # use write with response if supported + with_response = ( + (attribute.properties & Characteristic.WRITE) + if hasattr(attribute, "properties") + else False + ) + + await self.connected_peer.write_value( + attribute, value, with_response=with_response + ) async def do_exit(self, params): self.ui.exit()