Merge pull request #164 from AlanRosenthal/alan/local-write

Add `local-write` to bumble-console
This commit is contained in:
Alan Rosenthal
2023-03-31 16:07:09 -04:00
committed by GitHub
5 changed files with 129 additions and 22 deletions

View File

@@ -24,7 +24,7 @@ import logging
import os import os
import random import random
import re import re
from typing import Optional from typing import Optional, Union
from collections import OrderedDict from collections import OrderedDict
import click import click
@@ -126,7 +126,8 @@ class ConsoleApp:
def __init__(self): def __init__(self):
self.known_addresses = set() self.known_addresses = set()
self.known_attributes = [] self.known_remote_attributes = []
self.known_local_attributes = []
self.device = None self.device = None
self.connected_peer = None self.connected_peer = None
self.top_tab = 'device' self.top_tab = 'device'
@@ -174,10 +175,11 @@ class ConsoleApp:
'disconnect': None, 'disconnect': None,
'discover': {'services': None, 'attributes': None}, 'discover': {'services': None, 'attributes': None},
'request-mtu': None, 'request-mtu': None,
'read': LiveCompleter(self.known_attributes), 'read': LiveCompleter(self.known_remote_attributes),
'write': LiveCompleter(self.known_attributes), 'write': LiveCompleter(self.known_remote_attributes),
'subscribe': LiveCompleter(self.known_attributes), 'local-write': LiveCompleter(self.known_local_attributes),
'unsubscribe': LiveCompleter(self.known_attributes), 'subscribe': LiveCompleter(self.known_remote_attributes),
'unsubscribe': LiveCompleter(self.known_remote_attributes),
'set-phy': {'1m': None, '2m': None, 'coded': None}, 'set-phy': {'1m': None, '2m': None, 'coded': None},
'set-default-phy': None, 'set-default-phy': None,
'quit': None, 'quit': None,
@@ -373,17 +375,19 @@ class ConsoleApp:
def show_remote_services(self, services): def show_remote_services(self, services):
lines = [] lines = []
del self.known_attributes[:] del self.known_remote_attributes[:]
for service in services: for service in services:
lines.append(("ansicyan", f"{service}\n")) lines.append(("ansicyan", f"{service}\n"))
for characteristic in service.characteristics: for characteristic in service.characteristics:
lines.append(('ansimagenta', f' {characteristic} + \n')) lines.append(('ansimagenta', f' {characteristic} + \n'))
self.known_attributes.append( self.known_remote_attributes.append(
f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}' f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}'
) )
self.known_attributes.append(f'*.{characteristic.uuid.to_hex_str()}') self.known_remote_attributes.append(
self.known_attributes.append(f'#{characteristic.handle:X}') f'*.{characteristic.uuid.to_hex_str()}'
)
self.known_remote_attributes.append(f'#{characteristic.handle:X}')
for descriptor in characteristic.descriptors: for descriptor in characteristic.descriptors:
lines.append(("ansigreen", f" {descriptor}\n")) lines.append(("ansigreen", f" {descriptor}\n"))
@@ -392,12 +396,31 @@ class ConsoleApp:
def show_local_services(self, attributes): def show_local_services(self, attributes):
lines = [] lines = []
del self.known_local_attributes[:]
for attribute in attributes: for attribute in attributes:
if isinstance(attribute, Service): if isinstance(attribute, Service):
# Save the most recent service for use later
service = attribute
lines.append(("ansicyan", f"{attribute}\n")) lines.append(("ansicyan", f"{attribute}\n"))
elif isinstance(attribute, (Characteristic, CharacteristicDeclaration)): elif isinstance(attribute, Characteristic):
# CharacteristicDeclaration includes all info from Characteristic
# no need to print it twice
continue
elif isinstance(attribute, CharacteristicDeclaration):
# Save the most recent characteristic declaration for use later
characteristic_declaration = attribute
self.known_local_attributes.append(
f'{service.uuid.to_hex_str()}.{attribute.characteristic.uuid.to_hex_str()}'
)
self.known_local_attributes.append(
f'#{attribute.characteristic.handle:X}'
)
lines.append(("ansimagenta", f" {attribute}\n")) lines.append(("ansimagenta", f" {attribute}\n"))
elif isinstance(attribute, Descriptor): elif isinstance(attribute, Descriptor):
self.known_local_attributes.append(
f'{service.uuid.to_hex_str()}.{characteristic_declaration.characteristic.uuid.to_hex_str()}.{attribute.type.to_hex_str()}'
)
self.known_local_attributes.append(f'#{attribute.handle:X}')
lines.append(("ansigreen", f" {attribute}\n")) lines.append(("ansigreen", f" {attribute}\n"))
else: else:
lines.append(("ansiyellow", f"{attribute}\n")) lines.append(("ansiyellow", f"{attribute}\n"))
@@ -501,7 +524,7 @@ class ConsoleApp:
self.show_attributes(attributes) self.show_attributes(attributes)
def find_characteristic(self, param) -> Optional[CharacteristicProxy]: def find_remote_characteristic(self, param) -> Optional[CharacteristicProxy]:
if not self.connected_peer: if not self.connected_peer:
return None return None
parts = param.split('.') parts = param.split('.')
@@ -523,6 +546,38 @@ class ConsoleApp:
return None return None
def find_local_attribute(
self, param
) -> Optional[Union[Characteristic, Descriptor]]:
parts = param.split('.')
if len(parts) == 3:
service_uuid = UUID(parts[0])
characteristic_uuid = UUID(parts[1])
descriptor_uuid = UUID(parts[2])
return self.device.gatt_server.get_descriptor_attribute(
service_uuid, characteristic_uuid, descriptor_uuid
)
if len(parts) == 2:
service_uuid = UUID(parts[0])
characteristic_uuid = UUID(parts[1])
characteristic_attributes = (
self.device.gatt_server.get_characteristic_attributes(
service_uuid, characteristic_uuid
)
)
if characteristic_attributes:
return characteristic_attributes[1]
return None
elif len(parts) == 1:
if parts[0].startswith('#'):
attribute_handle = int(f'{parts[0][1:]}', 16)
attribute = self.device.gatt_server.get_attribute(attribute_handle)
if isinstance(attribute, (Characteristic, Descriptor)):
return attribute
return None
return None
async def rssi_monitor_loop(self): async def rssi_monitor_loop(self):
while True: while True:
if self.monitor_rssi and self.connected_peer: if self.monitor_rssi and self.connected_peer:
@@ -787,7 +842,7 @@ class ConsoleApp:
self.show_error('not connected') self.show_error('not connected')
return return
characteristic = self.find_characteristic(params[0]) characteristic = self.find_remote_characteristic(params[0])
if characteristic is None: if characteristic is None:
self.show_error('no such characteristic') self.show_error('no such characteristic')
return return
@@ -812,7 +867,7 @@ class ConsoleApp:
except ValueError: except ValueError:
value = str.encode(params[1]) # must be a string value = str.encode(params[1]) # must be a string
characteristic = self.find_characteristic(params[0]) characteristic = self.find_remote_characteristic(params[0])
if characteristic is None: if characteristic is None:
self.show_error('no such characteristic') self.show_error('no such characteristic')
return return
@@ -821,6 +876,34 @@ class ConsoleApp:
with_response = characteristic.properties & Characteristic.WRITE with_response = characteristic.properties & Characteristic.WRITE
await characteristic.write_value(value, with_response=with_response) await characteristic.write_value(value, with_response=with_response)
async def do_local_write(self, params):
if len(params) != 2:
self.show_error(
'invalid syntax', 'expected local-write <attribute> <value>'
)
return
if params[1].upper().startswith("0X"):
value = bytes.fromhex(params[1][2:]) # parse as hex string
else:
try:
value = int(params[1]).to_bytes(2, "little") # try as 2 byte integer
except ValueError:
value = str.encode(params[1]) # must be a string
attribute = self.find_local_attribute(params[0])
if not attribute:
self.show_error('invalid syntax', 'unable to find attribute')
return
# send data to any subscribers
if isinstance(attribute, Characteristic):
attribute.write_value(None, value)
if attribute.has_properties([Characteristic.NOTIFY]):
await self.device.gatt_server.notify_subscribers(attribute)
if attribute.has_properties([Characteristic.INDICATE]):
await self.device.gatt_server.indicate_subscribers(attribute)
async def do_subscribe(self, params): async def do_subscribe(self, params):
if not self.connected_peer: if not self.connected_peer:
self.show_error('not connected') self.show_error('not connected')
@@ -830,7 +913,7 @@ class ConsoleApp:
self.show_error('invalid syntax', 'expected subscribe <attribute>') self.show_error('invalid syntax', 'expected subscribe <attribute>')
return return
characteristic = self.find_characteristic(params[0]) characteristic = self.find_remote_characteristic(params[0])
if characteristic is None: if characteristic is None:
self.show_error('no such characteristic') self.show_error('no such characteristic')
return return
@@ -850,7 +933,7 @@ class ConsoleApp:
self.show_error('invalid syntax', 'expected subscribe <attribute>') self.show_error('invalid syntax', 'expected subscribe <attribute>')
return return
characteristic = self.find_characteristic(params[0]) characteristic = self.find_remote_characteristic(params[0])
if characteristic is None: if characteristic is None:
self.show_error('no such characteristic') self.show_error('no such characteristic')
return return

View File

@@ -750,10 +750,10 @@ class Attribute(EventEmitter):
permissions_str.split(","), permissions_str.split(","),
0, 0,
) )
except TypeError: except TypeError as exc:
raise TypeError( raise TypeError(
f"Attribute::permissions error:\nExpected a string containing any of the keys, seperated by commas: {','.join(Attribute.PERMISSION_NAMES.values())}\nGot: {permissions_str}" f"Attribute::permissions error:\nExpected a string containing any of the keys, separated by commas: {','.join(Attribute.PERMISSION_NAMES.values())}\nGot: {permissions_str}"
) ) from exc
def __init__(self, attribute_type, permissions, value=b''): def __init__(self, attribute_type, permissions, value=b''):
EventEmitter.__init__(self) EventEmitter.__init__(self)

View File

@@ -28,7 +28,7 @@ import enum
import functools import functools
import logging import logging
import struct import struct
from typing import Optional, Sequence from typing import Optional, Sequence, List, Any, Iterable
from .colors import color from .colors import color
from .core import UUID, get_dict_key_by_value from .core import UUID, get_dict_key_by_value
@@ -259,6 +259,8 @@ class Characteristic(Attribute):
See Vol 3, Part G - 3.3 CHARACTERISTIC DEFINITION See Vol 3, Part G - 3.3 CHARACTERISTIC DEFINITION
''' '''
uuid: UUID
# Property flags # Property flags
BROADCAST = 0x01 BROADCAST = 0x01
READ = 0x02 READ = 0x02
@@ -325,6 +327,12 @@ class Characteristic(Attribute):
return None return None
def has_properties(self, properties: Iterable[int]):
for prop in properties:
if self.properties & prop == 0:
return False
return True
def __str__(self): def __str__(self):
return ( return (
f'Characteristic(handle=0x{self.handle:04X}, ' f'Characteristic(handle=0x{self.handle:04X}, '
@@ -340,6 +348,8 @@ class CharacteristicDeclaration(Attribute):
See Vol 3, Part G - 3.3.1 CHARACTERISTIC DECLARATION See Vol 3, Part G - 3.3.1 CHARACTERISTIC DECLARATION
''' '''
characteristic: Characteristic
def __init__(self, characteristic, value_handle): def __init__(self, characteristic, value_handle):
declaration_bytes = ( declaration_bytes = (
struct.pack('<BH', characteristic.properties, value_handle) struct.pack('<BH', characteristic.properties, value_handle)

View File

@@ -62,7 +62,6 @@ from .gatt import (
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE, GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
GATT_REQUEST_TIMEOUT, GATT_REQUEST_TIMEOUT,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE, GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
Service,
Characteristic, Characteristic,
ClientCharacteristicConfigurationBits, ClientCharacteristicConfigurationBits,
) )

View File

@@ -872,7 +872,7 @@ def test_attribute_string_to_permissions():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_charracteristic_permissions(): def test_characteristic_permissions():
characteristic = Characteristic( characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806', 'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
@@ -881,6 +881,21 @@ def test_charracteristic_permissions():
assert characteristic.permissions == 3 assert characteristic.permissions == 3
# -----------------------------------------------------------------------------
def test_characteristic_has_properties():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
'READABLE,WRITEABLE',
)
assert characteristic.has_properties([Characteristic.READ])
assert characteristic.has_properties([Characteristic.READ, Characteristic.WRITE])
assert not characteristic.has_properties(
[Characteristic.READ, Characteristic.WRITE, Characteristic.INDICATE]
)
assert not characteristic.has_properties([Characteristic.INDICATE])
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_descriptor_permissions(): def test_descriptor_permissions():
descriptor = Descriptor('2902', 'READABLE,WRITEABLE') descriptor = Descriptor('2902', 'READABLE,WRITEABLE')