forked from auracaster/bumble_mirror
Fix show attributes
`show attributes` wasn't being populated since `show_attributes()` was never called. Also updated `show attributes` to match the color and indentation of `show services`
This commit is contained in:
@@ -57,7 +57,7 @@ from bumble.core import UUID, AdvertisingData, BT_LE_TRANSPORT
|
||||
from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
|
||||
from bumble.utils import AsyncRunner
|
||||
from bumble.transport import open_transport_or_link
|
||||
from bumble.gatt import Characteristic
|
||||
from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor
|
||||
from bumble.hci import (
|
||||
HCI_Constant,
|
||||
HCI_LE_1M_PHY,
|
||||
@@ -154,10 +154,10 @@ class ConsoleApp:
|
||||
'rssi': {'on': None, 'off': None},
|
||||
'show': {
|
||||
'scan': None,
|
||||
'services': None,
|
||||
'attributes': None,
|
||||
'log': None,
|
||||
'device': None,
|
||||
'local-services': None,
|
||||
'remote-services': None,
|
||||
},
|
||||
'filter': {
|
||||
'address': None,
|
||||
@@ -197,8 +197,8 @@ class ConsoleApp:
|
||||
)
|
||||
self.output_max_lines = 20
|
||||
self.scan_results_text = FormattedTextControl()
|
||||
self.services_text = FormattedTextControl()
|
||||
self.attributes_text = FormattedTextControl()
|
||||
self.local_services_text = FormattedTextControl()
|
||||
self.remote_services_text = FormattedTextControl()
|
||||
self.device_text = FormattedTextControl()
|
||||
self.log_text = FormattedTextControl(
|
||||
get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1))
|
||||
@@ -214,12 +214,12 @@ class ConsoleApp:
|
||||
filter=Condition(lambda: self.top_tab == 'scan'),
|
||||
),
|
||||
ConditionalContainer(
|
||||
Frame(Window(self.services_text), title='Services'),
|
||||
filter=Condition(lambda: self.top_tab == 'services'),
|
||||
Frame(Window(self.local_services_text), title='Local Services'),
|
||||
filter=Condition(lambda: self.top_tab == 'local-services'),
|
||||
),
|
||||
ConditionalContainer(
|
||||
Frame(Window(self.attributes_text), title='Attributes'),
|
||||
filter=Condition(lambda: self.top_tab == 'attributes'),
|
||||
Frame(Window(self.remote_services_text), title='Remove Services'),
|
||||
filter=Condition(lambda: self.top_tab == 'remote-services'),
|
||||
),
|
||||
ConditionalContainer(
|
||||
Frame(Window(self.log_text, height=self.log_height), title='Log'),
|
||||
@@ -281,6 +281,7 @@ class ConsoleApp:
|
||||
self.device.listener = DeviceListener(self)
|
||||
await self.device.power_on()
|
||||
self.show_device(self.device)
|
||||
self.show_local_services(self.device.gatt_server.attributes)
|
||||
|
||||
# Run the UI
|
||||
await self.ui.run_async()
|
||||
@@ -359,32 +360,38 @@ class ConsoleApp:
|
||||
self.scan_results_text.text = ANSI('\n'.join(lines))
|
||||
self.ui.invalidate()
|
||||
|
||||
def show_services(self, services):
|
||||
def show_remote_services(self, services):
|
||||
lines = []
|
||||
del self.known_attributes[:]
|
||||
for service in services:
|
||||
lines.append(('ansicyan', str(service) + '\n'))
|
||||
lines.append(("ansicyan", f"{service}\n"))
|
||||
|
||||
for characteristic in service.characteristics:
|
||||
lines.append(('ansimagenta', ' ' + str(characteristic) + '\n'))
|
||||
lines.append(('ansimagenta', f' {characteristic} + \n'))
|
||||
self.known_attributes.append(
|
||||
f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}'
|
||||
)
|
||||
self.known_attributes.append(f'*.{characteristic.uuid.to_hex_str()}')
|
||||
self.known_attributes.append(f'#{characteristic.handle:X}')
|
||||
for descriptor in characteristic.descriptors:
|
||||
lines.append(('ansigreen', ' ' + str(descriptor) + '\n'))
|
||||
lines.append(("ansigreen", f" {descriptor}\n"))
|
||||
|
||||
self.services_text.text = lines
|
||||
self.remote_services_text.text = lines
|
||||
self.ui.invalidate()
|
||||
|
||||
def show_attributes(self, attributes):
|
||||
def show_local_services(self, attributes):
|
||||
lines = []
|
||||
|
||||
for attribute in attributes:
|
||||
lines.append(('ansicyan', f'{attribute}\n'))
|
||||
if isinstance(attribute, Service):
|
||||
lines.append(("ansicyan", f"{attribute}\n"))
|
||||
elif isinstance(attribute, (Characteristic, CharacteristicDeclaration)):
|
||||
lines.append(("ansimagenta", f" {attribute}\n"))
|
||||
elif isinstance(attribute, Descriptor):
|
||||
lines.append(("ansigreen", f" {attribute}\n"))
|
||||
else:
|
||||
lines.append(("ansiyellow", f"{attribute}\n"))
|
||||
|
||||
self.attributes_text.text = lines
|
||||
self.local_services_text.text = lines
|
||||
self.ui.invalidate()
|
||||
|
||||
def show_device(self, device):
|
||||
@@ -469,7 +476,7 @@ class ConsoleApp:
|
||||
await self.connected_peer.discover_descriptors(characteristic)
|
||||
self.append_to_output('discovery completed')
|
||||
|
||||
self.show_services(self.connected_peer.services)
|
||||
self.show_remote_services(self.connected_peer.services)
|
||||
|
||||
async def discover_attributes(self):
|
||||
if not self.connected_peer:
|
||||
@@ -655,7 +662,13 @@ class ConsoleApp:
|
||||
|
||||
async def do_show(self, params):
|
||||
if params:
|
||||
if params[0] in {'scan', 'services', 'attributes', 'log', 'device'}:
|
||||
if params[0] in {
|
||||
'scan',
|
||||
'log',
|
||||
'device',
|
||||
'local-services',
|
||||
'remote-services',
|
||||
}:
|
||||
self.top_tab = params[0]
|
||||
self.ui.invalidate()
|
||||
|
||||
|
||||
@@ -46,7 +46,6 @@ from bumble.hci import (
|
||||
HCI_LE_Connection_Complete_Event,
|
||||
HCI_LE_Read_Remote_Features_Complete_Event,
|
||||
HCI_Number_Of_Completed_Packets_Event,
|
||||
HCI_Object,
|
||||
HCI_Packet,
|
||||
)
|
||||
|
||||
@@ -1029,7 +1028,7 @@ class Controller:
|
||||
}
|
||||
return bytes([HCI_SUCCESS])
|
||||
|
||||
def on_hci_le_read_transmit_power_command(self, command):
|
||||
def on_hci_le_read_transmit_power_command(self, _command):
|
||||
'''
|
||||
See Bluetooth spec Vol 2, Part E - 7.8.74 LE Read Transmit Power Command
|
||||
'''
|
||||
|
||||
@@ -204,7 +204,7 @@ async def open_pyusb_transport(spec):
|
||||
await self.sink.stop()
|
||||
usb.util.release_interface(self.device, 0)
|
||||
|
||||
usb_find = usb.core.find
|
||||
usb_find = usb.core.find
|
||||
try:
|
||||
import libusb_package
|
||||
except ImportError:
|
||||
@@ -215,9 +215,7 @@ async def open_pyusb_transport(spec):
|
||||
# Find the device according to the spec moniker
|
||||
if ':' in spec:
|
||||
vendor_id, product_id = spec.split(':')
|
||||
device = usb_find(
|
||||
idVendor=int(vendor_id, 16), idProduct=int(product_id, 16)
|
||||
)
|
||||
device = usb_find(idVendor=int(vendor_id, 16), idProduct=int(product_id, 16))
|
||||
else:
|
||||
device_index = int(spec)
|
||||
devices = list(
|
||||
|
||||
@@ -51,8 +51,12 @@ def load_libusb():
|
||||
else:
|
||||
if libusb_path := libusb_package.get_library_path():
|
||||
logger.debug(f'loading libusb library at {libusb_path}')
|
||||
dll_loader = ctypes.WinDLL if platform.system() == 'Windows' else ctypes.CDLL
|
||||
libusb_dll = dll_loader(str(libusb_path), use_errno=True, use_last_error=True)
|
||||
dll_loader = (
|
||||
ctypes.WinDLL if platform.system() == 'Windows' else ctypes.CDLL
|
||||
)
|
||||
libusb_dll = dll_loader(
|
||||
str(libusb_path), use_errno=True, use_last_error=True
|
||||
)
|
||||
usb1.loadLibrary(libusb_dll)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user