forked from auracaster/bumble_mirror
Compare commits
9 Commits
gbg/gatt-m
...
v0.0.125
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a916b7a21a | ||
|
|
7fa2eb7658 | ||
|
|
fbb46dd736 | ||
|
|
d1e119f176 | ||
|
|
2fc7a0bf04 | ||
|
|
d6c4644b23 | ||
|
|
073757d5dd | ||
|
|
df1962e8da | ||
|
|
d2227f017f |
@@ -90,7 +90,7 @@ class SnoopPacketReader:
|
||||
@click.command()
|
||||
@click.option('--format', type=click.Choice(['h4', 'snoop']), default='h4', help='Format of the input file')
|
||||
@click.argument('filename')
|
||||
def show(format, filename):
|
||||
def main(format, filename):
|
||||
input = open(filename, 'rb')
|
||||
if format == 'h4':
|
||||
packet_reader = PacketReader(input)
|
||||
@@ -117,4 +117,4 @@ def show(format, filename):
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
show()
|
||||
main()
|
||||
|
||||
@@ -28,6 +28,8 @@
|
||||
# -----------------------------------------------------------------------------
|
||||
import os
|
||||
import logging
|
||||
import sys
|
||||
import click
|
||||
import usb1
|
||||
from colors import color
|
||||
|
||||
@@ -35,6 +37,7 @@ from colors import color
|
||||
# -----------------------------------------------------------------------------
|
||||
# Constants
|
||||
# -----------------------------------------------------------------------------
|
||||
USB_DEVICE_CLASS_DEVICE = 0x00
|
||||
USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
|
||||
USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
|
||||
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
|
||||
@@ -75,9 +78,81 @@ USB_DEVICE_CLASSES = {
|
||||
0xFF: 'Vendor Specific'
|
||||
}
|
||||
|
||||
USB_ENDPOINT_IN = 0x80
|
||||
USB_ENDPOINT_TYPES = ['CONTROL', 'ISOCHRONOUS', 'BULK', 'INTERRUPT']
|
||||
|
||||
USB_BT_HCI_CLASS_TUPLE = (
|
||||
USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
|
||||
USB_DEVICE_SUBCLASS_RF_CONTROLLER,
|
||||
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def main():
|
||||
def show_device_details(device):
|
||||
for configuration in device:
|
||||
print(f' Configuration {configuration.getConfigurationValue()}')
|
||||
for interface in configuration:
|
||||
for setting in interface:
|
||||
alternateSetting = setting.getAlternateSetting()
|
||||
suffix = f'/{alternateSetting}' if interface.getNumSettings() > 1 else ''
|
||||
(class_string, subclass_string) = get_class_info(
|
||||
setting.getClass(),
|
||||
setting.getSubClass(),
|
||||
setting.getProtocol()
|
||||
)
|
||||
details = f'({class_string}, {subclass_string})'
|
||||
print(f' Interface: {setting.getNumber()}{suffix} {details}')
|
||||
for endpoint in setting:
|
||||
endpoint_type = USB_ENDPOINT_TYPES[endpoint.getAttributes() & 3]
|
||||
endpoint_direction = 'OUT' if (endpoint.getAddress() & USB_ENDPOINT_IN == 0) else 'IN'
|
||||
print(f' Endpoint 0x{endpoint.getAddress():02X}: {endpoint_type} {endpoint_direction}')
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def get_class_info(cls, subclass, protocol):
|
||||
class_info = USB_DEVICE_CLASSES.get(cls)
|
||||
protocol_string = ''
|
||||
if class_info is None:
|
||||
class_string = f'0x{cls:02X}'
|
||||
else:
|
||||
if type(class_info) is tuple:
|
||||
class_string = class_info[0]
|
||||
subclass_info = class_info[1].get(subclass)
|
||||
if subclass_info:
|
||||
protocol_string = subclass_info.get(protocol)
|
||||
if protocol_string is not None:
|
||||
protocol_string = f' [{protocol_string}]'
|
||||
|
||||
else:
|
||||
class_string = class_info
|
||||
|
||||
subclass_string = f'{subclass}/{protocol}{protocol_string}'
|
||||
|
||||
return (class_string, subclass_string)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def is_bluetooth_hci(device):
|
||||
# Check if the device class indicates a match
|
||||
if (device.getDeviceClass(), device.getDeviceSubClass(), device.getDeviceProtocol()) == USB_BT_HCI_CLASS_TUPLE:
|
||||
return True
|
||||
|
||||
# If the device class is 'Device', look for a matching interface
|
||||
if device.getDeviceClass() == USB_DEVICE_CLASS_DEVICE:
|
||||
for configuration in device:
|
||||
for interface in configuration:
|
||||
for setting in interface:
|
||||
if (setting.getClass(), setting.getSubClass(), setting.getProtocol()) == USB_BT_HCI_CLASS_TUPLE:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@click.command()
|
||||
@click.option('--verbose', is_flag=True, default=False, help='Print more details')
|
||||
def main(verbose):
|
||||
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
|
||||
|
||||
with usb1.USBContext() as context:
|
||||
@@ -91,23 +166,28 @@ def main():
|
||||
|
||||
device_id = (device.getVendorID(), device.getProductID())
|
||||
|
||||
device_is_bluetooth_hci = (
|
||||
device_class == USB_DEVICE_CLASS_WIRELESS_CONTROLLER and
|
||||
device_subclass == USB_DEVICE_SUBCLASS_RF_CONTROLLER and
|
||||
device_protocol == USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
|
||||
(device_class_string, device_subclass_string) = get_class_info(
|
||||
device_class,
|
||||
device_subclass,
|
||||
device_protocol
|
||||
)
|
||||
|
||||
device_class_details = ''
|
||||
device_class_info = USB_DEVICE_CLASSES.get(device_class)
|
||||
if device_class_info is not None:
|
||||
if type(device_class_info) is tuple:
|
||||
device_class = device_class_info[0]
|
||||
device_subclass_info = device_class_info[1].get(device_subclass)
|
||||
if device_subclass_info:
|
||||
device_class_details = f' [{device_subclass_info.get(device_protocol)}]'
|
||||
else:
|
||||
device_class = device_class_info
|
||||
try:
|
||||
device_serial_number = device.getSerialNumber()
|
||||
except usb1.USBError:
|
||||
device_serial_number = None
|
||||
|
||||
try:
|
||||
device_manufacturer = device.getManufacturer()
|
||||
except usb1.USBError:
|
||||
device_manufacturer = None
|
||||
|
||||
try:
|
||||
device_product = device.getProduct()
|
||||
except usb1.USBError:
|
||||
device_product = None
|
||||
|
||||
device_is_bluetooth_hci = is_bluetooth_hci(device)
|
||||
if device_is_bluetooth_hci:
|
||||
bluetooth_device_count += 1
|
||||
fg_color = 'black'
|
||||
@@ -123,33 +203,35 @@ def main():
|
||||
if device_is_bluetooth_hci:
|
||||
bumble_transport_names.append(f'usb:{bluetooth_device_count - 1}')
|
||||
|
||||
serial_number_collision = False
|
||||
if device_id in devices:
|
||||
for device_serial in devices[device_id]:
|
||||
if device_serial == device.getSerialNumber():
|
||||
serial_number_collision = True
|
||||
|
||||
if device_id not in devices:
|
||||
bumble_transport_names.append(basic_transport_name)
|
||||
else:
|
||||
bumble_transport_names.append(f'{basic_transport_name}#{len(devices[device_id])}')
|
||||
|
||||
if device.getSerialNumber() and not serial_number_collision:
|
||||
bumble_transport_names.append(f'{basic_transport_name}/{device.getSerialNumber()}')
|
||||
if device_serial_number is not None:
|
||||
if device_id not in devices or device_serial_number not in devices[device_id]:
|
||||
bumble_transport_names.append(f'{basic_transport_name}/{device_serial_number}')
|
||||
|
||||
# Print the results
|
||||
print(color(f'ID {device.getVendorID():04X}:{device.getProductID():04X}', fg=fg_color, bg=bg_color))
|
||||
if bumble_transport_names:
|
||||
print(color(' Bumble Transport Names:', 'blue'), ' or '.join(color(x, 'cyan' if device_is_bluetooth_hci else 'red') for x in bumble_transport_names))
|
||||
print(color(' Bus/Device: ', 'green'), f'{device.getBusNumber():03}/{device.getDeviceAddress():03}')
|
||||
if device.getSerialNumber():
|
||||
print(color(' Serial: ', 'green'), device.getSerialNumber())
|
||||
print(color(' Class: ', 'green'), device_class)
|
||||
print(color(' Subclass/Protocol: ', 'green'), f'{device_subclass}/{device_protocol}{device_class_details}')
|
||||
print(color(' Manufacturer: ', 'green'), device.getManufacturer())
|
||||
print(color(' Product: ', 'green'), device.getProduct())
|
||||
print(color(' Class: ', 'green'), device_class_string)
|
||||
print(color(' Subclass/Protocol: ', 'green'), device_subclass_string)
|
||||
if device_serial_number is not None:
|
||||
print(color(' Serial: ', 'green'), device_serial_number)
|
||||
if device_manufacturer is not None:
|
||||
print(color(' Manufacturer: ', 'green'), device_manufacturer)
|
||||
if device_product is not None:
|
||||
print(color(' Product: ', 'green'), device_product)
|
||||
|
||||
if verbose:
|
||||
show_device_details(device)
|
||||
|
||||
print()
|
||||
|
||||
devices.setdefault(device_id, []).append(device.getSerialNumber())
|
||||
devices.setdefault(device_id, []).append(device_serial_number)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -346,8 +346,11 @@ class CharacteristicAdapter:
|
||||
async def read_decoded_value(self):
|
||||
return self.decode_value(await self.wrapped_characteristic.read_value())
|
||||
|
||||
async def write_decoded_value(self, value):
|
||||
return await self.wrapped_characteristic.write_value(self.encode_value(value))
|
||||
async def write_decoded_value(self, value, with_response=False):
|
||||
return await self.wrapped_characteristic.write_value(
|
||||
self.encode_value(value),
|
||||
with_response
|
||||
)
|
||||
|
||||
def encode_value(self, value):
|
||||
return value
|
||||
|
||||
@@ -184,8 +184,8 @@ class Client:
|
||||
# Wait until we can send (only one pending command at a time for the connection)
|
||||
response = None
|
||||
async with self.request_semaphore:
|
||||
assert(self.pending_request is None)
|
||||
assert(self.pending_response is None)
|
||||
assert self.pending_request is None
|
||||
assert self.pending_response is None
|
||||
|
||||
# Create a future value to hold the eventual response
|
||||
self.pending_response = asyncio.get_running_loop().create_future()
|
||||
|
||||
@@ -36,7 +36,7 @@ logger = logging.getLogger(__name__)
|
||||
async def open_usb_transport(spec):
|
||||
'''
|
||||
Open a USB transport.
|
||||
The parameter string has this syntax:
|
||||
The moniker string has this syntax:
|
||||
either <index> or
|
||||
<vendor>:<product> or
|
||||
<vendor>:<product>/<serial-number>] or
|
||||
@@ -47,15 +47,21 @@ async def open_usb_transport(spec):
|
||||
/<serial-number> suffix or #<index> suffix max be specified when more than one device with
|
||||
the same vendor and product identifiers are present.
|
||||
|
||||
In addition, if the moniker ends with the symbol "!", the device will be used in "forced" mode:
|
||||
the first USB interface of the device will be used, regardless of the interface class/subclass.
|
||||
This may be useful for some devices that use a custom class/subclass but may nonetheless work as-is.
|
||||
|
||||
Examples:
|
||||
0 --> the first BT USB dongle
|
||||
04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901
|
||||
04b4:f901#2 --> the third USB device with vendor=04b4 and product=f901
|
||||
04b4:f901/00E04C239987 --> the BT USB dongle with vendor=04b4 and product=f901 and serial number 00E04C239987
|
||||
usb:0B05:17CB! --> the BT USB dongle vendor=0B05 and product=17CB, in "forced" mode.
|
||||
'''
|
||||
|
||||
USB_RECIPIENT_DEVICE = 0x00
|
||||
USB_REQUEST_TYPE_CLASS = 0x01 << 5
|
||||
USB_DEVICE_CLASS_DEVICE = 0x00
|
||||
USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
|
||||
USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
|
||||
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
|
||||
@@ -63,6 +69,12 @@ async def open_usb_transport(spec):
|
||||
USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT = 0x03
|
||||
USB_ENDPOINT_IN = 0x80
|
||||
|
||||
USB_BT_HCI_CLASS_TUPLE = (
|
||||
USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
|
||||
USB_DEVICE_SUBCLASS_RF_CONTROLLER,
|
||||
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
|
||||
)
|
||||
|
||||
READ_SIZE = 1024
|
||||
|
||||
class UsbPacketSink:
|
||||
@@ -280,6 +292,13 @@ async def open_usb_transport(spec):
|
||||
context.open()
|
||||
try:
|
||||
found = None
|
||||
|
||||
if spec.endswith('!'):
|
||||
spec = spec[:-1]
|
||||
forced_mode = True
|
||||
else:
|
||||
forced_mode = False
|
||||
|
||||
if ':' in spec:
|
||||
vendor_id, product_id = spec.split(':')
|
||||
serial_number = None
|
||||
@@ -291,10 +310,14 @@ async def open_usb_transport(spec):
|
||||
device_index = int(device_index_str)
|
||||
|
||||
for device in context.getDeviceIterator(skip_on_error=True):
|
||||
try:
|
||||
device_serial_number = device.getSerialNumber()
|
||||
except usb1.USBError:
|
||||
device_serial_number = None
|
||||
if (
|
||||
device.getVendorID() == int(vendor_id, 16) and
|
||||
device.getProductID() == int(product_id, 16) and
|
||||
(serial_number is None or device.getSerialNumber() == serial_number)
|
||||
(serial_number is None or serial_number == device_serial_number)
|
||||
):
|
||||
if device_index == 0:
|
||||
found = device
|
||||
@@ -302,13 +325,27 @@ async def open_usb_transport(spec):
|
||||
device_index -= 1
|
||||
device.close()
|
||||
else:
|
||||
# Look for a compatible device by index
|
||||
def device_is_bluetooth_hci(device):
|
||||
# Check if the device class indicates a match
|
||||
if (device.getDeviceClass(), device.getDeviceSubClass(), device.getDeviceProtocol()) == \
|
||||
USB_BT_HCI_CLASS_TUPLE:
|
||||
return True
|
||||
|
||||
# If the device class is 'Device', look for a matching interface
|
||||
if device.getDeviceClass() == USB_DEVICE_CLASS_DEVICE:
|
||||
for configuration in device:
|
||||
for interface in configuration:
|
||||
for setting in interface:
|
||||
if (setting.getClass(), setting.getSubClass(), setting.getProtocol()) == \
|
||||
USB_BT_HCI_CLASS_TUPLE:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
device_index = int(spec)
|
||||
for device in context.getDeviceIterator(skip_on_error=True):
|
||||
if (
|
||||
device.getDeviceClass() == USB_DEVICE_CLASS_WIRELESS_CONTROLLER and
|
||||
device.getDeviceSubClass() == USB_DEVICE_SUBCLASS_RF_CONTROLLER and
|
||||
device.getDeviceProtocol() == USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
|
||||
):
|
||||
if device_is_bluetooth_hci(device):
|
||||
if device_index == 0:
|
||||
found = device
|
||||
break
|
||||
@@ -329,9 +366,8 @@ async def open_usb_transport(spec):
|
||||
setting = None
|
||||
for setting in interface:
|
||||
if (
|
||||
setting.getClass() != USB_DEVICE_CLASS_WIRELESS_CONTROLLER or
|
||||
setting.getSubClass() != USB_DEVICE_SUBCLASS_RF_CONTROLLER or
|
||||
setting.getProtocol() != USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
|
||||
not forced_mode and
|
||||
(setting.getClass(), setting.getSubClass(), setting.getProtocol()) != USB_BT_HCI_CLASS_TUPLE
|
||||
):
|
||||
continue
|
||||
|
||||
|
||||
@@ -45,6 +45,10 @@ nav:
|
||||
- HCI Bridge: apps_and_tools/hci_bridge.md
|
||||
- Golden Gate Bridge: apps_and_tools/gg_bridge.md
|
||||
- Show: apps_and_tools/show.md
|
||||
- GATT Dump: apps_and_tools/gatt_dump.md
|
||||
- Pair: apps_and_tools/pair.md
|
||||
- Unbond: apps_and_tools/unbond.md
|
||||
- USB Probe: apps_and_tools/usb_probe.md
|
||||
- Hardware:
|
||||
- Overview: hardware/index.md
|
||||
- Platforms:
|
||||
|
||||
@@ -13,17 +13,29 @@ type of device (there's no way to tell).
|
||||
|
||||
## Usage
|
||||
|
||||
This command line tool takes no arguments.
|
||||
This command line tool may be invoked with no arguments, or with `--verbose`
|
||||
for extra details.
|
||||
When installed from PyPI, run as
|
||||
```
|
||||
$ bumble-usb-probe
|
||||
```
|
||||
|
||||
or, for extra details, with the `--verbose` argument
|
||||
```
|
||||
$ bumble-usb-probe --v
|
||||
```
|
||||
|
||||
When running from the source distribution:
|
||||
```
|
||||
$ python3 apps/usb-probe.py
|
||||
```
|
||||
|
||||
or
|
||||
|
||||
```
|
||||
$ python3 apps/usb-probe.py --verbose
|
||||
```
|
||||
|
||||
!!! example
|
||||
```
|
||||
$ python3 apps/usb_probe.py
|
||||
|
||||
@@ -5,6 +5,7 @@ The USB transport interfaces with a local Bluetooth USB dongle.
|
||||
|
||||
## Moniker
|
||||
The moniker for a USB transport is either:
|
||||
|
||||
* `usb:<index>`
|
||||
* `usb:<vendor>:<product>`
|
||||
* `usb:<vendor>:<product>/<serial-number>`
|
||||
@@ -16,6 +17,10 @@ In the `usb:<vendor>:<product>#<index>` form, matching devices are the ones with
|
||||
|
||||
`<vendor>` and `<product>` are a vendor ID and product ID in hexadecimal.
|
||||
|
||||
In addition, if the moniker ends with the symbol "!", the device will be used in "forced" mode:
|
||||
the first USB interface of the device will be used, regardless of the interface class/subclass.
|
||||
This may be useful for some devices that use a custom class/subclass but may nonetheless work as-is.
|
||||
|
||||
!!! examples
|
||||
`usb:04b4:f901`
|
||||
The USB dongle with `<vendor>` equal to `04b4` and `<product>` equal to `f901`
|
||||
@@ -29,6 +34,10 @@ In the `usb:<vendor>:<product>#<index>` form, matching devices are the ones with
|
||||
`usb:04b4:f901/#1`
|
||||
The second USB dongle with `<vendor>` equal to `04b4` and `<product>` equal to `f901`
|
||||
|
||||
`usb:0B05:17CB!`
|
||||
The BT USB dongle vendor=0B05 and product=17CB, in "forced" mode.
|
||||
|
||||
|
||||
## Alternative
|
||||
The library includes two different implementations of the USB transport, implemented using different python bindings for `libusb`.
|
||||
Using the transport prefix `pyusb:` instead of `usb:` selects the implementation based on [PyUSB](https://pypi.org/project/pyusb/), using the synchronous API of `libusb`, whereas the default implementation is based on [libusb1](https://pypi.org/project/libusb1/), using the asynchronous API of `libusb`. In order to use the alternative PyUSB-based implementation, you need to ensure that you have installed that python module, as it isn't installed by default as a dependency of Bumble.
|
||||
|
||||
@@ -164,6 +164,17 @@ async def test_characteristic_encoding():
|
||||
await async_barrier()
|
||||
assert characteristic.value == bytes([124])
|
||||
|
||||
v = await cp.read_value()
|
||||
assert v == 124
|
||||
await cp.write_value(125, with_response=True)
|
||||
await async_barrier()
|
||||
assert characteristic.value == bytes([125])
|
||||
|
||||
cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2]))
|
||||
await cd.write_value(100, with_response=True)
|
||||
await async_barrier()
|
||||
assert characteristic.value == bytes([50])
|
||||
|
||||
last_change = None
|
||||
|
||||
def on_change(value):
|
||||
|
||||
Reference in New Issue
Block a user