From bb297e7516786706185eef8ce15e63ef0932a855 Mon Sep 17 00:00:00 2001 From: Alan Rosenthal Date: Thu, 19 Jan 2023 12:21:37 -0500 Subject: [PATCH] Fix `show attributes` `show attributes` wasn't being populated since `show_attributes()` was never called. Also updated `show attributes` to match the color and indentation of `show services` --- apps/console.py | 53 ++++++++++++++++++++++++--------------- bumble/controller.py | 3 +-- bumble/transport/pyusb.py | 6 ++--- bumble/transport/usb.py | 8 ++++-- 4 files changed, 42 insertions(+), 28 deletions(-) diff --git a/apps/console.py b/apps/console.py index 779b903..a1d4d3a 100644 --- a/apps/console.py +++ b/apps/console.py @@ -57,7 +57,7 @@ from bumble.core import UUID, AdvertisingData, BT_LE_TRANSPORT from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer from bumble.utils import AsyncRunner from bumble.transport import open_transport_or_link -from bumble.gatt import Characteristic +from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor from bumble.hci import ( HCI_Constant, HCI_LE_1M_PHY, @@ -154,10 +154,10 @@ class ConsoleApp: 'rssi': {'on': None, 'off': None}, 'show': { 'scan': None, - 'services': None, - 'attributes': None, 'log': None, 'device': None, + 'local-services': None, + 'remote-services': None, }, 'filter': { 'address': None, @@ -197,8 +197,8 @@ class ConsoleApp: ) self.output_max_lines = 20 self.scan_results_text = FormattedTextControl() - self.services_text = FormattedTextControl() - self.attributes_text = FormattedTextControl() + self.local_services_text = FormattedTextControl() + self.remote_services_text = FormattedTextControl() self.device_text = FormattedTextControl() self.log_text = FormattedTextControl( get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1)) @@ -214,12 +214,12 @@ class ConsoleApp: filter=Condition(lambda: self.top_tab == 'scan'), ), ConditionalContainer( - Frame(Window(self.services_text), title='Services'), - filter=Condition(lambda: self.top_tab == 'services'), + Frame(Window(self.local_services_text), title='Local Services'), + filter=Condition(lambda: self.top_tab == 'local-services'), ), ConditionalContainer( - Frame(Window(self.attributes_text), title='Attributes'), - filter=Condition(lambda: self.top_tab == 'attributes'), + Frame(Window(self.remote_services_text), title='Remove Services'), + filter=Condition(lambda: self.top_tab == 'remote-services'), ), ConditionalContainer( Frame(Window(self.log_text, height=self.log_height), title='Log'), @@ -281,6 +281,7 @@ class ConsoleApp: self.device.listener = DeviceListener(self) await self.device.power_on() self.show_device(self.device) + self.show_local_services(self.device.gatt_server.attributes) # Run the UI await self.ui.run_async() @@ -359,32 +360,38 @@ class ConsoleApp: self.scan_results_text.text = ANSI('\n'.join(lines)) self.ui.invalidate() - def show_services(self, services): + def show_remote_services(self, services): lines = [] del self.known_attributes[:] for service in services: - lines.append(('ansicyan', str(service) + '\n')) + lines.append(("ansicyan", f"{service}\n")) for characteristic in service.characteristics: - lines.append(('ansimagenta', ' ' + str(characteristic) + '\n')) + lines.append(('ansimagenta', f' {characteristic} + \n')) self.known_attributes.append( f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}' ) self.known_attributes.append(f'*.{characteristic.uuid.to_hex_str()}') self.known_attributes.append(f'#{characteristic.handle:X}') for descriptor in characteristic.descriptors: - lines.append(('ansigreen', ' ' + str(descriptor) + '\n')) + lines.append(("ansigreen", f" {descriptor}\n")) - self.services_text.text = lines + self.remote_services_text.text = lines self.ui.invalidate() - def show_attributes(self, attributes): + def show_local_services(self, attributes): lines = [] - for attribute in attributes: - lines.append(('ansicyan', f'{attribute}\n')) + if isinstance(attribute, Service): + lines.append(("ansicyan", f"{attribute}\n")) + elif isinstance(attribute, (Characteristic, CharacteristicDeclaration)): + lines.append(("ansimagenta", f" {attribute}\n")) + elif isinstance(attribute, Descriptor): + lines.append(("ansigreen", f" {attribute}\n")) + else: + lines.append(("ansiyellow", f"{attribute}\n")) - self.attributes_text.text = lines + self.local_services_text.text = lines self.ui.invalidate() def show_device(self, device): @@ -469,7 +476,7 @@ class ConsoleApp: await self.connected_peer.discover_descriptors(characteristic) self.append_to_output('discovery completed') - self.show_services(self.connected_peer.services) + self.show_remote_services(self.connected_peer.services) async def discover_attributes(self): if not self.connected_peer: @@ -655,7 +662,13 @@ class ConsoleApp: async def do_show(self, params): if params: - if params[0] in {'scan', 'services', 'attributes', 'log', 'device'}: + if params[0] in { + 'scan', + 'log', + 'device', + 'local-services', + 'remote-services', + }: self.top_tab = params[0] self.ui.invalidate() diff --git a/bumble/controller.py b/bumble/controller.py index ddfd8fd..bc59fe4 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -46,7 +46,6 @@ from bumble.hci import ( HCI_LE_Connection_Complete_Event, HCI_LE_Read_Remote_Features_Complete_Event, HCI_Number_Of_Completed_Packets_Event, - HCI_Object, HCI_Packet, ) @@ -1029,7 +1028,7 @@ class Controller: } return bytes([HCI_SUCCESS]) - def on_hci_le_read_transmit_power_command(self, command): + def on_hci_le_read_transmit_power_command(self, _command): ''' See Bluetooth spec Vol 2, Part E - 7.8.74 LE Read Transmit Power Command ''' diff --git a/bumble/transport/pyusb.py b/bumble/transport/pyusb.py index f804d66..6b3152a 100644 --- a/bumble/transport/pyusb.py +++ b/bumble/transport/pyusb.py @@ -204,7 +204,7 @@ async def open_pyusb_transport(spec): await self.sink.stop() usb.util.release_interface(self.device, 0) - usb_find = usb.core.find + usb_find = usb.core.find try: import libusb_package except ImportError: @@ -215,9 +215,7 @@ async def open_pyusb_transport(spec): # Find the device according to the spec moniker if ':' in spec: vendor_id, product_id = spec.split(':') - device = usb_find( - idVendor=int(vendor_id, 16), idProduct=int(product_id, 16) - ) + device = usb_find(idVendor=int(vendor_id, 16), idProduct=int(product_id, 16)) else: device_index = int(spec) devices = list( diff --git a/bumble/transport/usb.py b/bumble/transport/usb.py index 928e5b5..f0efa32 100644 --- a/bumble/transport/usb.py +++ b/bumble/transport/usb.py @@ -51,8 +51,12 @@ def load_libusb(): else: if libusb_path := libusb_package.get_library_path(): logger.debug(f'loading libusb library at {libusb_path}') - dll_loader = ctypes.WinDLL if platform.system() == 'Windows' else ctypes.CDLL - libusb_dll = dll_loader(str(libusb_path), use_errno=True, use_last_error=True) + dll_loader = ( + ctypes.WinDLL if platform.system() == 'Windows' else ctypes.CDLL + ) + libusb_dll = dll_loader( + str(libusb_path), use_errno=True, use_last_error=True + ) usb1.loadLibrary(libusb_dll)