diff --git a/apps/console.py b/apps/console.py index 37afd22e..498e2248 100644 --- a/apps/console.py +++ b/apps/console.py @@ -24,7 +24,7 @@ import logging import os import random import re -from typing import Optional +from typing import Optional, Union from collections import OrderedDict import click @@ -126,7 +126,8 @@ class ConsoleApp: def __init__(self): self.known_addresses = set() - self.known_attributes = [] + self.known_remote_attributes = [] + self.known_local_attributes = [] self.device = None self.connected_peer = None self.top_tab = 'device' @@ -174,10 +175,11 @@ class ConsoleApp: 'disconnect': None, 'discover': {'services': None, 'attributes': None}, 'request-mtu': None, - 'read': LiveCompleter(self.known_attributes), - 'write': LiveCompleter(self.known_attributes), - 'subscribe': LiveCompleter(self.known_attributes), - 'unsubscribe': LiveCompleter(self.known_attributes), + 'read': LiveCompleter(self.known_remote_attributes), + 'write': LiveCompleter(self.known_remote_attributes), + 'local-write': LiveCompleter(self.known_local_attributes), + 'subscribe': LiveCompleter(self.known_remote_attributes), + 'unsubscribe': LiveCompleter(self.known_remote_attributes), 'set-phy': {'1m': None, '2m': None, 'coded': None}, 'set-default-phy': None, 'quit': None, @@ -373,17 +375,19 @@ class ConsoleApp: def show_remote_services(self, services): lines = [] - del self.known_attributes[:] + del self.known_remote_attributes[:] for service in services: lines.append(("ansicyan", f"{service}\n")) for characteristic in service.characteristics: lines.append(('ansimagenta', f' {characteristic} + \n')) - self.known_attributes.append( + self.known_remote_attributes.append( f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}' ) - self.known_attributes.append(f'*.{characteristic.uuid.to_hex_str()}') - self.known_attributes.append(f'#{characteristic.handle:X}') + self.known_remote_attributes.append( + f'*.{characteristic.uuid.to_hex_str()}' + ) + self.known_remote_attributes.append(f'#{characteristic.handle:X}') for descriptor in characteristic.descriptors: lines.append(("ansigreen", f" {descriptor}\n")) @@ -392,12 +396,31 @@ class ConsoleApp: def show_local_services(self, attributes): lines = [] + del self.known_local_attributes[:] for attribute in attributes: if isinstance(attribute, Service): + # Save the most recent service for use later + service = attribute lines.append(("ansicyan", f"{attribute}\n")) - elif isinstance(attribute, (Characteristic, CharacteristicDeclaration)): + elif isinstance(attribute, Characteristic): + # CharacteristicDeclaration includes all info from Characteristic + # no need to print it twice + continue + elif isinstance(attribute, CharacteristicDeclaration): + # Save the most recent characteristic declaration for use later + characteristic_declaration = attribute + self.known_local_attributes.append( + f'{service.uuid.to_hex_str()}.{attribute.characteristic.uuid.to_hex_str()}' + ) + self.known_local_attributes.append( + f'#{attribute.characteristic.handle:X}' + ) lines.append(("ansimagenta", f" {attribute}\n")) elif isinstance(attribute, Descriptor): + self.known_local_attributes.append( + f'{service.uuid.to_hex_str()}.{characteristic_declaration.characteristic.uuid.to_hex_str()}.{attribute.type.to_hex_str()}' + ) + self.known_local_attributes.append(f'#{attribute.handle:X}') lines.append(("ansigreen", f" {attribute}\n")) else: lines.append(("ansiyellow", f"{attribute}\n")) @@ -501,7 +524,7 @@ class ConsoleApp: self.show_attributes(attributes) - def find_characteristic(self, param) -> Optional[CharacteristicProxy]: + def find_remote_characteristic(self, param) -> Optional[CharacteristicProxy]: if not self.connected_peer: return None parts = param.split('.') @@ -523,6 +546,38 @@ class ConsoleApp: return None + def find_local_attribute( + self, param + ) -> Optional[Union[Characteristic, Descriptor]]: + parts = param.split('.') + if len(parts) == 3: + service_uuid = UUID(parts[0]) + characteristic_uuid = UUID(parts[1]) + descriptor_uuid = UUID(parts[2]) + return self.device.gatt_server.get_descriptor_attribute( + service_uuid, characteristic_uuid, descriptor_uuid + ) + if len(parts) == 2: + service_uuid = UUID(parts[0]) + characteristic_uuid = UUID(parts[1]) + characteristic_attributes = ( + self.device.gatt_server.get_characteristic_attributes( + service_uuid, characteristic_uuid + ) + ) + if characteristic_attributes: + return characteristic_attributes[1] + return None + elif len(parts) == 1: + if parts[0].startswith('#'): + attribute_handle = int(f'{parts[0][1:]}', 16) + attribute = self.device.gatt_server.get_attribute(attribute_handle) + if isinstance(attribute, (Characteristic, Descriptor)): + return attribute + return None + + return None + async def rssi_monitor_loop(self): while True: if self.monitor_rssi and self.connected_peer: @@ -787,7 +842,7 @@ class ConsoleApp: self.show_error('not connected') return - characteristic = self.find_characteristic(params[0]) + characteristic = self.find_remote_characteristic(params[0]) if characteristic is None: self.show_error('no such characteristic') return @@ -812,7 +867,7 @@ class ConsoleApp: except ValueError: value = str.encode(params[1]) # must be a string - characteristic = self.find_characteristic(params[0]) + characteristic = self.find_remote_characteristic(params[0]) if characteristic is None: self.show_error('no such characteristic') return @@ -821,6 +876,34 @@ class ConsoleApp: with_response = characteristic.properties & Characteristic.WRITE await characteristic.write_value(value, with_response=with_response) + async def do_local_write(self, params): + if len(params) != 2: + self.show_error( + 'invalid syntax', 'expected local-write ' + ) + return + + if params[1].upper().startswith("0X"): + value = bytes.fromhex(params[1][2:]) # parse as hex string + else: + try: + value = int(params[1]).to_bytes(2, "little") # try as 2 byte integer + except ValueError: + value = str.encode(params[1]) # must be a string + + attribute = self.find_local_attribute(params[0]) + if not attribute: + self.show_error('invalid syntax', 'unable to find attribute') + return + + # send data to any subscribers + if isinstance(attribute, Characteristic): + attribute.write_value(None, value) + if attribute.has_properties([Characteristic.NOTIFY]): + await self.device.gatt_server.notify_subscribers(attribute) + if attribute.has_properties([Characteristic.INDICATE]): + await self.device.gatt_server.indicate_subscribers(attribute) + async def do_subscribe(self, params): if not self.connected_peer: self.show_error('not connected') @@ -830,7 +913,7 @@ class ConsoleApp: self.show_error('invalid syntax', 'expected subscribe ') return - characteristic = self.find_characteristic(params[0]) + characteristic = self.find_remote_characteristic(params[0]) if characteristic is None: self.show_error('no such characteristic') return @@ -850,7 +933,7 @@ class ConsoleApp: self.show_error('invalid syntax', 'expected subscribe ') return - characteristic = self.find_characteristic(params[0]) + characteristic = self.find_remote_characteristic(params[0]) if characteristic is None: self.show_error('no such characteristic') return diff --git a/bumble/att.py b/bumble/att.py index 8311d181..55ae8a5d 100644 --- a/bumble/att.py +++ b/bumble/att.py @@ -750,10 +750,10 @@ class Attribute(EventEmitter): permissions_str.split(","), 0, ) - except TypeError: + except TypeError as exc: raise TypeError( - f"Attribute::permissions error:\nExpected a string containing any of the keys, seperated by commas: {','.join(Attribute.PERMISSION_NAMES.values())}\nGot: {permissions_str}" - ) + f"Attribute::permissions error:\nExpected a string containing any of the keys, separated by commas: {','.join(Attribute.PERMISSION_NAMES.values())}\nGot: {permissions_str}" + ) from exc def __init__(self, attribute_type, permissions, value=b''): EventEmitter.__init__(self) diff --git a/bumble/gatt.py b/bumble/gatt.py index 7aa065c5..74ba6f0d 100644 --- a/bumble/gatt.py +++ b/bumble/gatt.py @@ -28,7 +28,7 @@ import enum import functools import logging import struct -from typing import Optional, Sequence +from typing import Optional, Sequence, List, Any, Iterable from .colors import color from .core import UUID, get_dict_key_by_value @@ -259,6 +259,8 @@ class Characteristic(Attribute): See Vol 3, Part G - 3.3 CHARACTERISTIC DEFINITION ''' + uuid: UUID + # Property flags BROADCAST = 0x01 READ = 0x02 @@ -325,6 +327,12 @@ class Characteristic(Attribute): return None + def has_properties(self, properties: Iterable[int]): + for prop in properties: + if self.properties & prop == 0: + return False + return True + def __str__(self): return ( f'Characteristic(handle=0x{self.handle:04X}, ' @@ -340,6 +348,8 @@ class CharacteristicDeclaration(Attribute): See Vol 3, Part G - 3.3.1 CHARACTERISTIC DECLARATION ''' + characteristic: Characteristic + def __init__(self, characteristic, value_handle): declaration_bytes = ( struct.pack('