forked from auracaster/bumble_mirror
Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d6c4644b23 | |||
| df1962e8da | |||
| d2227f017f | |||
| a2f18cffc9 | |||
| db5e52f1df | |||
| d7da5a9379 | |||
| daa05b8996 | |||
| 624e860762 | |||
| 159cbf7774 |
+115
-31
@@ -28,6 +28,7 @@
|
||||
# -----------------------------------------------------------------------------
|
||||
import os
|
||||
import logging
|
||||
import sys
|
||||
import usb1
|
||||
from colors import color
|
||||
|
||||
@@ -35,6 +36,7 @@ from colors import color
|
||||
# -----------------------------------------------------------------------------
|
||||
# Constants
|
||||
# -----------------------------------------------------------------------------
|
||||
USB_DEVICE_CLASS_DEVICE = 0x00
|
||||
USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
|
||||
USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
|
||||
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
|
||||
@@ -75,9 +77,79 @@ USB_DEVICE_CLASSES = {
|
||||
0xFF: 'Vendor Specific'
|
||||
}
|
||||
|
||||
USB_ENDPOINT_IN = 0x80
|
||||
USB_ENDPOINT_TYPES = ['CONTROL', 'ISOCHRONOUS', 'BULK', 'INTERRUPT']
|
||||
|
||||
USB_BT_HCI_CLASS_TUPLE = (
|
||||
USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
|
||||
USB_DEVICE_SUBCLASS_RF_CONTROLLER,
|
||||
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def main():
|
||||
def show_device_details(device):
|
||||
for configuration in device:
|
||||
print(f' Configuration {configuration.getConfigurationValue()}')
|
||||
for interface in configuration:
|
||||
for setting in interface:
|
||||
alternateSetting = setting.getAlternateSetting()
|
||||
suffix = f'/{alternateSetting}' if interface.getNumSettings() > 1 else ''
|
||||
(class_string, subclass_string) = get_class_info(
|
||||
setting.getClass(),
|
||||
setting.getSubClass(),
|
||||
setting.getProtocol()
|
||||
)
|
||||
details = f'({class_string}, {subclass_string})'
|
||||
print(f' Interface: {setting.getNumber()}{suffix} {details}')
|
||||
for endpoint in setting:
|
||||
endpoint_type = USB_ENDPOINT_TYPES[endpoint.getAttributes() & 3]
|
||||
endpoint_direction = 'OUT' if (endpoint.getAddress() & USB_ENDPOINT_IN == 0) else 'IN'
|
||||
print(f' Endpoint 0x{endpoint.getAddress():02X}: {endpoint_type} {endpoint_direction}')
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def get_class_info(cls, subclass, protocol):
|
||||
class_info = USB_DEVICE_CLASSES.get(cls)
|
||||
protocol_string = ''
|
||||
if class_info is None:
|
||||
class_string = f'0x{cls:02X}'
|
||||
else:
|
||||
if type(class_info) is tuple:
|
||||
class_string = class_info[0]
|
||||
subclass_info = class_info[1].get(subclass)
|
||||
if subclass_info:
|
||||
protocol_string = subclass_info.get(protocol)
|
||||
if protocol_string is not None:
|
||||
protocol_string = f' [{protocol_string}]'
|
||||
|
||||
else:
|
||||
class_string = class_info
|
||||
|
||||
subclass_string = f'{subclass}/{protocol}{protocol_string}'
|
||||
|
||||
return (class_string, subclass_string)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def is_bluetooth_hci(device):
|
||||
# Check if the device class indicates a match
|
||||
if (device.getDeviceClass(), device.getDeviceSubClass(), device.getDeviceProtocol()) == USB_BT_HCI_CLASS_TUPLE:
|
||||
return True
|
||||
|
||||
# If the device class is 'Device', look for a matching interface
|
||||
if device.getDeviceClass() == USB_DEVICE_CLASS_DEVICE:
|
||||
for configuration in device:
|
||||
for interface in configuration:
|
||||
for setting in interface:
|
||||
if (setting.getClass(), setting.getSubClass(), setting.getProtocol()) == USB_BT_HCI_CLASS_TUPLE:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def main(verbose):
|
||||
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
|
||||
|
||||
with usb1.USBContext() as context:
|
||||
@@ -91,23 +163,28 @@ def main():
|
||||
|
||||
device_id = (device.getVendorID(), device.getProductID())
|
||||
|
||||
device_is_bluetooth_hci = (
|
||||
device_class == USB_DEVICE_CLASS_WIRELESS_CONTROLLER and
|
||||
device_subclass == USB_DEVICE_SUBCLASS_RF_CONTROLLER and
|
||||
device_protocol == USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
|
||||
(device_class_string, device_subclass_string) = get_class_info(
|
||||
device_class,
|
||||
device_subclass,
|
||||
device_protocol
|
||||
)
|
||||
|
||||
device_class_details = ''
|
||||
device_class_info = USB_DEVICE_CLASSES.get(device_class)
|
||||
if device_class_info is not None:
|
||||
if type(device_class_info) is tuple:
|
||||
device_class = device_class_info[0]
|
||||
device_subclass_info = device_class_info[1].get(device_subclass)
|
||||
if device_subclass_info:
|
||||
device_class_details = f' [{device_subclass_info.get(device_protocol)}]'
|
||||
else:
|
||||
device_class = device_class_info
|
||||
try:
|
||||
device_serial_number = device.getSerialNumber()
|
||||
except usb1.USBError:
|
||||
device_serial_number = None
|
||||
|
||||
try:
|
||||
device_manufacturer = device.getManufacturer()
|
||||
except usb1.USBError:
|
||||
device_manufacturer = None
|
||||
|
||||
try:
|
||||
device_product = device.getProduct()
|
||||
except usb1.USBError:
|
||||
device_product = None
|
||||
|
||||
device_is_bluetooth_hci = is_bluetooth_hci(device)
|
||||
if device_is_bluetooth_hci:
|
||||
bluetooth_device_count += 1
|
||||
fg_color = 'black'
|
||||
@@ -123,35 +200,42 @@ def main():
|
||||
if device_is_bluetooth_hci:
|
||||
bumble_transport_names.append(f'usb:{bluetooth_device_count - 1}')
|
||||
|
||||
serial_number_collision = False
|
||||
if device_id in devices:
|
||||
for device_serial in devices[device_id]:
|
||||
if device_serial == device.getSerialNumber():
|
||||
serial_number_collision = True
|
||||
|
||||
if device_id not in devices:
|
||||
bumble_transport_names.append(basic_transport_name)
|
||||
else:
|
||||
bumble_transport_names.append(f'{basic_transport_name}#{len(devices[device_id])}')
|
||||
|
||||
if device.getSerialNumber() and not serial_number_collision:
|
||||
bumble_transport_names.append(f'{basic_transport_name}/{device.getSerialNumber()}')
|
||||
if device_serial_number is not None:
|
||||
if device_id not in devices or device_serial_number not in devices[device_id]:
|
||||
bumble_transport_names.append(f'{basic_transport_name}/{device_serial_number}')
|
||||
|
||||
# Print the results
|
||||
print(color(f'ID {device.getVendorID():04X}:{device.getProductID():04X}', fg=fg_color, bg=bg_color))
|
||||
if bumble_transport_names:
|
||||
print(color(' Bumble Transport Names:', 'blue'), ' or '.join(color(x, 'cyan' if device_is_bluetooth_hci else 'red') for x in bumble_transport_names))
|
||||
print(color(' Bus/Device: ', 'green'), f'{device.getBusNumber():03}/{device.getDeviceAddress():03}')
|
||||
if device.getSerialNumber():
|
||||
print(color(' Serial: ', 'green'), device.getSerialNumber())
|
||||
print(color(' Class: ', 'green'), device_class)
|
||||
print(color(' Subclass/Protocol: ', 'green'), f'{device_subclass}/{device_protocol}{device_class_details}')
|
||||
print(color(' Manufacturer: ', 'green'), device.getManufacturer())
|
||||
print(color(' Product: ', 'green'), device.getProduct())
|
||||
print(color(' Class: ', 'green'), device_class_string)
|
||||
print(color(' Subclass/Protocol: ', 'green'), device_subclass_string)
|
||||
if device_serial_number is not None:
|
||||
print(color(' Serial: ', 'green'), device_serial_number)
|
||||
if device_manufacturer is not None:
|
||||
print(color(' Manufacturer: ', 'green'), device_manufacturer)
|
||||
if device_product is not None:
|
||||
print(color(' Product: ', 'green'), device_product)
|
||||
|
||||
if verbose:
|
||||
show_device_details(device)
|
||||
|
||||
print()
|
||||
|
||||
devices.setdefault(device_id, []).append(device.getSerialNumber())
|
||||
devices.setdefault(device_id, []).append(device_serial_number)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
if len(sys.argv) == 2 and sys.argv[1] == '--verbose':
|
||||
verbose = True
|
||||
else:
|
||||
verbose = False
|
||||
|
||||
main(verbose)
|
||||
|
||||
+20
-109
@@ -18,8 +18,7 @@
|
||||
import json
|
||||
import asyncio
|
||||
import logging
|
||||
import secrets
|
||||
from contextlib import asynccontextmanager, AsyncExitStack
|
||||
from contextlib import asynccontextmanager, AsyncExitStack
|
||||
|
||||
from .hci import *
|
||||
from .host import Host
|
||||
@@ -33,8 +32,6 @@ from . import smp
|
||||
from . import sdp
|
||||
from . import l2cap
|
||||
from . import keys
|
||||
from . import crypto
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Logging
|
||||
@@ -54,7 +51,6 @@ DEVICE_DEFAULT_SCAN_RESPONSE_DATA = b''
|
||||
DEVICE_DEFAULT_DATA_LENGTH = (27, 328, 27, 328)
|
||||
DEVICE_DEFAULT_SCAN_INTERVAL = 60 # ms
|
||||
DEVICE_DEFAULT_SCAN_WINDOW = 60 # ms
|
||||
DEVICE_DEFAULT_LE_RPA_TIMEOUT = 15 * 60 # 15 minutes (in seconds)
|
||||
DEVICE_MIN_SCAN_INTERVAL = 25
|
||||
DEVICE_MAX_SCAN_INTERVAL = 10240
|
||||
DEVICE_MIN_SCAN_WINDOW = 25
|
||||
@@ -173,6 +169,7 @@ class Peer:
|
||||
async def __aexit__(self, exc_type, exc_value, traceback):
|
||||
pass
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return f'{self.connection.peer_address} as {self.connection.role_name}'
|
||||
|
||||
@@ -205,22 +202,11 @@ class Connection(CompositeEventEmitter):
|
||||
def on_connection_encryption_key_refresh(self):
|
||||
pass
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
device,
|
||||
handle,
|
||||
transport,
|
||||
local_address,
|
||||
peer_address,
|
||||
peer_resolvable_address,
|
||||
role,
|
||||
parameters
|
||||
):
|
||||
def __init__(self, device, handle, transport, peer_address, peer_resolvable_address, role, parameters):
|
||||
super().__init__()
|
||||
self.device = device
|
||||
self.handle = handle
|
||||
self.transport = transport
|
||||
self.local_address = local_address
|
||||
self.peer_address = peer_address
|
||||
self.peer_resolvable_address = peer_resolvable_address
|
||||
self.peer_name = None # Classic only
|
||||
@@ -311,12 +297,7 @@ class Connection(CompositeEventEmitter):
|
||||
raise
|
||||
|
||||
def __str__(self):
|
||||
return (
|
||||
f'Connection(handle=0x{self.handle:04X}, '
|
||||
f'role={self.role_name}, '
|
||||
f'local_address={self.local_address}, '
|
||||
f'peer_address={self.peer_address})'
|
||||
)
|
||||
return f'Connection(handle=0x{self.handle:04X}, role={self.role_name}, address={self.peer_address})'
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -330,10 +311,8 @@ class DeviceConfiguration:
|
||||
self.advertising_interval_min = DEVICE_DEFAULT_ADVERTISING_INTERVAL
|
||||
self.advertising_interval_max = DEVICE_DEFAULT_ADVERTISING_INTERVAL
|
||||
self.le_enabled = True
|
||||
# LE host enable 2nd parameter
|
||||
self.le_simultaneous_enabled = True
|
||||
self.le_privacy_enabled = False
|
||||
self.le_rpa_timeout = DEVICE_DEFAULT_LE_RPA_TIMEOUT
|
||||
self.classic_enabled = False
|
||||
self.classic_sc_enabled = True
|
||||
self.classic_ssp_enabled = True
|
||||
self.connectable = True
|
||||
@@ -341,22 +320,19 @@ class DeviceConfiguration:
|
||||
self.advertising_data = bytes(
|
||||
AdvertisingData([(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))])
|
||||
)
|
||||
self.irk = bytes([0xFF] * 16) # This really must be changed for any level of security
|
||||
self.irk = bytes(16) # This really must be changed for any level of security
|
||||
self.keystore = None
|
||||
|
||||
def load_from_dict(self, config):
|
||||
# Load simple properties
|
||||
self.name = config.get('name', self.name)
|
||||
self.address = Address(config.get('address', self.address))
|
||||
self.class_of_device = config.get('class_of_device', self.class_of_device)
|
||||
self.name = config.get('name', self.name)
|
||||
self.address = Address(config.get('address', self.address))
|
||||
self.class_of_device = config.get('class_of_device', self.class_of_device)
|
||||
self.advertising_interval_min = config.get('advertising_interval', self.advertising_interval_min)
|
||||
self.advertising_interval_max = self.advertising_interval_min
|
||||
self.keystore = config.get('keystore')
|
||||
self.le_enabled = config.get('le_enabled', self.le_enabled)
|
||||
self.le_simultaneous_enabled = config.get('le_simultaneous_enabled', self.le_simultaneous_enabled)
|
||||
self.le_privacy_enabled = config.get('le_privacy_enabled', self.le_privacy_enabled)
|
||||
self.le_rpa_timeout = config.get('le_rpa_timeout', self.le_rpa_timeout)
|
||||
self.classic_enabled = config.get('classic_enabled', self.classic_enabled)
|
||||
self.classic_sc_enabled = config.get('classic_sc_enabled', self.classic_sc_enabled)
|
||||
self.classic_ssp_enabled = config.get('classic_ssp_enabled', self.classic_ssp_enabled)
|
||||
self.connectable = config.get('connectable', self.connectable)
|
||||
@@ -376,10 +352,6 @@ class DeviceConfiguration:
|
||||
advertising_data = config.get('advertising_data')
|
||||
if advertising_data:
|
||||
self.advertising_data = bytes.fromhex(advertising_data)
|
||||
else:
|
||||
self.advertising_data = bytes(
|
||||
AdvertisingData([(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))])
|
||||
)
|
||||
|
||||
def load_from_file(self, filename):
|
||||
with open(filename, 'r') as file:
|
||||
@@ -486,9 +458,9 @@ class Device(CompositeEventEmitter):
|
||||
self.connecting = False
|
||||
self.disconnecting = False
|
||||
self.connections = {} # Connections, by connection handle
|
||||
self.classic_enabled = False
|
||||
self.inquiry_response = None
|
||||
self.address_resolver = None
|
||||
self.le_rpa_task = None
|
||||
|
||||
# Use the initial config or a default
|
||||
self.public_address = Address('00:00:00:00:00:00')
|
||||
@@ -496,7 +468,6 @@ class Device(CompositeEventEmitter):
|
||||
config = DeviceConfiguration()
|
||||
self.name = config.name
|
||||
self.random_address = config.address
|
||||
self.identity_address = config.address
|
||||
self.class_of_device = config.class_of_device
|
||||
self.scan_response_data = config.scan_response_data
|
||||
self.advertising_data = config.advertising_data
|
||||
@@ -506,9 +477,6 @@ class Device(CompositeEventEmitter):
|
||||
self.irk = config.irk
|
||||
self.le_enabled = config.le_enabled
|
||||
self.le_simultaneous_enabled = config.le_simultaneous_enabled
|
||||
self.le_privacy_enabled = config.le_privacy_enabled
|
||||
self.le_rpa_timeout = config.le_rpa_timeout
|
||||
self.classic_enabled = config.classic_enabled
|
||||
self.classic_ssp_enabled = config.classic_ssp_enabled
|
||||
self.classic_sc_enabled = config.classic_sc_enabled
|
||||
self.discoverable = config.discoverable
|
||||
@@ -522,12 +490,11 @@ class Device(CompositeEventEmitter):
|
||||
if address:
|
||||
if type(address) is str:
|
||||
address = Address(address)
|
||||
self.random_address = address
|
||||
self.identity_address = address
|
||||
self.random_address = address
|
||||
|
||||
# Setup SMP
|
||||
# TODO: allow using a public address
|
||||
self.smp_manager = smp.Manager(self, self.random_address, self.identity_address)
|
||||
self.smp_manager = smp.Manager(self, self.random_address)
|
||||
self.l2cap_channel_manager.register_fixed_channel(
|
||||
smp.SMP_CID, self.on_smp_pdu)
|
||||
self.l2cap_channel_manager.register_fixed_channel(
|
||||
@@ -624,14 +591,6 @@ class Device(CompositeEventEmitter):
|
||||
))
|
||||
|
||||
if self.le_enabled:
|
||||
# If LE Privacy is enabled, generate an RPA
|
||||
if self.le_privacy_enabled:
|
||||
self.random_address = self.generate_le_rpa()
|
||||
logger.info(f'Initial RPA: {self.random_address}')
|
||||
if self.le_rpa_timeout > 0:
|
||||
# Start a task to periodically generate a new RPA
|
||||
self.le_rpa_task = asyncio.create_task(self.run_le_rpa_generation())
|
||||
|
||||
# Set the controller address
|
||||
await self.send_command(HCI_LE_Set_Random_Address_Command(
|
||||
random_address = self.random_address
|
||||
@@ -678,48 +637,13 @@ class Device(CompositeEventEmitter):
|
||||
await self.set_connectable(self.connectable)
|
||||
await self.set_discoverable(self.discoverable)
|
||||
|
||||
# Let the SMP manager know about the address
|
||||
# TODO: allow using a public address
|
||||
self.smp_manager.address = self.random_address
|
||||
|
||||
# Done
|
||||
self.powered_on = True
|
||||
|
||||
async def run_le_rpa_generation(self):
|
||||
while self.le_rpa_timeout != 0:
|
||||
await asyncio.sleep(self.le_rpa_timeout)
|
||||
|
||||
# Check if this is a good time to rotate the address
|
||||
if self.advertising or self.scanning or self.connecting:
|
||||
logger.debug('skipping RPA rotation')
|
||||
continue
|
||||
|
||||
random_address = self.generate_le_rpa()
|
||||
response = await self.send_command(HCI_LE_Set_Random_Address_Command(
|
||||
random_address = self.random_address
|
||||
))
|
||||
if response.return_parameters == HCI_SUCCESS:
|
||||
logger.info(f'New RPA: {random_address}')
|
||||
self.random_address = random_address
|
||||
else:
|
||||
logger.warning(f'failed to set RPA: {response.return_parameters}')
|
||||
|
||||
def generate_le_rpa(self):
|
||||
# See 1.3.2.2 Private device address generation
|
||||
|
||||
# Generate `prand`
|
||||
while True:
|
||||
# Generate a 22-bit random number for the random part of `prand`
|
||||
prand_random = secrets.randbelow(0x400000)
|
||||
|
||||
# As least on bit shall be 0 and one bit shall be 1
|
||||
if prand_random != 0 and prand_random != 0x3FFFFF:
|
||||
break
|
||||
|
||||
prand = prand_random | 0x400000 # The two MSBs are |1|0|
|
||||
|
||||
# Generate `hash`
|
||||
hash = crypto.ah(self.irk, struct.pack('<I', prand)[:3])
|
||||
|
||||
# Generate the address from `prand` and `hash`
|
||||
return Address(hash + struct.pack('<I', prand)[:3], Address.RANDOM_IDENTITY_ADDRESS)
|
||||
|
||||
async def start_advertising(self, auto_restart=False):
|
||||
self.auto_restart_advertising = auto_restart
|
||||
|
||||
@@ -751,24 +675,18 @@ class Device(CompositeEventEmitter):
|
||||
))
|
||||
|
||||
# Enable advertising
|
||||
response = await self.send_command(HCI_LE_Set_Advertising_Enable_Command(
|
||||
await self.send_command(HCI_LE_Set_Advertising_Enable_Command(
|
||||
advertising_enable = 1
|
||||
))
|
||||
if response.return_parameters != HCI_SUCCESS:
|
||||
logger.warning(f'HCI_LE_Set_Advertising_Enable_Command failed ({response.return_parameters})')
|
||||
raise HCI_Error(response.return_parameters)
|
||||
|
||||
self.advertising = True
|
||||
|
||||
async def stop_advertising(self):
|
||||
# Disable advertising
|
||||
if self.advertising:
|
||||
response = await self.send_command(HCI_LE_Set_Advertising_Enable_Command(
|
||||
await self.send_command(HCI_LE_Set_Advertising_Enable_Command(
|
||||
advertising_enable = 0
|
||||
))
|
||||
if response.return_parameters != HCI_SUCCESS:
|
||||
logger.warning(f'HCI_LE_Set_Advertising_Enable_Command failed ({response.return_parameters})')
|
||||
raise HCI_Error(response.return_parameters)
|
||||
|
||||
self.advertising = False
|
||||
|
||||
@@ -803,23 +721,17 @@ class Device(CompositeEventEmitter):
|
||||
))
|
||||
|
||||
# Enable scanning
|
||||
response = await self.send_command(HCI_LE_Set_Scan_Enable_Command(
|
||||
await self.send_command(HCI_LE_Set_Scan_Enable_Command(
|
||||
le_scan_enable = 1,
|
||||
filter_duplicates = 1 if filter_duplicates else 0
|
||||
))
|
||||
if response.return_parameters != HCI_SUCCESS:
|
||||
raise HCI_Error(response.return_parameters)
|
||||
|
||||
self.scanning = True
|
||||
|
||||
async def stop_scanning(self):
|
||||
response = await self.send_command(HCI_LE_Set_Scan_Enable_Command(
|
||||
await self.send_command(HCI_LE_Set_Scan_Enable_Command(
|
||||
le_scan_enable = 0,
|
||||
filter_duplicates = 0
|
||||
))
|
||||
if response.return_parameters != HCI_SUCCESS:
|
||||
raise HCI_Error(response.return_parameters)
|
||||
|
||||
self.scanning = False
|
||||
|
||||
@property
|
||||
@@ -1330,7 +1242,6 @@ class Device(CompositeEventEmitter):
|
||||
self,
|
||||
connection_handle,
|
||||
transport,
|
||||
self.public_address if transport == BT_BR_EDR_TRANSPORT else self.random_address,
|
||||
peer_address,
|
||||
peer_resolvable_address,
|
||||
role,
|
||||
|
||||
+3
-5
@@ -1375,11 +1375,9 @@ class HCI_Error(ProtocolError):
|
||||
|
||||
class HCI_StatusError(ProtocolError):
|
||||
def __init__(self, response):
|
||||
super().__init__(
|
||||
response.status,
|
||||
error_namespace=HCI_Command.command_name(response.command_opcode),
|
||||
error_name=HCI_Constant.status_name(response.status)
|
||||
)
|
||||
super().__init__(response.status,
|
||||
error_namespace=HCI_Command.command_name(response.command_opcode),
|
||||
error_name=HCI_Constant.status_name(response.status))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -18,6 +18,8 @@
|
||||
import logging
|
||||
from colors import color
|
||||
|
||||
from bumble.smp import SMP_CID, SMP_Command
|
||||
|
||||
from .core import name_or_number
|
||||
from .gatt import ATT_PDU, ATT_CID
|
||||
from .l2cap import (
|
||||
@@ -73,6 +75,9 @@ class PacketTracer:
|
||||
if l2cap_pdu.cid == ATT_CID:
|
||||
att_pdu = ATT_PDU.from_bytes(l2cap_pdu.payload)
|
||||
self.analyzer.emit(att_pdu)
|
||||
elif l2cap_pdu.cid == SMP_CID:
|
||||
smp_command = SMP_Command.from_bytes(l2cap_pdu.payload)
|
||||
self.analyzer.emit(smp_command)
|
||||
elif l2cap_pdu.cid == L2CAP_SIGNALING_CID or l2cap_pdu.cid == L2CAP_LE_SIGNALING_CID:
|
||||
control_frame = L2CAP_Control_Frame.from_bytes(l2cap_pdu.payload)
|
||||
self.analyzer.emit(control_frame)
|
||||
|
||||
+23
-16
@@ -155,6 +155,7 @@ SMP_CT2_AUTHREQ = 0b00100000
|
||||
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031')
|
||||
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('00000000000000000000000000000000746D7032')
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Utils
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -638,13 +639,13 @@ class Session:
|
||||
# Set up addresses
|
||||
peer_address = connection.peer_resolvable_address or connection.peer_address
|
||||
if self.is_initiator:
|
||||
self.ia = bytes(connection.local_address)
|
||||
self.iat = 1 if connection.local_address.is_random else 0
|
||||
self.ia = bytes(manager.address)
|
||||
self.iat = 1 if manager.address.is_random else 0
|
||||
self.ra = bytes(peer_address)
|
||||
self.rat = 1 if peer_address.is_random else 0
|
||||
else:
|
||||
self.ra = bytes(connection.local_address)
|
||||
self.rat = 1 if connection.local_address.is_random else 0
|
||||
self.ra = bytes(manager.address)
|
||||
self.rat = 1 if manager.address.is_random else 0
|
||||
self.ia = bytes(peer_address)
|
||||
self.iat = 1 if peer_address.is_random else 0
|
||||
|
||||
@@ -906,8 +907,8 @@ class Session:
|
||||
SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk)
|
||||
)
|
||||
self.send_command(SMP_Identity_Address_Information_Command(
|
||||
addr_type = self.manager.identity_address.address_type,
|
||||
bd_addr = self.manager.identity_address
|
||||
addr_type = self.manager.address.address_type,
|
||||
bd_addr = self.manager.address
|
||||
))
|
||||
|
||||
# Distribute CSRK
|
||||
@@ -938,8 +939,8 @@ class Session:
|
||||
SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk)
|
||||
)
|
||||
self.send_command(SMP_Identity_Address_Information_Command(
|
||||
addr_type = self.manager.identity_address.address_type,
|
||||
bd_addr = self.manager.identity_address
|
||||
addr_type = self.manager.address.address_type,
|
||||
bd_addr = self.manager.address
|
||||
))
|
||||
|
||||
# Distribute CSRK
|
||||
@@ -980,12 +981,7 @@ class Session:
|
||||
self.peer_expected_distributions.remove(command_class)
|
||||
logger.debug(f'remaining distributions: {[c.__name__ for c in self.peer_expected_distributions]}')
|
||||
if not self.peer_expected_distributions:
|
||||
# The initiator can now send its keys
|
||||
if self.is_initiator:
|
||||
self.distribute_keys()
|
||||
|
||||
# Nothing left to expect, we're done
|
||||
asyncio.create_task(self.on_pairing())
|
||||
self.on_peer_key_distribution_complete()
|
||||
else:
|
||||
logger.warn(color(f'!!! unexpected key distribution command: {command_class.__name__}', 'red'))
|
||||
self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR)
|
||||
@@ -1006,12 +1002,23 @@ class Session:
|
||||
self.connection.remove_listener('connection_encryption_key_refresh', self.on_connection_encryption_key_refresh)
|
||||
self.manager.on_session_end(self)
|
||||
|
||||
def on_peer_key_distribution_complete(self):
|
||||
# The initiator can now send its keys
|
||||
if self.is_initiator:
|
||||
self.distribute_keys()
|
||||
|
||||
asyncio.create_task(self.on_pairing())
|
||||
|
||||
def on_connection_encryption_change(self):
|
||||
if self.connection.is_encrypted:
|
||||
if self.is_responder:
|
||||
# The responder distributes its keys first, the initiator later
|
||||
self.distribute_keys()
|
||||
|
||||
# If we're not expecting key distributions from the peer, we're done
|
||||
if not self.peer_expected_distributions:
|
||||
self.on_peer_key_distribution_complete()
|
||||
|
||||
def on_connection_encryption_key_refresh(self):
|
||||
# Do as if the connection had just been encrypted
|
||||
self.on_connection_encryption_change()
|
||||
@@ -1479,10 +1486,10 @@ class Manager(EventEmitter):
|
||||
Implements the Initiator and Responder roles of the Security Manager Protocol
|
||||
'''
|
||||
|
||||
def __init__(self, device, address, identity_address):
|
||||
def __init__(self, device, address):
|
||||
super().__init__()
|
||||
self.device = device
|
||||
self.identity_address = identity_address
|
||||
self.address = address
|
||||
self.sessions = {}
|
||||
self._ecc_key = None
|
||||
self.pairing_config_factory = lambda connection: PairingConfig()
|
||||
|
||||
+125
-29
@@ -36,7 +36,7 @@ logger = logging.getLogger(__name__)
|
||||
async def open_usb_transport(spec):
|
||||
'''
|
||||
Open a USB transport.
|
||||
The parameter string has this syntax:
|
||||
The moniker string has this syntax:
|
||||
either <index> or
|
||||
<vendor>:<product> or
|
||||
<vendor>:<product>/<serial-number>] or
|
||||
@@ -47,27 +47,40 @@ async def open_usb_transport(spec):
|
||||
/<serial-number> suffix or #<index> suffix max be specified when more than one device with
|
||||
the same vendor and product identifiers are present.
|
||||
|
||||
In addition, if the moniker ends with the symbol "!", the device will be used in "forced" mode:
|
||||
the first USB interface of the device will be used, regardless of the interface class/subclass.
|
||||
This may be useful for some devices that use a custom class/subclass but may nonetheless work as-is.
|
||||
|
||||
Examples:
|
||||
0 --> the first BT USB dongle
|
||||
04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901
|
||||
04b4:f901#2 --> the third USB device with vendor=04b4 and product=f901
|
||||
04b4:f901/00E04C239987 --> the BT USB dongle with vendor=04b4 and product=f901 and serial number 00E04C239987
|
||||
usb:0B05:17CB! --> the BT USB dongle vendor=0B05 and product=17CB, in "forced" mode.
|
||||
'''
|
||||
|
||||
USB_RECIPIENT_DEVICE = 0x00
|
||||
USB_REQUEST_TYPE_CLASS = 0x01 << 5
|
||||
USB_ENDPOINT_EVENTS_IN = 0x81
|
||||
USB_ENDPOINT_ACL_IN = 0x82
|
||||
USB_ENDPOINT_ACL_OUT = 0x02
|
||||
USB_DEVICE_CLASS_DEVICE = 0x00
|
||||
USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
|
||||
USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
|
||||
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
|
||||
USB_ENDPOINT_TRANSFER_TYPE_BULK = 0x02
|
||||
USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT = 0x03
|
||||
USB_ENDPOINT_IN = 0x80
|
||||
|
||||
USB_BT_HCI_CLASS_TUPLE = (
|
||||
USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
|
||||
USB_DEVICE_SUBCLASS_RF_CONTROLLER,
|
||||
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
|
||||
)
|
||||
|
||||
READ_SIZE = 1024
|
||||
|
||||
class UsbPacketSink:
|
||||
def __init__(self, device):
|
||||
def __init__(self, device, acl_out):
|
||||
self.device = device
|
||||
self.acl_out = acl_out
|
||||
self.transfer = device.getTransfer()
|
||||
self.packets = collections.deque() # Queue of packets waiting to be sent
|
||||
self.loop = asyncio.get_running_loop()
|
||||
@@ -116,7 +129,7 @@ async def open_usb_transport(spec):
|
||||
packet_type = packet[0]
|
||||
if packet_type == hci.HCI_ACL_DATA_PACKET:
|
||||
self.transfer.setBulk(
|
||||
USB_ENDPOINT_ACL_OUT,
|
||||
self.acl_out,
|
||||
packet[1:],
|
||||
callback=self.on_packet_sent
|
||||
)
|
||||
@@ -152,10 +165,12 @@ async def open_usb_transport(spec):
|
||||
logger.debug('OUT transfer likely already completed')
|
||||
|
||||
class UsbPacketSource(asyncio.Protocol, ParserSource):
|
||||
def __init__(self, context, device):
|
||||
def __init__(self, context, device, acl_in, events_in):
|
||||
super().__init__()
|
||||
self.context = context
|
||||
self.device = device
|
||||
self.acl_in = acl_in
|
||||
self.events_in = events_in
|
||||
self.loop = asyncio.get_running_loop()
|
||||
self.queue = asyncio.Queue()
|
||||
self.closed = False
|
||||
@@ -172,7 +187,7 @@ async def open_usb_transport(spec):
|
||||
# Set up transfer objects for input
|
||||
self.events_in_transfer = device.getTransfer()
|
||||
self.events_in_transfer.setInterrupt(
|
||||
USB_ENDPOINT_EVENTS_IN,
|
||||
self.events_in,
|
||||
READ_SIZE,
|
||||
callback=self.on_packet_received,
|
||||
user_data=hci.HCI_EVENT_PACKET
|
||||
@@ -181,7 +196,7 @@ async def open_usb_transport(spec):
|
||||
|
||||
self.acl_in_transfer = device.getTransfer()
|
||||
self.acl_in_transfer.setBulk(
|
||||
USB_ENDPOINT_ACL_IN,
|
||||
self.acl_in,
|
||||
READ_SIZE,
|
||||
callback=self.on_packet_received,
|
||||
user_data=hci.HCI_ACL_DATA_PACKET
|
||||
@@ -248,7 +263,7 @@ async def open_usb_transport(spec):
|
||||
await self.event_loop_done
|
||||
|
||||
class UsbTransport(Transport):
|
||||
def __init__(self, context, device, interface, source, sink):
|
||||
def __init__(self, context, device, interface, setting, source, sink):
|
||||
super().__init__(source, sink)
|
||||
self.context = context
|
||||
self.device = device
|
||||
@@ -257,6 +272,10 @@ async def open_usb_transport(spec):
|
||||
# Get exclusive access
|
||||
device.claimInterface(interface)
|
||||
|
||||
# Set the alternate setting if not the default
|
||||
if setting != 0:
|
||||
device.setInterfaceAltSetting(interface, setting)
|
||||
|
||||
# The source and sink can now start
|
||||
source.start()
|
||||
sink.start()
|
||||
@@ -273,6 +292,13 @@ async def open_usb_transport(spec):
|
||||
context.open()
|
||||
try:
|
||||
found = None
|
||||
|
||||
if spec.endswith('!'):
|
||||
spec = spec[:-1]
|
||||
forced_mode = True
|
||||
else:
|
||||
forced_mode = False
|
||||
|
||||
if ':' in spec:
|
||||
vendor_id, product_id = spec.split(':')
|
||||
serial_number = None
|
||||
@@ -284,10 +310,14 @@ async def open_usb_transport(spec):
|
||||
device_index = int(device_index_str)
|
||||
|
||||
for device in context.getDeviceIterator(skip_on_error=True):
|
||||
try:
|
||||
device_serial_number = device.getSerialNumber()
|
||||
except usb1.USBError:
|
||||
device_serial_number = None
|
||||
if (
|
||||
device.getVendorID() == int(vendor_id, 16) and
|
||||
device.getProductID() == int(product_id, 16) and
|
||||
(serial_number is None or device.getSerialNumber() == serial_number)
|
||||
(serial_number is None or serial_number == device_serial_number)
|
||||
):
|
||||
if device_index == 0:
|
||||
found = device
|
||||
@@ -295,13 +325,27 @@ async def open_usb_transport(spec):
|
||||
device_index -= 1
|
||||
device.close()
|
||||
else:
|
||||
# Look for a compatible device by index
|
||||
def device_is_bluetooth_hci(device):
|
||||
# Check if the device class indicates a match
|
||||
if (device.getDeviceClass(), device.getDeviceSubClass(), device.getDeviceProtocol()) == \
|
||||
USB_BT_HCI_CLASS_TUPLE:
|
||||
return True
|
||||
|
||||
# If the device class is 'Device', look for a matching interface
|
||||
if device.getDeviceClass() == USB_DEVICE_CLASS_DEVICE:
|
||||
for configuration in device:
|
||||
for interface in configuration:
|
||||
for setting in interface:
|
||||
if (setting.getClass(), setting.getSubClass(), setting.getProtocol()) == \
|
||||
USB_BT_HCI_CLASS_TUPLE:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
device_index = int(spec)
|
||||
for device in context.getDeviceIterator(skip_on_error=True):
|
||||
if (
|
||||
device.getDeviceClass() == USB_DEVICE_CLASS_WIRELESS_CONTROLLER and
|
||||
device.getDeviceSubClass() == USB_DEVICE_SUBCLASS_RF_CONTROLLER and
|
||||
device.getDeviceProtocol() == USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
|
||||
):
|
||||
if device_is_bluetooth_hci(device):
|
||||
if device_index == 0:
|
||||
found = device
|
||||
break
|
||||
@@ -313,10 +357,62 @@ async def open_usb_transport(spec):
|
||||
raise ValueError('device not found')
|
||||
|
||||
logger.debug(f'USB Device: {found}')
|
||||
device = found.open()
|
||||
|
||||
# Use the first interface
|
||||
interface = 0
|
||||
# Look for the first interface with the right class and endpoints
|
||||
def find_endpoints(device):
|
||||
for (configuration_index, configuration) in enumerate(device):
|
||||
interface = None
|
||||
for interface in configuration:
|
||||
setting = None
|
||||
for setting in interface:
|
||||
if (
|
||||
not forced_mode and
|
||||
(setting.getClass(), setting.getSubClass(), setting.getProtocol()) != USB_BT_HCI_CLASS_TUPLE
|
||||
):
|
||||
continue
|
||||
|
||||
events_in = None
|
||||
acl_in = None
|
||||
acl_out = None
|
||||
for endpoint in setting:
|
||||
attributes = endpoint.getAttributes()
|
||||
address = endpoint.getAddress()
|
||||
if attributes & 0x03 == USB_ENDPOINT_TRANSFER_TYPE_BULK:
|
||||
if address & USB_ENDPOINT_IN and acl_in is None:
|
||||
acl_in = address
|
||||
elif acl_out is None:
|
||||
acl_out = address
|
||||
elif attributes & 0x03 == USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT:
|
||||
if address & USB_ENDPOINT_IN and events_in is None:
|
||||
events_in = address
|
||||
|
||||
# Return if we found all 3 endpoints
|
||||
if acl_in is not None and acl_out is not None and events_in is not None:
|
||||
return (
|
||||
configuration_index + 1,
|
||||
setting.getNumber(),
|
||||
setting.getAlternateSetting(),
|
||||
acl_in,
|
||||
acl_out,
|
||||
events_in
|
||||
)
|
||||
else:
|
||||
logger.debug(f'skipping configuration {configuration_index + 1} / interface {setting.getNumber()}')
|
||||
|
||||
endpoints = find_endpoints(found)
|
||||
if endpoints is None:
|
||||
raise ValueError('no compatible interface found for device')
|
||||
(configuration, interface, setting, acl_in, acl_out, events_in) = endpoints
|
||||
logger.debug(
|
||||
f'selected endpoints: configuration={configuration}, '
|
||||
f'interface={interface}, '
|
||||
f'setting={setting}, '
|
||||
f'acl_in=0x{acl_in:02X}, '
|
||||
f'acl_out=0x{acl_out:02X}, '
|
||||
f'events_in=0x{events_in:02X}, '
|
||||
)
|
||||
|
||||
device = found.open()
|
||||
|
||||
# Detach the kernel driver if supported and needed
|
||||
if usb1.hasCapability(usb1.CAP_SUPPORTS_DETACH_KERNEL_DRIVER):
|
||||
@@ -329,21 +425,21 @@ async def open_usb_transport(spec):
|
||||
|
||||
# Set the configuration if needed
|
||||
try:
|
||||
configuration = device.getConfiguration()
|
||||
logger.debug(f'current configuration = {configuration}')
|
||||
current_configuration = device.getConfiguration()
|
||||
logger.debug(f'current configuration = {current_configuration}')
|
||||
except usb1.USBError:
|
||||
configuration = 0
|
||||
current_configuration = 0
|
||||
|
||||
if configuration != 1:
|
||||
if current_configuration != configuration:
|
||||
try:
|
||||
logger.debug('setting configuration 1')
|
||||
device.setConfiguration(1)
|
||||
logger.debug(f'setting configuration {configuration}')
|
||||
device.setConfiguration(configuration)
|
||||
except usb1.USBError:
|
||||
logger.warning('failed to set configuration 1')
|
||||
logger.warning('failed to set configuration')
|
||||
|
||||
source = UsbPacketSource(context, device)
|
||||
sink = UsbPacketSink(device)
|
||||
return UsbTransport(context, device, interface, source, sink)
|
||||
source = UsbPacketSource(context, device, acl_in, events_in)
|
||||
sink = UsbPacketSink(device, acl_out)
|
||||
return UsbTransport(context, device, interface, setting, source, sink)
|
||||
except usb1.USBError as error:
|
||||
logger.warning(color(f'!!! failed to open USB device: {error}', 'red'))
|
||||
context.close()
|
||||
|
||||
@@ -45,6 +45,10 @@ nav:
|
||||
- HCI Bridge: apps_and_tools/hci_bridge.md
|
||||
- Golden Gate Bridge: apps_and_tools/gg_bridge.md
|
||||
- Show: apps_and_tools/show.md
|
||||
- GATT Dump: apps_and_tools/gatt_dump.md
|
||||
- Pair: apps_and_tools/pair.md
|
||||
- Unbond: apps_and_tools/unbond.md
|
||||
- USB Probe: apps_and_tools/usb_probe.md
|
||||
- Hardware:
|
||||
- Overview: hardware/index.md
|
||||
- Platforms:
|
||||
|
||||
@@ -13,17 +13,29 @@ type of device (there's no way to tell).
|
||||
|
||||
## Usage
|
||||
|
||||
This command line tool takes no arguments.
|
||||
This command line tool may be invoked with no arguments, or with `--verbose`
|
||||
for extra details.
|
||||
When installed from PyPI, run as
|
||||
```
|
||||
$ bumble-usb-probe
|
||||
```
|
||||
|
||||
or, for extra details, with the `--verbose` argument
|
||||
```
|
||||
$ bumble-usb-probe --v
|
||||
```
|
||||
|
||||
When running from the source distribution:
|
||||
```
|
||||
$ python3 apps/usb-probe.py
|
||||
```
|
||||
|
||||
or
|
||||
|
||||
```
|
||||
$ python3 apps/usb-probe.py --verbose
|
||||
```
|
||||
|
||||
!!! example
|
||||
```
|
||||
$ python3 apps/usb_probe.py
|
||||
|
||||
@@ -5,6 +5,7 @@ The USB transport interfaces with a local Bluetooth USB dongle.
|
||||
|
||||
## Moniker
|
||||
The moniker for a USB transport is either:
|
||||
|
||||
* `usb:<index>`
|
||||
* `usb:<vendor>:<product>`
|
||||
* `usb:<vendor>:<product>/<serial-number>`
|
||||
@@ -16,6 +17,10 @@ In the `usb:<vendor>:<product>#<index>` form, matching devices are the ones with
|
||||
|
||||
`<vendor>` and `<product>` are a vendor ID and product ID in hexadecimal.
|
||||
|
||||
In addition, if the moniker ends with the symbol "!", the device will be used in "forced" mode:
|
||||
the first USB interface of the device will be used, regardless of the interface class/subclass.
|
||||
This may be useful for some devices that use a custom class/subclass but may nonetheless work as-is.
|
||||
|
||||
!!! examples
|
||||
`usb:04b4:f901`
|
||||
The USB dongle with `<vendor>` equal to `04b4` and `<product>` equal to `f901`
|
||||
@@ -29,6 +34,10 @@ In the `usb:<vendor>:<product>#<index>` form, matching devices are the ones with
|
||||
`usb:04b4:f901/#1`
|
||||
The second USB dongle with `<vendor>` equal to `04b4` and `<product>` equal to `f901`
|
||||
|
||||
`usb:0B05:17CB!`
|
||||
The BT USB dongle vendor=0B05 and product=17CB, in "forced" mode.
|
||||
|
||||
|
||||
## Alternative
|
||||
The library includes two different implementations of the USB transport, implemented using different python bindings for `libusb`.
|
||||
Using the transport prefix `pyusb:` instead of `usb:` selects the implementation based on [PyUSB](https://pypi.org/project/pyusb/), using the synchronous API of `libusb`, whereas the default implementation is based on [libusb1](https://pypi.org/project/libusb1/), using the asynchronous API of `libusb`. In order to use the alternative PyUSB-based implementation, you need to ensure that you have installed that python module, as it isn't installed by default as a dependency of Bumble.
|
||||
|
||||
@@ -20,7 +20,7 @@ import sys
|
||||
import os
|
||||
import logging
|
||||
from colors import color
|
||||
from bumble.device import Device
|
||||
from bumble.device import Device, Peer
|
||||
from bumble.transport import open_transport
|
||||
from bumble.profiles.battery_service import BatteryServiceProxy
|
||||
|
||||
|
||||
+1
-2
@@ -246,8 +246,7 @@ IO_CAP = [
|
||||
SC = [False, True]
|
||||
MITM = [False, True]
|
||||
# Key distribution is a 4-bit bitmask
|
||||
# IdKey is necessary for current SMP structure
|
||||
KEY_DIST = [i for i in range(16) if (i & SMP_ID_KEY_DISTRIBUTION_FLAG)]
|
||||
KEY_DIST = range(16)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize('io_cap, sc, mitm, key_dist',
|
||||
|
||||
Reference in New Issue
Block a user