Compare commits

..

1 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
34161e5e14 add test.release task to facilitate CI integration 2022-08-16 13:30:45 -07:00
50 changed files with 1152 additions and 5870 deletions

View File

@@ -9,7 +9,7 @@
Bluetooth Stack for Apps, Emulation, Test and Experimentation Bluetooth Stack for Apps, Emulation, Test and Experimentation
============================================================= =============================================================
<img src="docs/mkdocs/src/images/logo_framed.png" alt="Logo" width="200" height="200"/> <img src="docs/mkdocs/src/images/logo_framed.png" alt="drawing" width="200" height="200"/>
Bumble is a full-featured Bluetooth stack written entirely in Python. It supports most of the common Bluetooth Low Energy (BLE) and Bluetooth Classic (BR/EDR) protocols and profiles, including GAP, L2CAP, ATT, GATT, SMP, SDP, RFCOMM, HFP, HID and A2DP. The stack can be used with physical radios via HCI over USB, UART, or the Linux VHCI, as well as virtual radios, including the virtual Bluetooth support of the Android emulator. Bumble is a full-featured Bluetooth stack written entirely in Python. It supports most of the common Bluetooth Low Energy (BLE) and Bluetooth Classic (BR/EDR) protocols and profiles, including GAP, L2CAP, ATT, GATT, SMP, SDP, RFCOMM, HFP, HID and A2DP. The stack can be used with physical radios via HCI over USB, UART, or the Linux VHCI, as well as virtual radios, including the virtual Bluetooth support of the Android emulator.
@@ -38,20 +38,12 @@ python -m pip install ".[test,development,documentation]"
### Examples ### Examples
Refer to the [Examples Documentation](examples/README.md) for details on the included example scripts and how to run them. Refer to the [Example Documentation](examples/README.md) for details on the included example scripts and how to run them.
The complete [list of Examples](/docs/mkdocs/src/examples/index.md), and what they are designed to do is here. The complete [list of Examples](/docs/mkdocs/src/examples/index.md), and what they are designed to do is here.
There are also a set of [Apps and Tools](docs/mkdocs/src/apps_and_tools/index.md) that show the utility of Bumble. There are also a set of [Apps and Tools](docs/mkdocs/src/apps_and_tools/index.md) that show the utility of Bumble.
### Using Bumble With a USB Dongle
Bumble is easiest to use with a dedicated USB dongle.
This is because internal Bluetooth interfaces tend to be locked down by the operating system.
You can use the [usb_probe](/docs/mkdocs/src/apps_and_tools/usb_probe.md) tool (all platforms) or `lsusb` (Linux or macOS) to list the available USB devices on your system.
See the [USB Transport](/docs/mkdocs/src/transports/usb.md) page for details on how to refer to USB devices.
## License ## License
Licensed under the [Apache 2.0](LICENSE) License. Licensed under the [Apache 2.0](LICENSE) License.

View File

@@ -20,26 +20,19 @@
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import asyncio import asyncio
import logging from bumble.hci import HCI_Constant
import os import os
import random import os.path
import re import logging
from collections import OrderedDict
import click import click
from collections import OrderedDict
import colors import colors
from bumble.core import UUID, AdvertisingData, TimeoutError, BT_LE_TRANSPORT from bumble.core import UUID, AdvertisingData
from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer from bumble.device import Device, Connection, Peer
from bumble.utils import AsyncRunner from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic from bumble.gatt import Characteristic
from bumble.hci import (
HCI_Constant,
HCI_LE_1M_PHY,
HCI_LE_2M_PHY,
HCI_LE_CODED_PHY,
)
from prompt_toolkit import Application from prompt_toolkit import Application
from prompt_toolkit.history import FileHistory from prompt_toolkit.history import FileHistory
@@ -50,7 +43,6 @@ from prompt_toolkit.styles import Style
from prompt_toolkit.filters import Condition from prompt_toolkit.filters import Condition
from prompt_toolkit.widgets import TextArea, Frame from prompt_toolkit.widgets import TextArea, Frame
from prompt_toolkit.widgets.toolbars import FormattedTextToolbar from prompt_toolkit.widgets.toolbars import FormattedTextToolbar
from prompt_toolkit.data_structures import Point
from prompt_toolkit.layout import ( from prompt_toolkit.layout import (
Layout, Layout,
HSplit, HSplit,
@@ -59,20 +51,17 @@ from prompt_toolkit.layout import (
Float, Float,
FormattedTextControl, FormattedTextControl,
FloatContainer, FloatContainer,
ConditionalContainer, ConditionalContainer
Dimension
) )
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Constants # Constants
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
BUMBLE_USER_DIR = os.path.expanduser('~/.bumble') BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
DEFAULT_RSSI_BAR_WIDTH = 20 DEFAULT_PROMPT_HEIGHT = 20
DEFAULT_CONNECTION_TIMEOUT = 30.0 DEFAULT_RSSI_BAR_WIDTH = 20
DISPLAY_MIN_RSSI = -100 DISPLAY_MIN_RSSI = -100
DISPLAY_MAX_RSSI = -30 DISPLAY_MAX_RSSI = -30
RSSI_MONITOR_INTERVAL = 5.0 # Seconds
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Globals # Globals
@@ -80,57 +69,16 @@ RSSI_MONITOR_INTERVAL = 5.0 # Seconds
App = None App = None
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def le_phy_name(phy_id):
return {
HCI_LE_1M_PHY: '1M',
HCI_LE_2M_PHY: '2M',
HCI_LE_CODED_PHY: 'CODED'
}.get(phy_id, HCI_Constant.le_phy_name(phy_id))
def rssi_bar(rssi):
blocks = ['', '', '', '', '', '', '', '']
bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
bar_width = min(max(bar_width, 0), 1)
bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
bar_blocks = ('' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
return f'{rssi:4} {bar_blocks}'
def parse_phys(phys):
if phys.lower() == '*':
return None
else:
phy_list = []
elements = phys.lower().split(',')
for element in elements:
if element == '1m':
phy_list.append(HCI_LE_1M_PHY)
elif element == '2m':
phy_list.append(HCI_LE_2M_PHY)
elif element == 'coded':
phy_list.append(HCI_LE_CODED_PHY)
else:
raise ValueError('invalid PHY name')
return phy_list
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Console App # Console App
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class ConsoleApp: class ConsoleApp:
def __init__(self): def __init__(self):
self.known_addresses = set() self.known_addresses = set()
self.known_attributes = [] self.known_attributes = []
self.device = None self.device = None
self.connected_peer = None self.connected_peer = None
self.top_tab = 'device' self.top_tab = 'scan'
self.monitor_rssi = False
self.connection_rssi = None
style = Style.from_dict({ style = Style.from_dict({
'output-field': 'bg:#000044 #ffffff', 'output-field': 'bg:#000044 #ffffff',
@@ -151,15 +99,10 @@ class ConsoleApp:
def make_completer(): def make_completer():
return NestedCompleter.from_nested_dict({ return NestedCompleter.from_nested_dict({
'scan': { 'scan': {
'on': None,
'off': None,
'clear': None
},
'advertise': {
'on': None, 'on': None,
'off': None 'off': None
}, },
'rssi': { 'advertise': {
'on': None, 'on': None,
'off': None 'off': None
}, },
@@ -167,11 +110,7 @@ class ConsoleApp:
'scan': None, 'scan': None,
'services': None, 'services': None,
'attributes': None, 'attributes': None,
'log': None, 'log': None
'device': None
},
'filter': {
'address': None,
}, },
'connect': LiveCompleter(self.known_addresses), 'connect': LiveCompleter(self.known_addresses),
'update-parameters': None, 'update-parameters': None,
@@ -181,17 +120,10 @@ class ConsoleApp:
'services': None, 'services': None,
'attributes': None 'attributes': None
}, },
'request-mtu': None,
'read': LiveCompleter(self.known_attributes), 'read': LiveCompleter(self.known_attributes),
'write': LiveCompleter(self.known_attributes), 'write': LiveCompleter(self.known_attributes),
'subscribe': LiveCompleter(self.known_attributes), 'subscribe': LiveCompleter(self.known_attributes),
'unsubscribe': LiveCompleter(self.known_attributes), 'unsubscribe': LiveCompleter(self.known_attributes),
'set-phy': {
'1m': None,
'2m': None,
'coded': None
},
'set-default-phy': None,
'quit': None, 'quit': None,
'exit': None 'exit': None
}) })
@@ -207,17 +139,14 @@ class ConsoleApp:
self.input_field.accept_handler = self.accept_input self.input_field.accept_handler = self.accept_input
self.output_height = Dimension(min=7, max=7, weight=1) self.output_height = 7
self.output_lines = [] self.output_lines = []
self.output = FormattedTextControl(get_cursor_position=lambda: Point(0, max(0, len(self.output_lines) - 1))) self.output = FormattedTextControl()
self.output_max_lines = 20
self.scan_results_text = FormattedTextControl() self.scan_results_text = FormattedTextControl()
self.services_text = FormattedTextControl() self.services_text = FormattedTextControl()
self.attributes_text = FormattedTextControl() self.attributes_text = FormattedTextControl()
self.device_text = FormattedTextControl() self.log_text = FormattedTextControl()
self.log_text = FormattedTextControl(get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1))) self.log_height = 20
self.log_height = Dimension(min=7, weight=4)
self.log_max_lines = 100
self.log_lines = [] self.log_lines = []
container = HSplit([ container = HSplit([
@@ -234,14 +163,11 @@ class ConsoleApp:
filter=Condition(lambda: self.top_tab == 'attributes') filter=Condition(lambda: self.top_tab == 'attributes')
), ),
ConditionalContainer( ConditionalContainer(
Frame(Window(self.log_text, height=self.log_height), title='Log'), Frame(Window(self.log_text), title='Log'),
filter=Condition(lambda: self.top_tab == 'log') filter=Condition(lambda: self.top_tab == 'log')
), ),
ConditionalContainer( Frame(Window(self.output), height=self.output_height),
Frame(Window(self.device_text), title='Device'), # HorizontalLine(),
filter=Condition(lambda: self.top_tab == 'device')
),
Frame(Window(self.output, height=self.output_height)),
FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'), FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'),
self.input_field self.input_field
]) ])
@@ -273,26 +199,17 @@ class ConsoleApp:
) )
async def run_async(self, device_config, transport): async def run_async(self, device_config, transport):
rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop())
async with await open_transport_or_link(transport) as (hci_source, hci_sink): async with await open_transport_or_link(transport) as (hci_source, hci_sink):
if device_config: if device_config:
self.device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink) self.device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
else: else:
random_address = f"{random.randint(192,255):02X}" # address is static random self.device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
for c in random.sample(range(255), 5):
random_address += f":{c:02X}"
self.append_to_log(f"Setting random address: {random_address}")
self.device = Device.with_hci('Bumble', random_address, hci_source, hci_sink)
self.device.listener = DeviceListener(self) self.device.listener = DeviceListener(self)
await self.device.power_on() await self.device.power_on()
self.show_device(self.device)
# Run the UI # Run the UI
await self.ui.run_async() await self.ui.run_async()
rssi_monitoring_task.cancel()
def add_known_address(self, address): def add_known_address(self, address):
self.known_addresses.add(address) self.known_addresses.add(address)
@@ -307,33 +224,22 @@ class ConsoleApp:
connection_state = 'NONE' connection_state = 'NONE'
encryption_state = '' encryption_state = ''
att_mtu = ''
rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi)
if self.device: if self.device:
if self.device.is_le_connecting: if self.device.is_connecting:
connection_state = 'CONNECTING' connection_state = 'CONNECTING'
elif self.connected_peer: elif self.connected_peer:
connection = self.connected_peer.connection connection = self.connected_peer.connection
connection_parameters = f'{connection.parameters.connection_interval}/{connection.parameters.peripheral_latency}/{connection.parameters.supervision_timeout}' connection_parameters = f'{connection.parameters.connection_interval}/{connection.parameters.connection_latency}/{connection.parameters.supervision_timeout}'
if connection.transport == BT_LE_TRANSPORT: connection_state = f'{connection.peer_address} {connection_parameters} {connection.data_length}'
phy_state = f' RX={le_phy_name(connection.phy.rx_phy)}/TX={le_phy_name(connection.phy.tx_phy)}'
else:
phy_state = ''
connection_state = f'{connection.peer_address} {connection_parameters} {connection.data_length}{phy_state}'
encryption_state = 'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED' encryption_state = 'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED'
att_mtu = f'ATT_MTU: {connection.att_mtu}'
return [ return [
('ansigreen', f' SCAN: {scanning} '), ('ansigreen', f' SCAN: {scanning} '),
('', ' '), ('', ' '),
('ansiblue', f' CONNECTION: {connection_state} '), ('ansiblue', f' CONNECTION: {connection_state} '),
('', ' '), ('', ' '),
('ansimagenta', f' {encryption_state} '), ('ansimagenta', f' {encryption_state} ')
('', ' '),
('ansicyan', f' {att_mtu} '),
('', ' '),
('ansiyellow', f' {rssi} ')
] ]
def show_error(self, title, details = None): def show_error(self, title, details = None):
@@ -368,7 +274,7 @@ class ConsoleApp:
self.services_text.text = lines self.services_text.text = lines
self.ui.invalidate() self.ui.invalidate()
def show_attributes(self, attributes): async def show_attributes(self, attributes):
lines = [] lines = []
for attribute in attributes: for attribute in attributes:
@@ -377,48 +283,10 @@ class ConsoleApp:
self.attributes_text.text = lines self.attributes_text.text = lines
self.ui.invalidate() self.ui.invalidate()
def show_device(self, device):
lines = []
lines.append(('ansicyan', 'Name: '))
lines.append(('', f'{device.name}\n'))
lines.append(('ansicyan', 'Public Address: '))
lines.append(('', f'{device.public_address}\n'))
lines.append(('ansicyan', 'Random Address: '))
lines.append(('', f'{device.random_address}\n'))
lines.append(('ansicyan', 'LE Enabled: '))
lines.append(('', f'{device.le_enabled}\n'))
lines.append(('ansicyan', 'Classic Enabled: '))
lines.append(('', f'{device.classic_enabled}\n'))
lines.append(('ansicyan', 'Classic SC Enabled: '))
lines.append(('', f'{device.classic_sc_enabled}\n'))
lines.append(('ansicyan', 'Classic SSP Enabled: '))
lines.append(('', f'{device.classic_ssp_enabled}\n'))
lines.append(('ansicyan', 'Classic Class: '))
lines.append(('', f'{device.class_of_device}\n'))
lines.append(('ansicyan', 'Discoverable: '))
lines.append(('', f'{device.discoverable}\n'))
lines.append(('ansicyan', 'Connectable: '))
lines.append(('', f'{device.connectable}\n'))
lines.append(('ansicyan', 'Advertising Data: '))
lines.append(('', f'{device.advertising_data}\n'))
lines.append(('ansicyan', 'Scan Response Data: '))
lines.append(('', f'{device.scan_response_data}\n'))
advertising_interval = (
device.advertising_interval_min
if device.advertising_interval_min == device.advertising_interval_max
else f"{device.advertising_interval_min} to {device.advertising_interval_max}"
)
lines.append(('ansicyan', 'Advertising Interval: '))
lines.append(('', f'{advertising_interval}\n'))
self.device_text.text = lines
self.ui.invalidate()
def append_to_output(self, line, invalidate=True): def append_to_output(self, line, invalidate=True):
if type(line) is str: if type(line) is str:
line = [('', line)] line = [('', line)]
self.output_lines = self.output_lines[-self.output_max_lines:] self.output_lines = self.output_lines[-(self.output_height - 3):]
self.output_lines.append(line) self.output_lines.append(line)
formatted_text = [] formatted_text = []
for line in self.output_lines: for line in self.output_lines:
@@ -430,7 +298,7 @@ class ConsoleApp:
def append_to_log(self, lines, invalidate=True): def append_to_log(self, lines, invalidate=True):
self.log_lines.extend(lines.split('\n')) self.log_lines.extend(lines.split('\n'))
self.log_lines = self.log_lines[-self.log_max_lines:] self.log_lines = self.log_lines[-(self.log_height - 3):]
self.log_text.text = ANSI('\n'.join(self.log_lines)) self.log_text.text = ANSI('\n'.join(self.log_lines))
if invalidate: if invalidate:
self.ui.invalidate() self.ui.invalidate()
@@ -463,7 +331,7 @@ class ConsoleApp:
attributes = await self.connected_peer.discover_attributes() attributes = await self.connected_peer.discover_attributes()
self.append_to_output(f'discovered {len(attributes)} attributes...') self.append_to_output(f'discovered {len(attributes)} attributes...')
self.show_attributes(attributes) await self.show_attributes(attributes)
def find_characteristic(self, param): def find_characteristic(self, param):
parts = param.split('.') parts = param.split('.')
@@ -483,12 +351,6 @@ class ConsoleApp:
if characteristic.handle == attribute_handle: if characteristic.handle == attribute_handle:
return characteristic return characteristic
async def rssi_monitor_loop(self):
while True:
if self.monitor_rssi and self.connected_peer:
self.connection_rssi = await self.connected_peer.connection.get_rssi()
await asyncio.sleep(RSSI_MONITOR_INTERVAL)
async def command(self, command): async def command(self, command):
try: try:
(keyword, *params) = command.strip().split(' ') (keyword, *params) = command.strip().split(' ')
@@ -510,96 +372,46 @@ class ConsoleApp:
else: else:
await self.device.start_scanning() await self.device.start_scanning()
elif params[0] == 'on': elif params[0] == 'on':
if len(params) == 2:
if not params[1].startswith("filter="):
self.show_error('invalid syntax', 'expected address filter=key1:value1,key2:value,... available filters: address')
# regex: (word):(any char except ,)
matches = re.findall(r"(\w+):([^,]+)", params[1])
for match in matches:
if match[0] == "address":
self.device.listener.address_filter = match[1]
await self.device.start_scanning() await self.device.start_scanning()
self.top_tab = 'scan' self.top_tab = 'scan'
elif params[0] == 'off': elif params[0] == 'off':
await self.device.stop_scanning() await self.device.stop_scanning()
elif params[0] == 'clear':
self.device.listener.scan_results.clear()
self.known_addresses.clear()
self.show_scan_results(self.device.listener.scan_results)
else: else:
self.show_error('unsupported arguments for scan command') self.show_error('unsupported arguments for scan command')
async def do_rssi(self, params):
if len(params) == 0:
# Toggle monitoring
self.monitor_rssi = not self.monitor_rssi
elif params[0] == 'on':
self.monitor_rssi = True
elif params[0] == 'off':
self.monitor_rssi = False
else:
self.show_error('unsupported arguments for rssi command')
async def do_connect(self, params): async def do_connect(self, params):
if len(params) != 1 and len(params) != 2: if len(params) != 1:
self.show_error('invalid syntax', 'expected connect <address> [phys]') self.show_error('invalid syntax', 'expected connect <address>')
return return
if len(params) == 1:
phys = None
else:
phys = parse_phys(params[1])
if phys is None:
connection_parameters_preferences = None
else:
connection_parameters_preferences = {
phy: ConnectionParametersPreferences()
for phy in phys
}
if self.device.is_scanning:
await self.device.stop_scanning()
self.append_to_output('connecting...') self.append_to_output('connecting...')
await self.device.connect(params[0])
try: self.top_tab = 'services'
await self.device.connect(
params[0],
connection_parameters_preferences=connection_parameters_preferences,
timeout=DEFAULT_CONNECTION_TIMEOUT
)
self.top_tab = 'services'
except TimeoutError:
self.show_error('connection timed out')
async def do_disconnect(self, params): async def do_disconnect(self, params):
if self.device.is_le_connecting: if not self.connected_peer:
await self.device.cancel_connection() self.show_error('not connected')
else: return
if not self.connected_peer:
self.show_error('not connected')
return
await self.connected_peer.connection.disconnect() await self.connected_peer.connection.disconnect()
async def do_update_parameters(self, params): async def do_update_parameters(self, params):
if len(params) != 1 or len(params[0].split('/')) != 3: if len(params) != 1 or len(params[0].split('/')) != 3:
self.show_error('invalid syntax', 'expected update-parameters <interval-min>-<interval-max>/<max-latency>/<supervision>') self.show_error('invalid syntax', 'expected update-parameters <interval-min>-<interval-max>/<latency>/<supervision>')
return return
if not self.connected_peer: if not self.connected_peer:
self.show_error('not connected') self.show_error('not connected')
return return
connection_intervals, max_latency, supervision_timeout = params[0].split('/') connection_intervals, connection_latency, supervision_timeout = params[0].split('/')
connection_interval_min, connection_interval_max = [int(x) for x in connection_intervals.split('-')] connection_interval_min, connection_interval_max = [int(x) for x in connection_intervals.split('-')]
max_latency = int(max_latency) connection_latency = int(connection_latency)
supervision_timeout = int(supervision_timeout) supervision_timeout = int(supervision_timeout)
await self.connected_peer.connection.update_parameters( await self.connected_peer.connection.update_parameters(
connection_interval_min, connection_interval_min,
connection_interval_max, connection_interval_max,
max_latency, connection_latency,
supervision_timeout supervision_timeout
) )
@@ -626,29 +438,10 @@ class ConsoleApp:
async def do_show(self, params): async def do_show(self, params):
if params: if params:
if params[0] in {'scan', 'services', 'attributes', 'log', 'device'}: if params[0] in {'scan', 'services', 'attributes', 'log'}:
self.top_tab = params[0] self.top_tab = params[0]
self.ui.invalidate() self.ui.invalidate()
async def do_get_phy(self, params):
if not self.connected_peer:
self.show_error('not connected')
return
phy = await self.connected_peer.connection.get_phy()
self.append_to_output(f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, TX={HCI_Constant.le_phy_name(phy[1])}')
async def do_request_mtu(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected request-mtu <mtu>')
return
if not self.connected_peer:
self.show_error('not connected')
return
await self.connected_peer.request_mtu(int(params[0]))
async def do_discover(self, params): async def do_discover(self, params):
if not params: if not params:
self.show_error('invalid syntax', 'expected discover services|attributes') self.show_error('invalid syntax', 'expected discover services|attributes')
@@ -661,14 +454,14 @@ class ConsoleApp:
await self.discover_attributes() await self.discover_attributes()
async def do_read(self, params): async def do_read(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected read <attribute>')
return
if not self.connected_peer: if not self.connected_peer:
self.show_error('not connected') self.show_error('not connected')
return return
if len(params) != 1:
self.show_error('invalid syntax', 'expected read <attribute>')
return
characteristic = self.find_characteristic(params[0]) characteristic = self.find_characteristic(params[0])
if characteristic is None: if characteristic is None:
self.show_error('no such characteristic') self.show_error('no such characteristic')
@@ -737,54 +530,12 @@ class ConsoleApp:
await characteristic.unsubscribe() await characteristic.unsubscribe()
async def do_set_phy(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected set-phy <tx_rx_phys>|<tx_phys>/<rx_phys>')
return
if not self.connected_peer:
self.show_error('not connected')
return
if '/' in params[0]:
tx_phys, rx_phys = params[0].split('/')
else:
tx_phys = params[0]
rx_phys = tx_phys
await self.connected_peer.connection.set_phy(
tx_phys=parse_phys(tx_phys),
rx_phys=parse_phys(rx_phys)
)
async def do_set_default_phy(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected set-default-phy <tx_rx_phys>|<tx_phys>/<rx_phys>')
return
if '/' in params[0]:
tx_phys, rx_phys = params[0].split('/')
else:
tx_phys = params[0]
rx_phys = tx_phys
await self.device.set_default_phy(
tx_phys=parse_phys(tx_phys),
rx_phys=parse_phys(rx_phys)
)
async def do_exit(self, params): async def do_exit(self, params):
self.ui.exit() self.ui.exit()
async def do_quit(self, params): async def do_quit(self, params):
self.ui.exit() self.ui.exit()
async def do_filter(self, params):
if params[0] == "address":
if len(params) != 2:
self.show_error('invalid syntax', 'expected filter address <pattern>')
return
self.device.listener.address_filter = params[1]
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Device and Connection Listener # Device and Connection Listener
@@ -793,38 +544,16 @@ class DeviceListener(Device.Listener, Connection.Listener):
def __init__(self, app): def __init__(self, app):
self.app = app self.app = app
self.scan_results = OrderedDict() self.scan_results = OrderedDict()
self.address_filter = None
@property
def address_filter(self):
return self._address_filter
@address_filter.setter
def address_filter(self, filter_addr):
if filter_addr is None:
self._address_filter = re.compile(r".*")
else:
self._address_filter = re.compile(filter_addr)
self.scan_results = OrderedDict(filter(lambda x: self.filter_address_match(x), self.scan_results))
self.app.show_scan_results(self.scan_results)
def filter_address_match(self, address):
"""
Returns true if an address matches the filter
"""
return bool(self.address_filter.match(address))
@AsyncRunner.run_in_task() @AsyncRunner.run_in_task()
async def on_connection(self, connection): async def on_connection(self, connection):
self.app.connected_peer = Peer(connection) self.app.connected_peer = Peer(connection)
self.app.connection_rssi = None
self.app.append_to_output(f'connected to {self.app.connected_peer}') self.app.append_to_output(f'connected to {self.app.connected_peer}')
connection.listener = self connection.listener = self
def on_disconnection(self, reason): def on_disconnection(self, reason):
self.app.append_to_output(f'disconnected from {self.app.connected_peer}, reason: {HCI_Constant.error_name(reason)}') self.app.append_to_output(f'disconnected from {self.app.connected_peer}, reason: {HCI_Constant.error_name(reason)}')
self.app.connected_peer = None self.app.connected_peer = None
self.app.connection_rssi = None
def on_connection_parameters_update(self): def on_connection_parameters_update(self):
self.app.append_to_output(f'connection parameters update: {self.app.connected_peer.connection.parameters}') self.app.append_to_output(f'connection parameters update: {self.app.connected_peer.connection.parameters}')
@@ -841,19 +570,16 @@ class DeviceListener(Device.Listener, Connection.Listener):
def on_connection_data_length_change(self): def on_connection_data_length_change(self):
self.app.append_to_output(f'connection data length change: {self.app.connected_peer.connection.data_length}') self.app.append_to_output(f'connection data length change: {self.app.connected_peer.connection.data_length}')
def on_advertisement(self, advertisement): def on_advertisement(self, address, ad_data, rssi, connectable):
if not self.filter_address_match(str(advertisement.address)): entry_key = f'{address}/{address.address_type}'
return
entry_key = f'{advertisement.address}/{advertisement.address.address_type}'
entry = self.scan_results.get(entry_key) entry = self.scan_results.get(entry_key)
if entry: if entry:
entry.ad_data = advertisement.data entry.ad_data = ad_data
entry.rssi = advertisement.rssi entry.rssi = rssi
entry.connectable = advertisement.is_connectable entry.connectable = connectable
else: else:
self.app.add_known_address(str(advertisement.address)) self.app.add_known_address(str(address))
self.scan_results[entry_key] = ScanResult(advertisement.address, advertisement.address.address_type, advertisement.data, advertisement.rssi, advertisement.is_connectable) self.scan_results[entry_key] = ScanResult(address, address.address_type, ad_data, rssi, connectable)
self.app.show_scan_results(self.scan_results) self.app.show_scan_results(self.scan_results)
@@ -877,9 +603,9 @@ class ScanResult:
else: else:
type_color = colors.cyan type_color = colors.cyan
name = self.ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) name = self.ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME)
if name is None: if name is None:
name = self.ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True) name = self.ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME)
if name: if name:
# Convert to string # Convert to string
try: try:
@@ -890,7 +616,12 @@ class ScanResult:
name = '' name = ''
# RSSI bar # RSSI bar
bar_string = rssi_bar(self.rssi) blocks = ['', '', '', '', '', '', '', '']
bar_width = (self.rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
bar_width = min(max(bar_width, 0), 1)
bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
bar_blocks = ('' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
bar_string = f'{self.rssi} {bar_blocks}'
bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string)) bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string))
return f'{address_color(str(self.address))} [{type_color(address_type_string)}] {bar_string} {bar_padding} {name}' return f'{address_color(str(self.address))} [{type_color(address_type_string)}] {bar_string} {bar_padding} {name}'
@@ -902,7 +633,6 @@ class LogHandler(logging.Handler):
def __init__(self, app): def __init__(self, app):
super().__init__() super().__init__()
self.app = app self.app = app
self.setFormatter(logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s'))
def emit(self, record): def emit(self, record):
message = self.format(record) message = self.format(record)
@@ -927,7 +657,6 @@ def main(device_config, transport):
# logging.basicConfig(level = 'FATAL') # logging.basicConfig(level = 'FATAL')
# logging.basicConfig(level = 'DEBUG') # logging.basicConfig(level = 'DEBUG')
root_logger = logging.getLogger() root_logger = logging.getLogger()
root_logger.addHandler(LogHandler(app)) root_logger.addHandler(LogHandler(app))
root_logger.setLevel(logging.DEBUG) root_logger.setLevel(logging.DEBUG)

View File

@@ -25,21 +25,15 @@ from bumble.company_ids import COMPANY_IDENTIFIERS
from bumble.core import name_or_number from bumble.core import name_or_number
from bumble.hci import ( from bumble.hci import (
map_null_terminated_utf8_string, map_null_terminated_utf8_string,
HCI_SUCCESS,
HCI_LE_SUPPORTED_FEATURES_NAMES, HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_SUCCESS,
HCI_VERSION_NAMES, HCI_VERSION_NAMES,
LMP_VERSION_NAMES, LMP_VERSION_NAMES,
HCI_Command, HCI_Command,
HCI_READ_BD_ADDR_COMMAND,
HCI_Read_BD_ADDR_Command, HCI_Read_BD_ADDR_Command,
HCI_READ_LOCAL_NAME_COMMAND, HCI_READ_BD_ADDR_COMMAND,
HCI_Read_Local_Name_Command, HCI_Read_Local_Name_Command,
HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND, HCI_READ_LOCAL_NAME_COMMAND
HCI_LE_Read_Maximum_Data_Length_Command,
HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND,
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command,
HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND,
HCI_LE_Read_Maximum_Advertising_Data_Length_Command
) )
from bumble.host import Host from bumble.host import Host
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
@@ -63,39 +57,6 @@ async def get_classic_info(host):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def get_le_info(host): async def get_le_info(host):
print() print()
if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
response = await host.send_command(HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('LE Number Of Supported Advertising Sets:', 'yellow'),
response.return_parameters.num_supported_advertising_sets,
'\n'
)
if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
response = await host.send_command(HCI_LE_Read_Maximum_Advertising_Data_Length_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('LE Maximum Advertising Data Length:', 'yellow'),
response.return_parameters.max_advertising_data_length,
'\n'
)
if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
response = await host.send_command(HCI_LE_Read_Maximum_Data_Length_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('Maximum Data Length:', 'yellow'),
(
f'tx:{response.return_parameters.supported_max_tx_octets}/'
f'{response.return_parameters.supported_max_tx_time}, '
f'rx:{response.return_parameters.supported_max_rx_octets}/'
f'{response.return_parameters.supported_max_rx_time}'
),
'\n'
)
print(color('LE Features:', 'yellow')) print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features: for feature in host.supported_le_features:
print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature)) print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature))

View File

@@ -17,14 +17,13 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import asyncio import asyncio
import os import os
import struct
import logging import logging
import click import click
from colors import color from colors import color
from bumble.device import Device, Peer from bumble.device import Device, Peer
from bumble.core import AdvertisingData from bumble.core import AdvertisingData
from bumble.gatt import Service, Characteristic, CharacteristicValue from bumble.gatt import Service, Characteristic
from bumble.utils import AsyncRunner from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.hci import HCI_Constant from bumble.hci import HCI_Constant
@@ -42,59 +41,13 @@ GG_PREFERRED_MTU = 256
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class GattlinkL2capEndpoint: class GattlinkHubBridge(Device.Listener):
def __init__(self): def __init__(self):
self.l2cap_channel = None self.peer = None
self.l2cap_packet = b'' self.rx_socket = None
self.l2cap_packet_size = 0 self.tx_socket = None
self.rx_characteristic = None
# Called when an L2CAP SDU has been received self.tx_characteristic = None
def on_coc_sdu(self, sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
while len(sdu):
if self.l2cap_packet_size == 0:
# Expect a new packet
self.l2cap_packet_size = sdu[0] + 1
sdu = sdu[1:]
else:
bytes_needed = self.l2cap_packet_size - len(self.l2cap_packet)
chunk = min(bytes_needed, len(sdu))
self.l2cap_packet += sdu[:chunk]
sdu = sdu[chunk:]
if len(self.l2cap_packet) == self.l2cap_packet_size:
self.on_l2cap_packet(self.l2cap_packet)
self.l2cap_packet = b''
self.l2cap_packet_size = 0
# -----------------------------------------------------------------------------
class GattlinkHubBridge(GattlinkL2capEndpoint, Device.Listener):
def __init__(self, device, peer_address):
super().__init__()
self.device = device
self.peer_address = peer_address
self.peer = None
self.tx_socket = None
self.rx_characteristic = None
self.tx_characteristic = None
self.l2cap_psm_characteristic = None
device.listener = self
async def start(self):
# Connect to the peer
print(f'=== Connecting to {self.peer_address}...')
await self.device.connect(self.peer_address)
async def connect_l2cap(self, psm):
print(color(f'### Connecting with L2CAP on PSM = {psm}', 'yellow'))
try:
self.l2cap_channel = await self.peer.connection.open_l2cap_channel(psm)
print(color('*** Connected', 'yellow'), self.l2cap_channel)
self.l2cap_channel.sink = self.on_coc_sdu
except Exception as error:
print(color(f'!!! Connection failed: {error}', 'red'))
@AsyncRunner.run_in_task() @AsyncRunner.run_in_task()
async def on_connection(self, connection): async def on_connection(self, connection):
@@ -127,24 +80,15 @@ class GattlinkHubBridge(GattlinkL2capEndpoint, Device.Listener):
self.rx_characteristic = characteristic self.rx_characteristic = characteristic
elif characteristic.uuid == GG_GATTLINK_TX_CHARACTERISTIC_UUID: elif characteristic.uuid == GG_GATTLINK_TX_CHARACTERISTIC_UUID:
self.tx_characteristic = characteristic self.tx_characteristic = characteristic
elif characteristic.uuid == GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID:
self.l2cap_psm_characteristic = characteristic
print('RX:', self.rx_characteristic) print('RX:', self.rx_characteristic)
print('TX:', self.tx_characteristic) print('TX:', self.tx_characteristic)
print('PSM:', self.l2cap_psm_characteristic)
if self.l2cap_psm_characteristic: # Subscribe to TX
# Subscribe to and then read the PSM value if self.tx_characteristic:
await self.peer.subscribe(self.l2cap_psm_characteristic, self.on_l2cap_psm_received)
psm_bytes = await self.peer.read_value(self.l2cap_psm_characteristic)
psm = struct.unpack('<H', psm_bytes)[0]
await self.connect_l2cap(psm)
elif self.tx_characteristic:
# Subscribe to TX
await self.peer.subscribe(self.tx_characteristic, self.on_tx_received) await self.peer.subscribe(self.tx_characteristic, self.on_tx_received)
print(color('=== Subscribed to Gattlink TX', 'yellow')) print(color('=== Subscribed to Gattlink TX', 'yellow'))
else: else:
print(color('!!! No Gattlink TX or PSM found', 'red')) print(color('!!! Gattlink TX not found', 'red'))
def on_connection_failure(self, error): def on_connection_failure(self, error):
print(color(f'!!! Connection failed: {error}')) print(color(f'!!! Connection failed: {error}'))
@@ -155,154 +99,53 @@ class GattlinkHubBridge(GattlinkL2capEndpoint, Device.Listener):
self.rx_characteristic = None self.rx_characteristic = None
self.peer = None self.peer = None
# Called when an L2CAP packet has been received
def on_l2cap_packet(self, packet):
print(color(f'<<< [L2CAP PACKET]: {len(packet)} bytes', 'cyan'))
print(color('>>> [UDP]', 'magenta'))
self.tx_socket.sendto(packet)
# Called by the GATT client when a notification is received # Called by the GATT client when a notification is received
def on_tx_received(self, value): def on_tx_received(self, value):
print(color(f'<<< [GATT TX]: {len(value)} bytes', 'cyan')) print(color('>>> TX:', 'magenta'), value.hex())
if self.tx_socket: if self.tx_socket:
print(color('>>> [UDP]', 'magenta'))
self.tx_socket.sendto(value) self.tx_socket.sendto(value)
# Called by asyncio when the UDP socket is created
def on_l2cap_psm_received(self, value):
psm = struct.unpack('<H', value)[0]
asyncio.create_task(self.connect_l2cap(psm))
# Called by asyncio when the UDP socket is created # Called by asyncio when the UDP socket is created
def connection_made(self, transport): def connection_made(self, transport):
pass pass
# Called by asyncio when a UDP datagram is received # Called by asyncio when a UDP datagram is received
def datagram_received(self, data, address): def datagram_received(self, data, address):
print(color(f'<<< [UDP]: {len(data)} bytes', 'green')) print(color('<<< RX:', 'magenta'), data.hex())
if self.l2cap_channel: # TODO: use a queue instead of creating a task everytime
print(color('>>> [L2CAP]', 'yellow')) if self.peer and self.rx_characteristic:
self.l2cap_channel.write(bytes([len(data) - 1]) + data)
elif self.peer and self.rx_characteristic:
print(color('>>> [GATT RX]', 'yellow'))
asyncio.create_task(self.peer.write_value(self.rx_characteristic, data)) asyncio.create_task(self.peer.write_value(self.rx_characteristic, data))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class GattlinkNodeBridge(GattlinkL2capEndpoint, Device.Listener): class GattlinkNodeBridge(Device.Listener):
def __init__(self, device): def __init__(self):
super().__init__() self.peer = None
self.device = device self.rx_socket = None
self.peer = None self.tx_socket = None
self.tx_socket = None
self.tx_subscriber = None
self.rx_characteristic = None
# Register as a listener
device.listener = self
# Listen for incoming L2CAP CoC connections
psm = 0xFB
device.register_l2cap_channel_server(0xFB, self.on_coc)
print(f'### Listening for CoC connection on PSM {psm}')
# Setup the Gattlink service
self.rx_characteristic = Characteristic(
GG_GATTLINK_RX_CHARACTERISTIC_UUID,
Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=self.on_rx_write)
)
self.tx_characteristic = Characteristic(
GG_GATTLINK_TX_CHARACTERISTIC_UUID,
Characteristic.NOTIFY,
Characteristic.READABLE
)
self.tx_characteristic.on('subscription', self.on_tx_subscription)
self.psm_characteristic = Characteristic(
GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.READABLE,
bytes([psm, 0])
)
gattlink_service = Service(
GG_GATTLINK_SERVICE_UUID,
[
self.rx_characteristic,
self.tx_characteristic,
self.psm_characteristic
]
)
device.add_services([gattlink_service])
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble GG', 'utf-8')),
(AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
bytes(reversed(bytes.fromhex('ABBAFF00E56A484CB8328B17CF6CBFE8'))))
])
)
async def start(self):
await self.device.start_advertising()
# Called by asyncio when the UDP socket is created # Called by asyncio when the UDP socket is created
def connection_made(self, transport): def connection_made(self, transport):
self.transport = transport pass
# Called by asyncio when a UDP datagram is received # Called by asyncio when a UDP datagram is received
def datagram_received(self, data, address): def datagram_received(self, data, address):
print(color(f'<<< [UDP]: {len(data)} bytes', 'green')) print(color('<<< RX:', 'magenta'), data.hex())
if self.l2cap_channel: # TODO: use a queue instead of creating a task everytime
print(color('>>> [L2CAP]', 'yellow')) if self.peer and self.rx_characteristic:
self.l2cap_channel.write(bytes([len(data) - 1]) + data) asyncio.create_task(self.peer.write_value(self.rx_characteristic, data))
elif self.tx_subscriber:
print(color('>>> [GATT TX]', 'yellow'))
self.tx_characteristic.value = data
asyncio.create_task(self.device.notify_subscribers(self.tx_characteristic))
# Called when a write to the RX characteristic has been received
def on_rx_write(self, connection, data):
print(color(f'<<< [GATT RX]: {len(data)} bytes', 'cyan'))
print(color('>>> [UDP]', 'magenta'))
self.tx_socket.sendto(data)
# Called when the subscription to the TX characteristic has changed
def on_tx_subscription(self, peer, enabled):
print(f'### [GATT TX] subscription from {peer}: {"enabled" if enabled else "disabled"}')
if enabled:
self.tx_subscriber = peer
else:
self.tx_subscriber = None
# Called when an L2CAP packet is received
def on_l2cap_packet(self, packet):
print(color(f'<<< [L2CAP PACKET]: {len(packet)} bytes', 'cyan'))
print(color('>>> [UDP]', 'magenta'))
self.tx_socket.sendto(packet)
# Called when a new connection is established
def on_coc(self, channel):
print('*** CoC Connection', channel)
self.l2cap_channel = channel
channel.sink = self.on_coc_sdu
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def run(hci_transport, device_address, role_or_peer_address, send_host, send_port, receive_host, receive_port): async def run(hci_transport, device_address, send_host, send_port, receive_host, receive_port):
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink): async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
print('<<< connected') print('<<< connected')
# Instantiate a bridge object # Instantiate a bridge object
device = Device.with_hci('Bumble GG', device_address, hci_source, hci_sink) bridge = GattlinkNodeBridge()
# Instantiate a bridge object
if role_or_peer_address == 'node':
bridge = GattlinkNodeBridge(device)
else:
bridge = GattlinkHubBridge(device, role_or_peer_address)
# Create a UDP to RX bridge (receive from UDP, send to RX) # Create a UDP to RX bridge (receive from UDP, send to RX)
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
@@ -317,8 +160,35 @@ async def run(hci_transport, device_address, role_or_peer_address, send_host, se
remote_addr=(send_host, send_port) remote_addr=(send_host, send_port)
) )
# Create a device to manage the host, with a custom listener
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device.listener = bridge
await device.power_on() await device.power_on()
await bridge.start()
# Connect to the peer
# print(f'=== Connecting to {device_address}...')
# await device.connect(device_address)
# TODO move to class
gattlink_service = Service(
GG_GATTLINK_SERVICE_UUID,
[
Characteristic(
GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID,
Characteristic.READ,
Characteristic.READABLE,
bytes([193, 0])
)
]
)
device.add_services([gattlink_service])
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble GG', 'utf-8')),
(AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS, bytes(reversed(bytes.fromhex('ABBAFF00E56A484CB8328B17CF6CBFE8'))))
])
)
await device.start_advertising()
# Wait until the source terminates # Wait until the source terminates
await hci_source.wait_for_termination() await hci_source.wait_for_termination()
@@ -327,16 +197,15 @@ async def run(hci_transport, device_address, role_or_peer_address, send_host, se
@click.command() @click.command()
@click.argument('hci_transport') @click.argument('hci_transport')
@click.argument('device_address') @click.argument('device_address')
@click.argument('role_or_peer_address')
@click.option('-sh', '--send-host', type=str, default='127.0.0.1', help='UDP host to send to') @click.option('-sh', '--send-host', type=str, default='127.0.0.1', help='UDP host to send to')
@click.option('-sp', '--send-port', type=int, default=9001, help='UDP port to send to') @click.option('-sp', '--send-port', type=int, default=9001, help='UDP port to send to')
@click.option('-rh', '--receive-host', type=str, default='127.0.0.1', help='UDP host to receive on') @click.option('-rh', '--receive-host', type=str, default='127.0.0.1', help='UDP host to receive on')
@click.option('-rp', '--receive-port', type=int, default=9000, help='UDP port to receive on') @click.option('-rp', '--receive-port', type=int, default=9000, help='UDP port to receive on')
def main(hci_transport, device_address, role_or_peer_address, send_host, send_port, receive_host, receive_port): def main(hci_transport, device_address, send_host, send_port, receive_host, receive_port):
asyncio.run(run(hci_transport, device_address, role_or_peer_address, send_host, send_port, receive_host, receive_port)) logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run(hci_transport, device_address, send_host, send_port, receive_host, receive_port))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@@ -1,331 +0,0 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import click
import logging
import os
from colors import color
from bumble.transport import open_transport_or_link
from bumble.device import Device
from bumble.utils import FlowControlAsyncPipe
from bumble.hci import HCI_Constant
# -----------------------------------------------------------------------------
class ServerBridge:
"""
L2CAP CoC server bridge: waits for a peer to connect an L2CAP CoC channel
on a specified PSM. When the connection is made, the bridge connects a TCP
socket to a remote host and bridges the data in both directions, with flow
control.
When the L2CAP CoC channel is closed, the bridge disconnects the TCP socket
and waits for a new L2CAP CoC channel to be connected.
When the TCP connection is closed by the TCP server, XXXX
"""
def __init__(
self,
psm,
max_credits,
mtu,
mps,
tcp_host,
tcp_port
):
self.psm = psm
self.max_credits = max_credits
self.mtu = mtu
self.mps = mps
self.tcp_host = tcp_host
self.tcp_port = tcp_port
async def start(self, device):
# Listen for incoming L2CAP CoC connections
device.register_l2cap_channel_server(
psm = self.psm,
server = self.on_coc,
max_credits = self.max_credits,
mtu = self.mtu,
mps = self.mps
)
print(color(f'### Listening for CoC connection on PSM {self.psm}', 'yellow'))
def on_ble_connection(connection):
def on_ble_disconnection(reason):
print(color('@@@ Bluetooth disconnection:', 'red'), HCI_Constant.error_name(reason))
print(color('@@@ Bluetooth connection:', 'green'), connection)
connection.on('disconnection', on_ble_disconnection)
device.on('connection', on_ble_connection)
await device.start_advertising(auto_restart=True)
# Called when a new L2CAP connection is established
def on_coc(self, l2cap_channel):
print(color('*** L2CAP channel:', 'cyan'), l2cap_channel)
class Pipe:
def __init__(self, bridge, l2cap_channel):
self.bridge = bridge
self.tcp_transport = None
self.l2cap_channel = l2cap_channel
l2cap_channel.on('close', self.on_l2cap_close)
l2cap_channel.sink = self.on_coc_sdu
async def connect_to_tcp(self):
# Connect to the TCP server
print(color(f'### Connecting to TCP {self.bridge.tcp_host}:{self.bridge.tcp_port}...', 'yellow'))
class TcpClientProtocol(asyncio.Protocol):
def __init__(self, pipe):
self.pipe = pipe
def connection_lost(self, error):
print(color(f'!!! TCP connection lost: {error}', 'red'))
if self.pipe.l2cap_channel is not None:
asyncio.create_task(self.pipe.l2cap_channel.disconnect())
def data_received(self, data):
print(f'<<< Received on TCP: {len(data)}')
self.pipe.l2cap_channel.write(data)
try:
self.tcp_transport, _ = await asyncio.get_running_loop().create_connection(
lambda: TcpClientProtocol(self),
host=self.bridge.tcp_host,
port=self.bridge.tcp_port,
)
print(color('### Connected', 'green'))
except Exception as error:
print(color(f'!!! Connection failed: {error}', 'red'))
await self.l2cap_channel.disconnect()
def on_l2cap_close(self):
self.l2cap_channel = None
if self.tcp_transport is not None:
self.tcp_transport.close()
def on_coc_sdu(self, sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
if self.tcp_transport is None:
print(color('!!! TCP socket not open, dropping', 'red'))
return
self.tcp_transport.write(sdu)
pipe = Pipe(self, l2cap_channel)
asyncio.create_task(pipe.connect_to_tcp())
# -----------------------------------------------------------------------------
class ClientBridge:
"""
L2CAP CoC client bridge: connects to a BLE device, then waits for an inbound
TCP connection on a specified port number. When a TCP client connects, an
L2CAP CoC channel connection to the BLE device is established, and the data
is bridged in both directions, with flow control.
When the TCP connection is closed by the client, the L2CAP CoC channel is
disconnected, but the connection to the BLE device remains, ready for a new
TCP client to connect.
When the L2CAP CoC channel is closed, XXXX
"""
READ_CHUNK_SIZE = 4096
def __init__(
self,
psm,
max_credits,
mtu,
mps,
address,
tcp_host,
tcp_port
):
self.psm = psm
self.max_credits = max_credits
self.mtu = mtu
self.mps = mps
self.address = address
self.tcp_host = tcp_host
self.tcp_port = tcp_port
async def start(self, device):
print(color(f'### Connecting to {self.address}...', 'yellow'))
connection = await device.connect(self.address)
print(color('### Connected', 'green'))
# Called when the BLE connection is disconnected
def on_ble_disconnection(reason):
print(color('@@@ Bluetooth disconnection:', 'red'), HCI_Constant.error_name(reason))
connection.on('disconnection', on_ble_disconnection)
# Called when a TCP connection is established
async def on_tcp_connection(reader, writer):
peername = writer.get_extra_info('peername')
print(color(f'<<< TCP connection from {peername}', 'magenta'))
def on_coc_sdu(sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
l2cap_to_tcp_pipe.write(sdu)
def on_l2cap_close():
print(color('*** L2CAP channel closed', 'red'))
l2cap_to_tcp_pipe.stop()
writer.close()
# Connect a new L2CAP channel
print(color(f'>>> Opening L2CAP channel on PSM = {self.psm}', 'yellow'))
try:
l2cap_channel = await connection.open_l2cap_channel(
psm = self.psm,
max_credits = self.max_credits,
mtu = self.mtu,
mps = self.mps
)
print(color('*** L2CAP channel:', 'cyan'), l2cap_channel)
except Exception as error:
print(color(f'!!! Connection failed: {error}', 'red'))
writer.close()
return
l2cap_channel.sink = on_coc_sdu
l2cap_channel.on('close', on_l2cap_close)
# Start a flow control pipe from L2CAP to TCP
l2cap_to_tcp_pipe = FlowControlAsyncPipe(
l2cap_channel.pause_reading,
l2cap_channel.resume_reading,
writer.write,
writer.drain
)
l2cap_to_tcp_pipe.start()
# Pipe data from TCP to L2CAP
while True:
try:
data = await reader.read(self.READ_CHUNK_SIZE)
if len(data) == 0:
print(color('!!! End of stream', 'red'))
await l2cap_channel.disconnect()
return
print(color(f'<<< [TCP DATA]: {len(data)} bytes', 'blue'))
l2cap_channel.write(data)
await l2cap_channel.drain()
except Exception as error:
print(f'!!! Exception: {error}')
break
writer.close()
print(color('~~~ Bye bye', 'magenta'))
await asyncio.start_server(
on_tcp_connection,
host=self.tcp_host if self.tcp_host != '_' else None,
port=self.tcp_port
)
print(color(f'### Listening for TCP connections on port {self.tcp_port}', 'magenta'))
# -----------------------------------------------------------------------------
async def run(device_config, hci_transport, bridge):
print('<<< connecting to HCI...')
async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
print('<<< connected')
device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
# Let's go
await device.power_on()
await bridge.start(device)
# Wait until the transport terminates
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
@click.group()
@click.pass_context
@click.option('--device-config', help='Device configuration file', required=True)
@click.option('--hci-transport', help='HCI transport', required=True)
@click.option('--psm', help='PSM for L2CAP CoC', type=int, default=1234)
@click.option('--l2cap-coc-max-credits', help='Maximum L2CAP CoC Credits', type=click.IntRange(1, 65535), default=128)
@click.option('--l2cap-coc-mtu', help='L2CAP CoC MTU', type=click.IntRange(23, 65535), default=1022)
@click.option('--l2cap-coc-mps', help='L2CAP CoC MPS', type=click.IntRange(23, 65533), default=1024)
def cli(context, device_config, hci_transport, psm, l2cap_coc_max_credits, l2cap_coc_mtu, l2cap_coc_mps):
context.ensure_object(dict)
context.obj['device_config'] = device_config
context.obj['hci_transport'] = hci_transport
context.obj['psm'] = psm
context.obj['max_credits'] = l2cap_coc_max_credits
context.obj['mtu'] = l2cap_coc_mtu
context.obj['mps'] = l2cap_coc_mps
# -----------------------------------------------------------------------------
@cli.command()
@click.pass_context
@click.option('--tcp-host', help='TCP host', default='localhost')
@click.option('--tcp-port', help='TCP port', default=9544)
def server(context, tcp_host, tcp_port):
bridge = ServerBridge(
context.obj['psm'],
context.obj['max_credits'],
context.obj['mtu'],
context.obj['mps'],
tcp_host,
tcp_port)
asyncio.run(run(
context.obj['device_config'],
context.obj['hci_transport'],
bridge
))
# -----------------------------------------------------------------------------
@cli.command()
@click.pass_context
@click.argument('bluetooth-address')
@click.option('--tcp-host', help='TCP host', default='_')
@click.option('--tcp-port', help='TCP port', default=9543)
def client(context, bluetooth_address, tcp_host, tcp_port):
bridge = ClientBridge(
context.obj['psm'],
context.obj['max_credits'],
context.obj['mtu'],
context.obj['mps'],
bluetooth_address,
tcp_host,
tcp_port
)
asyncio.run(run(
context.obj['device_config'],
context.obj['hci_transport'],
bridge
))
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
if __name__ == '__main__':
cli(obj={})

View File

@@ -25,8 +25,8 @@ from bumble.device import Device
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.keys import JsonKeyStore from bumble.keys import JsonKeyStore
from bumble.smp import AddressResolver from bumble.smp import AddressResolver
from bumble.device import Advertisement from bumble.hci import HCI_LE_Advertising_Report_Event
from bumble.hci import HCI_Constant, HCI_LE_1M_PHY, HCI_LE_CODED_PHY from bumble.core import AdvertisingData
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -48,19 +48,16 @@ class AdvertisementPrinter:
self.min_rssi = min_rssi self.min_rssi = min_rssi
self.resolver = resolver self.resolver = resolver
def print_advertisement(self, advertisement): def print_advertisement(self, address, address_color, ad_data, rssi):
address = advertisement.address if self.min_rssi is not None and rssi < self.min_rssi:
address_color = 'yellow' if advertisement.is_connectable else 'red'
if self.min_rssi is not None and advertisement.rssi < self.min_rssi:
return return
address_qualifier = '' address_qualifier = ''
resolution_qualifier = '' resolution_qualifier = ''
if self.resolver and advertisement.address.is_resolvable: if self.resolver and address.is_resolvable:
resolved = self.resolver.resolve(advertisement.address) resolved = self.resolver.resolve(address)
if resolved is not None: if resolved is not None:
resolution_qualifier = f'(resolved from {advertisement.address})' resolution_qualifier = f'(resolved from {address})'
address = resolved address = resolved
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type] address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type]
@@ -77,30 +74,18 @@ class AdvertisementPrinter:
type_color = 'blue' type_color = 'blue'
address_qualifier = '(non-resolvable)' address_qualifier = '(non-resolvable)'
rssi_bar = make_rssi_bar(rssi)
separator = '\n ' separator = '\n '
rssi_bar = make_rssi_bar(advertisement.rssi) print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}RSSI:{rssi:4} {rssi_bar}{separator}{ad_data.to_string(separator)}\n')
if not advertisement.is_legacy:
phy_info = (
f'PHY: {HCI_Constant.le_phy_name(advertisement.primary_phy)}/'
f'{HCI_Constant.le_phy_name(advertisement.secondary_phy)} '
f'{separator}'
)
else:
phy_info = ''
print( def on_advertisement(self, address, ad_data, rssi, connectable):
f'>>> {color(address, address_color)} ' address_color = 'yellow' if connectable else 'red'
f'[{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}' self.print_advertisement(address, address_color, ad_data, rssi)
f'{phy_info}'
f'RSSI:{advertisement.rssi:4} {rssi_bar}{separator}'
f'{advertisement.data.to_string(separator)}\n')
def on_advertisement(self, advertisement): def on_advertising_report(self, address, ad_data, rssi, event_type):
self.print_advertisement(advertisement) print(f'{color("EVENT", "green")}: {HCI_LE_Advertising_Report_Event.event_type_name(event_type)}')
ad_data = AdvertisingData.from_bytes(ad_data)
def on_advertising_report(self, report): self.print_advertisement(address, 'yellow', ad_data, rssi)
print(f'{color("EVENT", "green")}: {report.event_type_string()}')
self.print_advertisement(Advertisement.from_advertising_report(report))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -109,7 +94,6 @@ async def scan(
passive, passive,
scan_interval, scan_interval,
scan_window, scan_window,
phy,
filter_duplicates, filter_duplicates,
raw, raw,
keystore_file, keystore_file,
@@ -142,18 +126,11 @@ async def scan(
device.on('advertisement', printer.on_advertisement) device.on('advertisement', printer.on_advertisement)
await device.power_on() await device.power_on()
if phy is None:
scanning_phys = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY]
else:
scanning_phys = [{'1m': HCI_LE_1M_PHY, 'coded': HCI_LE_CODED_PHY}[phy]]
await device.start_scanning( await device.start_scanning(
active=(not passive), active=(not passive),
scan_interval=scan_interval, scan_interval=scan_interval,
scan_window=scan_window, scan_window=scan_window,
filter_duplicates=filter_duplicates, filter_duplicates=filter_duplicates
scanning_phys=scanning_phys
) )
await hci_source.wait_for_termination() await hci_source.wait_for_termination()
@@ -165,15 +142,14 @@ async def scan(
@click.option('--passive', is_flag=True, default=False, help='Perform passive scanning') @click.option('--passive', is_flag=True, default=False, help='Perform passive scanning')
@click.option('--scan-interval', type=int, default=60, help='Scan interval') @click.option('--scan-interval', type=int, default=60, help='Scan interval')
@click.option('--scan-window', type=int, default=60, help='Scan window') @click.option('--scan-window', type=int, default=60, help='Scan window')
@click.option('--phy', type=click.Choice(['1m', 'coded']), help='Only scan on the specified PHY')
@click.option('--filter-duplicates', type=bool, default=True, help='Filter duplicates at the controller level') @click.option('--filter-duplicates', type=bool, default=True, help='Filter duplicates at the controller level')
@click.option('--raw', is_flag=True, default=False, help='Listen for raw advertising reports instead of processed ones') @click.option('--raw', is_flag=True, default=False, help='Listen for raw advertising reports instead of processed ones')
@click.option('--keystore-file', help='Keystore file to use when resolving addresses') @click.option('--keystore-file', help='Keystore file to use when resolving addresses')
@click.option('--device-config', help='Device config file for the scanning device') @click.option('--device-config', help='Device config file for the scanning device')
@click.argument('transport') @click.argument('transport')
def main(min_rssi, passive, scan_interval, scan_window, phy, filter_duplicates, raw, keystore_file, device_config, transport): def main(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport):
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper()) logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
asyncio.run(scan(min_rssi, passive, scan_interval, scan_window, phy, filter_duplicates, raw, keystore_file, device_config, transport)) asyncio.run(scan(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -90,7 +90,7 @@ class SnoopPacketReader:
@click.command() @click.command()
@click.option('--format', type=click.Choice(['h4', 'snoop']), default='h4', help='Format of the input file') @click.option('--format', type=click.Choice(['h4', 'snoop']), default='h4', help='Format of the input file')
@click.argument('filename') @click.argument('filename')
def main(format, filename): def show(format, filename):
input = open(filename, 'rb') input = open(filename, 'rb')
if format == 'h4': if format == 'h4':
packet_reader = PacketReader(input) packet_reader = PacketReader(input)
@@ -117,4 +117,4 @@ def main(format, filename):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
if __name__ == '__main__': if __name__ == '__main__':
main() show()

View File

@@ -1,239 +0,0 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# This tool lists all the USB devices, with details about each device.
# For each device, the different possible Bumble transport strings that can
# refer to it are listed. If the device is known to be a Bluetooth HCI device,
# its identifier is printed in reverse colors, and the transport names in cyan color.
# For other devices, regardless of their type, the transport names are printed
# in red. Whether that device is actually a Bluetooth device or not depends on
# whether it is a Bluetooth device that uses a non-standard Class, or some other
# type of device (there's no way to tell).
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import os
import logging
import sys
import click
import usb1
from colors import color
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
USB_DEVICE_CLASS_DEVICE = 0x00
USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
USB_DEVICE_CLASSES = {
0x00: 'Device',
0x01: 'Audio',
0x02: 'Communications and CDC Control',
0x03: 'Human Interface Device',
0x05: 'Physical',
0x06: 'Still Imaging',
0x07: 'Printer',
0x08: 'Mass Storage',
0x09: 'Hub',
0x0A: 'CDC Data',
0x0B: 'Smart Card',
0x0D: 'Content Security',
0x0E: 'Video',
0x0F: 'Personal Healthcare',
0x10: 'Audio/Video',
0x11: 'Billboard',
0x12: 'USB Type-C Bridge',
0x3C: 'I3C',
0xDC: 'Diagnostic',
USB_DEVICE_CLASS_WIRELESS_CONTROLLER: (
'Wireless Controller',
{
0x01: {
0x01: 'Bluetooth',
0x02: 'UWB',
0x03: 'Remote NDIS',
0x04: 'Bluetooth AMP'
}
}
),
0xEF: 'Miscellaneous',
0xFE: 'Application Specific',
0xFF: 'Vendor Specific'
}
USB_ENDPOINT_IN = 0x80
USB_ENDPOINT_TYPES = ['CONTROL', 'ISOCHRONOUS', 'BULK', 'INTERRUPT']
USB_BT_HCI_CLASS_TUPLE = (
USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
USB_DEVICE_SUBCLASS_RF_CONTROLLER,
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
)
# -----------------------------------------------------------------------------
def show_device_details(device):
for configuration in device:
print(f' Configuration {configuration.getConfigurationValue()}')
for interface in configuration:
for setting in interface:
alternateSetting = setting.getAlternateSetting()
suffix = f'/{alternateSetting}' if interface.getNumSettings() > 1 else ''
(class_string, subclass_string) = get_class_info(
setting.getClass(),
setting.getSubClass(),
setting.getProtocol()
)
details = f'({class_string}, {subclass_string})'
print(f' Interface: {setting.getNumber()}{suffix} {details}')
for endpoint in setting:
endpoint_type = USB_ENDPOINT_TYPES[endpoint.getAttributes() & 3]
endpoint_direction = 'OUT' if (endpoint.getAddress() & USB_ENDPOINT_IN == 0) else 'IN'
print(f' Endpoint 0x{endpoint.getAddress():02X}: {endpoint_type} {endpoint_direction}')
# -----------------------------------------------------------------------------
def get_class_info(cls, subclass, protocol):
class_info = USB_DEVICE_CLASSES.get(cls)
protocol_string = ''
if class_info is None:
class_string = f'0x{cls:02X}'
else:
if type(class_info) is tuple:
class_string = class_info[0]
subclass_info = class_info[1].get(subclass)
if subclass_info:
protocol_string = subclass_info.get(protocol)
if protocol_string is not None:
protocol_string = f' [{protocol_string}]'
else:
class_string = class_info
subclass_string = f'{subclass}/{protocol}{protocol_string}'
return (class_string, subclass_string)
# -----------------------------------------------------------------------------
def is_bluetooth_hci(device):
# Check if the device class indicates a match
if (device.getDeviceClass(), device.getDeviceSubClass(), device.getDeviceProtocol()) == USB_BT_HCI_CLASS_TUPLE:
return True
# If the device class is 'Device', look for a matching interface
if device.getDeviceClass() == USB_DEVICE_CLASS_DEVICE:
for configuration in device:
for interface in configuration:
for setting in interface:
if (setting.getClass(), setting.getSubClass(), setting.getProtocol()) == USB_BT_HCI_CLASS_TUPLE:
return True
return False
# -----------------------------------------------------------------------------
@click.command()
@click.option('--verbose', is_flag=True, default=False, help='Print more details')
def main(verbose):
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
with usb1.USBContext() as context:
bluetooth_device_count = 0
devices = {}
for device in context.getDeviceIterator(skip_on_error=True):
device_class = device.getDeviceClass()
device_subclass = device.getDeviceSubClass()
device_protocol = device.getDeviceProtocol()
device_id = (device.getVendorID(), device.getProductID())
(device_class_string, device_subclass_string) = get_class_info(
device_class,
device_subclass,
device_protocol
)
try:
device_serial_number = device.getSerialNumber()
except usb1.USBError:
device_serial_number = None
try:
device_manufacturer = device.getManufacturer()
except usb1.USBError:
device_manufacturer = None
try:
device_product = device.getProduct()
except usb1.USBError:
device_product = None
device_is_bluetooth_hci = is_bluetooth_hci(device)
if device_is_bluetooth_hci:
bluetooth_device_count += 1
fg_color = 'black'
bg_color = 'yellow'
else:
fg_color = 'yellow'
bg_color = 'black'
# Compute the different ways this can be referenced as a Bumble transport
bumble_transport_names = []
basic_transport_name = f'usb:{device.getVendorID():04X}:{device.getProductID():04X}'
if device_is_bluetooth_hci:
bumble_transport_names.append(f'usb:{bluetooth_device_count - 1}')
if device_id not in devices:
bumble_transport_names.append(basic_transport_name)
else:
bumble_transport_names.append(f'{basic_transport_name}#{len(devices[device_id])}')
if device_serial_number is not None:
if device_id not in devices or device_serial_number not in devices[device_id]:
bumble_transport_names.append(f'{basic_transport_name}/{device_serial_number}')
# Print the results
print(color(f'ID {device.getVendorID():04X}:{device.getProductID():04X}', fg=fg_color, bg=bg_color))
if bumble_transport_names:
print(color(' Bumble Transport Names:', 'blue'), ' or '.join(color(x, 'cyan' if device_is_bluetooth_hci else 'red') for x in bumble_transport_names))
print(color(' Bus/Device: ', 'green'), f'{device.getBusNumber():03}/{device.getDeviceAddress():03}')
print(color(' Class: ', 'green'), device_class_string)
print(color(' Subclass/Protocol: ', 'green'), device_subclass_string)
if device_serial_number is not None:
print(color(' Serial: ', 'green'), device_serial_number)
if device_manufacturer is not None:
print(color(' Manufacturer: ', 'green'), device_manufacturer)
if device_product is not None:
print(color(' Product: ', 'green'), device_product)
if verbose:
show_device_details(device)
print()
devices.setdefault(device_id, []).append(device_serial_number)
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()

View File

@@ -700,26 +700,16 @@ class Attribute(EventEmitter):
else: else:
self.value = value self.value = value
def encode_value(self, value):
return value
def decode_value(self, value_bytes):
return value_bytes
def read_value(self, connection): def read_value(self, connection):
if read := getattr(self.value, 'read', None): if read := getattr(self.value, 'read', None):
try: try:
value = read(connection) return read(connection)
except ATT_Error as error: except ATT_Error as error:
raise ATT_Error(error_code=error.error_code, att_handle=self.handle) raise ATT_Error(error_code=error.error_code, att_handle=self.handle)
else: else:
value = self.value return self.value
return self.encode_value(value)
def write_value(self, connection, value_bytes):
value = self.decode_value(value_bytes)
def write_value(self, connection, value):
if write := getattr(self.value, 'write', None): if write := getattr(self.value, 'write', None):
try: try:
write(connection, value) write(connection, value)
@@ -731,11 +721,7 @@ class Attribute(EventEmitter):
self.emit('write', connection, value) self.emit('write', connection, value)
def __repr__(self): def __repr__(self):
if type(self.value) is bytes: if len(self.value) > 0:
value_str = self.value.hex()
else:
value_str = str(self.value)
if value_str:
value_string = f', value={self.value.hex()}' value_string = f', value={self.value.hex()}'
else: else:
value_string = '' value_string = ''

View File

@@ -351,7 +351,7 @@ class MediaPacketPump:
logger.debug('pump canceled') logger.debug('pump canceled')
# Pump packets # Pump packets
self.pump_task = asyncio.create_task(pump_packets()) self.pump_task = asyncio.get_running_loop().create_task(pump_packets())
async def stop(self): async def stop(self):
# Stop the pump # Stop the pump
@@ -1890,10 +1890,10 @@ class LocalSource(LocalStreamEndPoint, EventEmitter):
self.configuration = configuration self.configuration = configuration
def on_start_command(self): def on_start_command(self):
asyncio.create_task(self.start()) asyncio.get_running_loop().create_task(self.start())
def on_suspend_command(self): def on_suspend_command(self):
asyncio.create_task(self.stop()) asyncio.get_running_loop().create_task(self.stop())
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -76,7 +76,7 @@ class Controller:
self.supported_commands = bytes.fromhex('2000800000c000000000e40000002822000000000000040000f7ffff7f00000030f0f9ff01008004000000000000000000000000000000000000000000000000') self.supported_commands = bytes.fromhex('2000800000c000000000e40000002822000000000000040000f7ffff7f00000030f0f9ff01008004000000000000000000000000000000000000000000000000')
self.le_features = bytes.fromhex('ff49010000000000') self.le_features = bytes.fromhex('ff49010000000000')
self.le_states = bytes.fromhex('ffff3fffff030000') self.le_states = bytes.fromhex('ffff3fffff030000')
self.advertising_channel_tx_power = 0 self.avertising_channel_tx_power = 0
self.filter_accept_list_size = 8 self.filter_accept_list_size = 8
self.resolving_list_size = 8 self.resolving_list_size = 8
self.supported_max_tx_octets = 27 self.supported_max_tx_octets = 27
@@ -259,15 +259,15 @@ class Controller:
# Then say that the connection has completed # Then say that the connection has completed
self.send_hci_packet(HCI_LE_Connection_Complete_Event( self.send_hci_packet(HCI_LE_Connection_Complete_Event(
status = HCI_SUCCESS, status = HCI_SUCCESS,
connection_handle = connection.handle, connection_handle = connection.handle,
role = connection.role, role = connection.role,
peer_address_type = peer_address_type, peer_address_type = peer_address_type,
peer_address = peer_address, peer_address = peer_address,
connection_interval = 10, # FIXME conn_interval = 10, # FIXME
peripheral_latency = 0, # FIXME conn_latency = 0, # FIXME
supervision_timeout = 10, # FIXME supervision_timeout = 10, # FIXME
central_clock_accuracy = 7 # FIXME master_clock_accuracy = 7 # FIXME
)) ))
def on_link_central_disconnected(self, peer_address, reason): def on_link_central_disconnected(self, peer_address, reason):
@@ -313,15 +313,15 @@ class Controller:
# Say that the connection has completed # Say that the connection has completed
self.send_hci_packet(HCI_LE_Connection_Complete_Event( self.send_hci_packet(HCI_LE_Connection_Complete_Event(
status = status, status = status,
connection_handle = connection.handle if connection else 0, connection_handle = connection.handle if connection else 0,
role = BT_CENTRAL_ROLE, role = BT_CENTRAL_ROLE,
peer_address_type = le_create_connection_command.peer_address_type, peer_address_type = le_create_connection_command.peer_address_type,
peer_address = le_create_connection_command.peer_address, peer_address = le_create_connection_command.peer_address,
connection_interval = le_create_connection_command.connection_interval_min, conn_interval = le_create_connection_command.conn_interval_min,
peripheral_latency = le_create_connection_command.max_latency, conn_latency = le_create_connection_command.conn_latency,
supervision_timeout = le_create_connection_command.supervision_timeout, supervision_timeout = le_create_connection_command.supervision_timeout,
central_clock_accuracy = 0 master_clock_accuracy = 0
)) ))
def on_link_peripheral_disconnection_complete(self, disconnection_command, status): def on_link_peripheral_disconnection_complete(self, disconnection_command, status):
@@ -583,15 +583,13 @@ class Controller:
''' '''
See Bluetooth spec Vol 2, Part E - 7.4.1 Read Local Version Information Command See Bluetooth spec Vol 2, Part E - 7.4.1 Read Local Version Information Command
''' '''
return struct.pack( return struct.pack('<BBHBHH',
'<BBHBHH', HCI_SUCCESS,
HCI_SUCCESS, self.hci_version,
self.hci_version, self.hci_revision,
self.hci_revision, self.lmp_version,
self.lmp_version, self.manufacturer_name,
self.manufacturer_name, self.lmp_subversion)
self.lmp_subversion
)
def on_hci_read_local_supported_commands_command(self, command): def on_hci_read_local_supported_commands_command(self, command):
''' '''
@@ -652,7 +650,7 @@ class Controller:
''' '''
See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Channel Tx Power Command See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Channel Tx Power Command
''' '''
return bytes([HCI_SUCCESS, self.advertising_channel_tx_power]) return bytes([HCI_SUCCESS, self.avertising_channel_tx_power])
def on_hci_le_set_advertising_data_command(self, command): def on_hci_le_set_advertising_data_command(self, command):
''' '''
@@ -859,9 +857,9 @@ class Controller:
See Bluetooth spec Vol 2, Part E - 7.8.44 LE Set Address Resolution Enable Command See Bluetooth spec Vol 2, Part E - 7.8.44 LE Set Address Resolution Enable Command
''' '''
ret = HCI_SUCCESS ret = HCI_SUCCESS
if command.address_resolution_enable == 1: if command.address_resolution == 1:
self.le_address_resolution = True self.le_address_resolution = True
elif command.address_resolution_enable == 0: elif command.address_resolution == 0:
self.le_address_resolution = False self.le_address_resolution = False
else: else:
ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR
@@ -878,26 +876,12 @@ class Controller:
''' '''
See Bluetooth spec Vol 2, Part E - 7.8.46 LE Read Maximum Data Length Command See Bluetooth spec Vol 2, Part E - 7.8.46 LE Read Maximum Data Length Command
''' '''
return struct.pack( return struct.pack('<BHHHH',
'<BHHHH', HCI_SUCCESS,
HCI_SUCCESS, self.supported_max_tx_octets,
self.supported_max_tx_octets, self.supported_max_tx_time,
self.supported_max_tx_time, self.supported_max_rx_octets,
self.supported_max_rx_octets, self.supported_max_rx_time)
self.supported_max_rx_time
)
def on_hci_le_read_phy_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.47 LE Read PHY command
'''
return struct.pack(
'<BHBB',
HCI_SUCCESS,
command.connection_handle,
HCI_LE_1M_PHY,
HCI_LE_1M_PHY
)
def on_hci_le_set_default_phy_command(self, command): def on_hci_le_set_default_phy_command(self, command):
''' '''
@@ -909,4 +893,3 @@ class Controller:
'rx_phys': command.rx_phys 'rx_phys': command.rx_phys
} }
return bytes([HCI_SUCCESS]) return bytes([HCI_SUCCESS])

View File

@@ -91,10 +91,6 @@ class TimeoutError(Exception):
""" Timeout Error """ """ Timeout Error """
class CommandTimeoutError(Exception):
""" Command Timeout Error """
class InvalidStateError(Exception): class InvalidStateError(Exception):
""" Invalid State Error """ """ Invalid State Error """
@@ -104,11 +100,6 @@ class ConnectionError(BaseError):
FAILURE = 0x01 FAILURE = 0x01
CONNECTION_REFUSED = 0x02 CONNECTION_REFUSED = 0x02
def __init__(self, error_code, transport, peer_address, error_namespace='', error_name='', details=''):
super().__init__(error_code, error_namespace, error_name, details)
self.transport = transport
self.peer_address = peer_address
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# UUID # UUID
@@ -769,20 +760,17 @@ class AdvertisingData:
def ad_data_to_object(ad_type, ad_data): def ad_data_to_object(ad_type, ad_data):
if ad_type in { if ad_type in {
AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS
AdvertisingData.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS
}: }:
return AdvertisingData.uuid_list_to_objects(ad_data, 2) return AdvertisingData.uuid_list_to_objects(ad_data, 2)
elif ad_type in { elif ad_type in {
AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS
AdvertisingData.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS
}: }:
return AdvertisingData.uuid_list_to_objects(ad_data, 4) return AdvertisingData.uuid_list_to_objects(ad_data, 4)
elif ad_type in { elif ad_type in {
AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS
AdvertisingData.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS
}: }:
return AdvertisingData.uuid_list_to_objects(ad_data, 16) return AdvertisingData.uuid_list_to_objects(ad_data, 16)
elif ad_type == AdvertisingData.SERVICE_DATA_16_BIT_UUID: elif ad_type == AdvertisingData.SERVICE_DATA_16_BIT_UUID:
@@ -793,24 +781,11 @@ class AdvertisingData:
return (UUID.from_bytes(ad_data[:16]), ad_data[16:]) return (UUID.from_bytes(ad_data[:16]), ad_data[16:])
elif ad_type in { elif ad_type in {
AdvertisingData.SHORTENED_LOCAL_NAME, AdvertisingData.SHORTENED_LOCAL_NAME,
AdvertisingData.COMPLETE_LOCAL_NAME, AdvertisingData.COMPLETE_LOCAL_NAME
AdvertisingData.URI
}: }:
return ad_data.decode("utf-8") return ad_data.decode("utf-8")
elif ad_type in { elif ad_type == AdvertisingData.TX_POWER_LEVEL:
AdvertisingData.TX_POWER_LEVEL,
AdvertisingData.FLAGS
}:
return ad_data[0] return ad_data[0]
elif ad_type in {
AdvertisingData.APPEARANCE,
AdvertisingData.ADVERTISING_INTERVAL
}:
return struct.unpack('<H', ad_data)[0]
elif ad_type == AdvertisingData.CLASS_OF_DEVICE:
return struct.unpack('<I', bytes([*ad_data, 0]))[0]
elif ad_type == AdvertisingData.PERIPHERAL_CONNECTION_INTERVAL_RANGE:
return struct.unpack('<HH', ad_data)
elif ad_type == AdvertisingData.MANUFACTURER_SPECIFIC_DATA: elif ad_type == AdvertisingData.MANUFACTURER_SPECIFIC_DATA:
return (struct.unpack_from('<H', ad_data, 0)[0], ad_data[2:]) return (struct.unpack_from('<H', ad_data, 0)[0], ad_data[2:])
else: else:
@@ -827,7 +802,7 @@ class AdvertisingData:
self.ad_structures.append((ad_type, ad_data)) self.ad_structures.append((ad_type, ad_data))
offset += length offset += length
def get(self, type_id, return_all=False, raw=False): def get(self, type_id, return_all=False, raw=True):
''' '''
Get Advertising Data Structure(s) with a given type Get Advertising Data Structure(s) with a given type
@@ -856,17 +831,13 @@ class AdvertisingData:
# Connection Parameters # Connection Parameters
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class ConnectionParameters: class ConnectionParameters:
def __init__(self, connection_interval, peripheral_latency, supervision_timeout): def __init__(self, connection_interval, connection_latency, supervision_timeout):
self.connection_interval = connection_interval self.connection_interval = connection_interval
self.peripheral_latency = peripheral_latency self.connection_latency = connection_latency
self.supervision_timeout = supervision_timeout self.supervision_timeout = supervision_timeout
def __str__(self): def __str__(self):
return ( return f'ConnectionParameters(connection_interval={self.connection_interval}, connection_latency={self.connection_latency}, supervision_timeout={self.supervision_timeout}'
f'ConnectionParameters(connection_interval={self.connection_interval}, '
f'peripheral_latency={self.peripheral_latency}, '
f'supervision_timeout={self.supervision_timeout}'
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

File diff suppressed because it is too large Load Diff

View File

@@ -23,10 +23,8 @@
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import asyncio import asyncio
import enum
import types import types
import logging import logging
from pyee import EventEmitter
from colors import color from colors import color
from .core import * from .core import *
@@ -264,7 +262,7 @@ class Characteristic(Attribute):
def get_descriptor(self, descriptor_type): def get_descriptor(self, descriptor_type):
for descriptor in self.descriptors: for descriptor in self.descriptors:
if descriptor.type == descriptor_type: if descriptor.uuid == descriptor_type:
return descriptor return descriptor
def __str__(self): def __str__(self):
@@ -305,7 +303,6 @@ class CharacteristicAdapter:
''' '''
def __init__(self, characteristic): def __init__(self, characteristic):
self.wrapped_characteristic = characteristic self.wrapped_characteristic = characteristic
self.subscribers = {} # Map from subscriber to proxy subscriber
if ( if (
asyncio.iscoroutinefunction(characteristic.read_value) and asyncio.iscoroutinefunction(characteristic.read_value) and
@@ -320,21 +317,11 @@ class CharacteristicAdapter:
if hasattr(self.wrapped_characteristic, 'subscribe'): if hasattr(self.wrapped_characteristic, 'subscribe'):
self.subscribe = self.wrapped_subscribe self.subscribe = self.wrapped_subscribe
if hasattr(self.wrapped_characteristic, 'unsubscribe'):
self.unsubscribe = self.wrapped_unsubscribe
def __getattr__(self, name): def __getattr__(self, name):
return getattr(self.wrapped_characteristic, name) return getattr(self.wrapped_characteristic, name)
def __setattr__(self, name, value): def __setattr__(self, name, value):
if name in { if name in {'wrapped_characteristic', 'read_value', 'write_value', 'subscribe'}:
'wrapped_characteristic',
'subscribers',
'read_value',
'write_value',
'subscribe',
'unsubscribe'
}:
super().__setattr__(name, value) super().__setattr__(name, value)
else: else:
setattr(self.wrapped_characteristic, name, value) setattr(self.wrapped_characteristic, name, value)
@@ -348,11 +335,8 @@ class CharacteristicAdapter:
async def read_decoded_value(self): async def read_decoded_value(self):
return self.decode_value(await self.wrapped_characteristic.read_value()) return self.decode_value(await self.wrapped_characteristic.read_value())
async def write_decoded_value(self, value, with_response=False): async def write_decoded_value(self, value):
return await self.wrapped_characteristic.write_value( return await self.wrapped_characteristic.write_value(self.encode_value(value))
self.encode_value(value),
with_response
)
def encode_value(self, value): def encode_value(self, value):
return value return value
@@ -361,26 +345,9 @@ class CharacteristicAdapter:
return value return value
def wrapped_subscribe(self, subscriber=None): def wrapped_subscribe(self, subscriber=None):
if subscriber is not None: return self.wrapped_characteristic.subscribe(
if subscriber in self.subscribers: None if subscriber is None else lambda value: subscriber(self.decode_value(value))
# We already have a proxy subscriber )
subscriber = self.subscribers[subscriber]
else:
# Create and register a proxy that will decode the value
original_subscriber = subscriber
def on_change(value):
original_subscriber(self.decode_value(value))
self.subscribers[subscriber] = on_change
subscriber = on_change
return self.wrapped_characteristic.subscribe(subscriber)
def wrapped_unsubscribe(self, subscriber=None):
if subscriber in self.subscribers:
subscriber = self.subscribers.pop(subscriber)
return self.wrapped_characteristic.unsubscribe(subscriber)
def __str__(self): def __str__(self):
wrapped = str(self.wrapped_characteristic) wrapped = str(self.wrapped_characteristic)
@@ -475,12 +442,3 @@ class Descriptor(Attribute):
def __str__(self): def __str__(self):
return f'Descriptor(handle=0x{self.handle:04X}, type={self.type}, value={self.read_value(None).hex()})' return f'Descriptor(handle=0x{self.handle:04X}, type={self.type}, value={self.read_value(None).hex()})'
class ClientCharacteristicConfigurationBits(enum.IntFlag):
'''
See Vol 3, Part G - 3.3.3.3 - Table 3.11 Client Characteristic Configuration bit field definition
'''
DEFAULT = 0x0000
NOTIFICATION = 0x0001
INDICATION = 0x0002

View File

@@ -26,17 +26,19 @@
import asyncio import asyncio
import logging import logging
import struct import struct
from colors import color from colors import color
from .att import * from .core import ProtocolError, TimeoutError
from .core import InvalidStateError, ProtocolError, TimeoutError
from .gatt import (GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE, GATT_REQUEST_TIMEOUT,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE, Characteristic,
ClientCharacteristicConfigurationBits)
from .hci import * from .hci import *
from .att import *
from .gatt import (
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
GATT_REQUEST_TIMEOUT,
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
Characteristic
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Logging # Logging
@@ -56,16 +58,10 @@ class AttributeProxy(EventEmitter):
self.type = attribute_type self.type = attribute_type
async def read_value(self, no_long_read=False): async def read_value(self, no_long_read=False):
return self.decode_value(await self.client.read_value(self.handle, no_long_read)) return await self.client.read_value(self.handle, no_long_read)
async def write_value(self, value, with_response=False): async def write_value(self, value, with_response=False):
return await self.client.write_value(self.handle, self.encode_value(value), with_response) return await self.client.write_value(self.handle, value, with_response)
def encode_value(self, value):
return value
def decode_value(self, value_bytes):
return value_bytes
def __str__(self): def __str__(self):
return f'Attribute(handle=0x{self.handle:04X}, type={self.uuid})' return f'Attribute(handle=0x{self.handle:04X}, type={self.uuid})'
@@ -102,7 +98,6 @@ class CharacteristicProxy(AttributeProxy):
self.properties = properties self.properties = properties
self.descriptors = [] self.descriptors = []
self.descriptors_discovered = False self.descriptors_discovered = False
self.subscribers = {} # Map from subscriber to proxy subscriber
def get_descriptor(self, descriptor_type): def get_descriptor(self, descriptor_type):
for descriptor in self.descriptors: for descriptor in self.descriptors:
@@ -112,26 +107,10 @@ class CharacteristicProxy(AttributeProxy):
async def discover_descriptors(self): async def discover_descriptors(self):
return await self.client.discover_descriptors(self) return await self.client.discover_descriptors(self)
async def subscribe(self, subscriber=None, prefer_notify=True): async def subscribe(self, subscriber=None):
if subscriber is not None: return await self.client.subscribe(self, subscriber)
if subscriber in self.subscribers:
# We already have a proxy subscriber
subscriber = self.subscribers[subscriber]
else:
# Create and register a proxy that will decode the value
original_subscriber = subscriber
def on_change(value):
original_subscriber(self.decode_value(value))
self.subscribers[subscriber] = on_change
subscriber = on_change
return await self.client.subscribe(self, subscriber, prefer_notify)
async def unsubscribe(self, subscriber=None): async def unsubscribe(self, subscriber=None):
if subscriber in self.subscribers:
subscriber = self.subscribers.pop(subscriber)
return await self.client.unsubscribe(self, subscriber) return await self.client.unsubscribe(self, subscriber)
def __str__(self): def __str__(self):
@@ -161,6 +140,7 @@ class ProfileServiceProxy:
class Client: class Client:
def __init__(self, connection): def __init__(self, connection):
self.connection = connection self.connection = connection
self.mtu = ATT_DEFAULT_MTU
self.mtu_exchange_done = False self.mtu_exchange_done = False
self.request_semaphore = asyncio.Semaphore(1) self.request_semaphore = asyncio.Semaphore(1)
self.pending_request = None self.pending_request = None
@@ -182,8 +162,8 @@ class Client:
# Wait until we can send (only one pending command at a time for the connection) # Wait until we can send (only one pending command at a time for the connection)
response = None response = None
async with self.request_semaphore: async with self.request_semaphore:
assert self.pending_request is None assert(self.pending_request is None)
assert self.pending_response is None assert(self.pending_response is None)
# Create a future value to hold the eventual response # Create a future value to hold the eventual response
self.pending_response = asyncio.get_running_loop().create_future() self.pending_response = asyncio.get_running_loop().create_future()
@@ -214,7 +194,7 @@ class Client:
# We can only send one request per connection # We can only send one request per connection
if self.mtu_exchange_done: if self.mtu_exchange_done:
return self.connection.att_mtu return
# Send the request # Send the request
self.mtu_exchange_done = True self.mtu_exchange_done = True
@@ -227,10 +207,8 @@ class Client:
response response
) )
# Compute the final MTU self.mtu = max(ATT_DEFAULT_MTU, response.server_rx_mtu)
self.connection.att_mtu = min(mtu, response.server_rx_mtu) return self.mtu
return self.connection.att_mtu
def get_services_by_uuid(self, uuid): def get_services_by_uuid(self, uuid):
return [service for service in self.services if service.uuid == uuid] return [service for service in self.services if service.uuid == uuid]
@@ -271,7 +249,7 @@ class Client:
if response.op_code == ATT_ERROR_RESPONSE: if response.op_code == ATT_ERROR_RESPONSE:
if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR: if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR:
# Unexpected end # Unexpected end
logger.warning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}') logger.waning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}')
# TODO raise appropriate exception # TODO raise appropriate exception
return return
break break
@@ -335,7 +313,7 @@ class Client:
if response.op_code == ATT_ERROR_RESPONSE: if response.op_code == ATT_ERROR_RESPONSE:
if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR: if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR:
# Unexpected end # Unexpected end
logger.warning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}') logger.waning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}')
# TODO raise appropriate exception # TODO raise appropriate exception
return return
break break
@@ -544,7 +522,7 @@ class Client:
return attributes return attributes
async def subscribe(self, characteristic, subscriber=None, prefer_notify=True): async def subscribe(self, characteristic, subscriber=None):
# If we haven't already discovered the descriptors for this characteristic, do it now # If we haven't already discovered the descriptors for this characteristic, do it now
if not characteristic.descriptors_discovered: if not characteristic.descriptors_discovered:
await self.discover_descriptors(characteristic) await self.discover_descriptors(characteristic)
@@ -555,32 +533,23 @@ class Client:
logger.warning('subscribing to characteristic with no CCCD descriptor') logger.warning('subscribing to characteristic with no CCCD descriptor')
return return
if ( # Set the subscription bits and select the subscriber set
characteristic.properties & Characteristic.NOTIFY bits = 0
and characteristic.properties & Characteristic.INDICATE subscriber_sets = []
): if characteristic.properties & Characteristic.NOTIFY:
if prefer_notify: bits |= 0x0001
bits = ClientCharacteristicConfigurationBits.NOTIFICATION subscriber_sets.append(self.notification_subscribers.setdefault(characteristic.handle, set()))
subscribers = self.notification_subscribers if characteristic.properties & Characteristic.INDICATE:
else: bits |= 0x0002
bits = ClientCharacteristicConfigurationBits.INDICATION subscriber_sets.append(self.indication_subscribers.setdefault(characteristic.handle, set()))
subscribers = self.indication_subscribers
elif characteristic.properties & Characteristic.NOTIFY:
bits = ClientCharacteristicConfigurationBits.NOTIFICATION
subscribers = self.notification_subscribers
elif characteristic.properties & Characteristic.INDICATE:
bits = ClientCharacteristicConfigurationBits.INDICATION
subscribers = self.indication_subscribers
else:
raise InvalidStateError("characteristic is not notify or indicate")
# Add subscribers to the sets # Add subscribers to the sets
subscriber_set = subscribers.setdefault(characteristic.handle, set()) for subscriber_set in subscriber_sets:
if subscriber is not None: if subscriber is not None:
subscriber_set.add(subscriber) subscriber_set.add(subscriber)
# Add the characteristic as a subscriber, which will result in the characteristic # Add the characteristic as a subscriber, which will result in the characteristic
# emitting an 'update' event when a notification or indication is received # emitting an 'update' event when a notification or indication is received
subscriber_set.add(characteristic) subscriber_set.add(characteristic)
await self.write_value(cccd, struct.pack('<H', bits), with_response=True) await self.write_value(cccd, struct.pack('<H', bits), with_response=True)
@@ -601,18 +570,12 @@ class Client:
subscribers = subscriber_set.get(characteristic.handle, []) subscribers = subscriber_set.get(characteristic.handle, [])
if subscriber in subscribers: if subscriber in subscribers:
subscribers.remove(subscriber) subscribers.remove(subscriber)
# Cleanup if we removed the last one
if not subscribers:
subscriber_set.remove(characteristic.handle)
else: else:
# Remove all subscribers for this attribute from the sets! # Remove all subscribers for this attribute from the sets!
self.notification_subscribers.pop(characteristic.handle, None) self.notification_subscribers.pop(characteristic.handle, None)
self.indication_subscribers.pop(characteristic.handle, None) self.indication_subscribers.pop(characteristic.handle, None)
if not self.notification_subscribers and not self.indication_subscribers: await self.write_value(cccd, b'\x00\x00', with_response=True)
# No more subscribers left
await self.write_value(cccd, b'\x00\x00', with_response=True)
async def read_value(self, attribute, no_long_read=False): async def read_value(self, attribute, no_long_read=False):
''' '''
@@ -637,7 +600,7 @@ class Client:
# If the value is the max size for the MTU, try to read more unless the caller # If the value is the max size for the MTU, try to read more unless the caller
# specifically asked not to do that # specifically asked not to do that
attribute_value = response.attribute_value attribute_value = response.attribute_value
if not no_long_read and len(attribute_value) == self.connection.att_mtu - 1: if not no_long_read and len(attribute_value) == self.mtu - 1:
logger.debug('using READ BLOB to get the rest of the value') logger.debug('using READ BLOB to get the rest of the value')
offset = len(attribute_value) offset = len(attribute_value)
while True: while True:
@@ -659,7 +622,7 @@ class Client:
part = response.part_attribute_value part = response.part_attribute_value
attribute_value += part attribute_value += part
if len(part) < self.connection.att_mtu - 1: if len(part) < self.mtu - 1:
break break
offset += len(part) offset += len(part)

View File

@@ -40,12 +40,6 @@ from .gatt import *
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
GATT_SERVER_DEFAULT_MAX_MTU = 517
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# GATT Server # GATT Server
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -55,8 +49,9 @@ class Server(EventEmitter):
self.device = device self.device = device
self.attributes = [] # Attributes, ordered by increasing handle values self.attributes = [] # Attributes, ordered by increasing handle values
self.attributes_by_handle = {} # Map for fast attribute access by handle self.attributes_by_handle = {} # Map for fast attribute access by handle
self.max_mtu = GATT_SERVER_DEFAULT_MAX_MTU # The max MTU we're willing to negotiate self.max_mtu = 23 # FIXME: 517 # The max MTU we're willing to negotiate
self.subscribers = {} # Map of subscriber states by connection handle and attribute handle self.subscribers = {} # Map of subscriber states by connection handle and attribute handle
self.mtus = {} # Map of ATT MTU values by connection handle
self.indication_semaphores = defaultdict(lambda: asyncio.Semaphore(1)) self.indication_semaphores = defaultdict(lambda: asyncio.Semaphore(1))
self.pending_confirmations = defaultdict(lambda: None) self.pending_confirmations = defaultdict(lambda: None)
@@ -155,7 +150,7 @@ class Server(EventEmitter):
return cccd or bytes([0, 0]) return cccd or bytes([0, 0])
def write_cccd(self, connection, characteristic, value): def write_cccd(self, connection, characteristic, value):
logger.debug(f'Subscription update for connection=0x{connection.handle:04X}, handle=0x{characteristic.handle:04X}: {value.hex()}') logger.debug(f'Subscription update for connection={connection.handle:04X}, handle={characteristic.handle:04X}: {value.hex()}')
# Sanity check # Sanity check
if len(value) != 2: if len(value) != 2:
@@ -174,7 +169,7 @@ class Server(EventEmitter):
logger.debug(f'GATT Response from server: [0x{connection.handle:04X}] {response}') logger.debug(f'GATT Response from server: [0x{connection.handle:04X}] {response}')
self.send_gatt_pdu(connection.handle, response.to_bytes()) self.send_gatt_pdu(connection.handle, response.to_bytes())
async def notify_subscriber(self, connection, attribute, value=None, force=False): async def notify_subscriber(self, connection, attribute, force=False):
# Check if there's a subscriber # Check if there's a subscriber
if not force: if not force:
subscribers = self.subscribers.get(connection.handle) subscribers = self.subscribers.get(connection.handle)
@@ -189,12 +184,13 @@ class Server(EventEmitter):
logger.debug(f'not notifying, cccd={cccd.hex()}') logger.debug(f'not notifying, cccd={cccd.hex()}')
return return
# Get or encode the value # Get the value
value = attribute.read_value(connection) if value is None else attribute.encode_value(value) value = attribute.read_value(connection)
# Truncate if needed # Truncate if needed
if len(value) > connection.att_mtu - 3: mtu = self.get_mtu(connection)
value = value[:connection.att_mtu - 3] if len(value) > mtu - 3:
value = value[:mtu - 3]
# Notify # Notify
notification = ATT_Handle_Value_Notification( notification = ATT_Handle_Value_Notification(
@@ -202,9 +198,27 @@ class Server(EventEmitter):
attribute_value = value attribute_value = value
) )
logger.debug(f'GATT Notify from server: [0x{connection.handle:04X}] {notification}') logger.debug(f'GATT Notify from server: [0x{connection.handle:04X}] {notification}')
self.send_gatt_pdu(connection.handle, bytes(notification)) self.send_gatt_pdu(connection.handle, notification.to_bytes())
async def indicate_subscriber(self, connection, attribute, value=None, force=False): async def notify_subscribers(self, attribute, force=False):
# Get all the connections for which there's at least one subscription
connections = [
connection for connection in [
self.device.lookup_connection(connection_handle)
for (connection_handle, subscribers) in self.subscribers.items()
if force or subscribers.get(attribute.handle)
]
if connection is not None
]
# Notify for each connection
if connections:
await asyncio.wait([
self.notify_subscriber(connection, attribute, force)
for connection in connections
])
async def indicate_subscriber(self, connection, attribute, force=False):
# Check if there's a subscriber # Check if there's a subscriber
if not force: if not force:
subscribers = self.subscribers.get(connection.handle) subscribers = self.subscribers.get(connection.handle)
@@ -219,12 +233,13 @@ class Server(EventEmitter):
logger.debug(f'not indicating, cccd={cccd.hex()}') logger.debug(f'not indicating, cccd={cccd.hex()}')
return return
# Get or encode the value # Get the value
value = attribute.read_value(connection) if value is None else attribute.encode_value(value) value = attribute.read_value(connection)
# Truncate if needed # Truncate if needed
if len(value) > connection.att_mtu - 3: mtu = self.get_mtu(connection)
value = value[:connection.att_mtu - 3] if len(value) > mtu - 3:
value = value[:mtu - 3]
# Indicate # Indicate
indication = ATT_Handle_Value_Indication( indication = ATT_Handle_Value_Indication(
@@ -249,32 +264,27 @@ class Server(EventEmitter):
finally: finally:
self.pending_confirmations[connection.handle] = None self.pending_confirmations[connection.handle] = None
async def notify_or_indicate_subscribers(self, indicate, attribute, value=None, force=False): async def indicate_subscribers(self, attribute):
# Get all the connections for which there's at least one subscription # Get all the connections for which there's at least one subscription
connections = [ connections = [
connection for connection in [ connection for connection in [
self.device.lookup_connection(connection_handle) self.device.lookup_connection(connection_handle)
for (connection_handle, subscribers) in self.subscribers.items() for (connection_handle, subscribers) in self.subscribers.items()
if force or subscribers.get(attribute.handle) if subscribers.get(attribute.handle)
] ]
if connection is not None if connection is not None
] ]
# Indicate or notify for each connection # Indicate for each connection
if connections: if connections:
coroutine = self.indicate_subscriber if indicate else self.notify_subscriber
await asyncio.wait([ await asyncio.wait([
asyncio.create_task(coroutine(connection, attribute, value, force)) self.indicate_subscriber(connection, attribute)
for connection in connections for connection in connections
]) ])
async def notify_subscribers(self, attribute, value=None, force=False):
return await self.notify_or_indicate_subscribers(False, attribute, value, force)
async def indicate_subscribers(self, attribute, value=None, force=False):
return await self.notify_or_indicate_subscribers(True, attribute, value, force)
def on_disconnection(self, connection): def on_disconnection(self, connection):
if connection.handle in self.mtus:
del self.mtus[connection.handle]
if connection.handle in self.subscribers: if connection.handle in self.subscribers:
del self.subscribers[connection.handle] del self.subscribers[connection.handle]
if connection.handle in self.indication_semaphores: if connection.handle in self.indication_semaphores:
@@ -315,6 +325,9 @@ class Server(EventEmitter):
# Just ignore # Just ignore
logger.warning(f'{color("--- Ignoring GATT Request from [0x{connection.handle:04X}]:", "red")} {att_pdu}') logger.warning(f'{color("--- Ignoring GATT Request from [0x{connection.handle:04X}]:", "red")} {att_pdu}')
def get_mtu(self, connection):
return self.mtus.get(connection.handle, ATT_DEFAULT_MTU)
####################################################### #######################################################
# ATT handlers # ATT handlers
####################################################### #######################################################
@@ -334,16 +347,12 @@ class Server(EventEmitter):
''' '''
See Bluetooth spec Vol 3, Part F - 3.4.2.1 Exchange MTU Request See Bluetooth spec Vol 3, Part F - 3.4.2.1 Exchange MTU Request
''' '''
self.send_response(connection, ATT_Exchange_MTU_Response(server_rx_mtu = self.max_mtu)) mtu = max(ATT_DEFAULT_MTU, min(self.max_mtu, request.client_rx_mtu))
self.mtus[connection.handle] = mtu
self.send_response(connection, ATT_Exchange_MTU_Response(server_rx_mtu = mtu))
# Compute the final MTU # Notify the device
if request.client_rx_mtu >= ATT_DEFAULT_MTU: self.device.on_connection_att_mtu_update(connection.handle, mtu)
mtu = min(self.max_mtu, request.client_rx_mtu)
# Notify the device
self.device.on_connection_att_mtu_update(connection.handle, mtu)
else:
logger.warning('invalid client_rx_mtu received, MTU not changed')
def on_att_find_information_request(self, connection, request): def on_att_find_information_request(self, connection, request):
''' '''
@@ -360,7 +369,7 @@ class Server(EventEmitter):
return return
# Build list of returned attributes # Build list of returned attributes
pdu_space_available = connection.att_mtu - 2 pdu_space_available = self.get_mtu(connection) - 2
attributes = [] attributes = []
uuid_size = 0 uuid_size = 0
for attribute in ( for attribute in (
@@ -411,7 +420,7 @@ class Server(EventEmitter):
''' '''
# Build list of returned attributes # Build list of returned attributes
pdu_space_available = connection.att_mtu - 2 pdu_space_available = self.get_mtu(connection) - 2
attributes = [] attributes = []
for attribute in ( for attribute in (
attribute for attribute in self.attributes if attribute for attribute in self.attributes if
@@ -459,7 +468,8 @@ class Server(EventEmitter):
See Bluetooth spec Vol 3, Part F - 3.4.4.1 Read By Type Request See Bluetooth spec Vol 3, Part F - 3.4.4.1 Read By Type Request
''' '''
pdu_space_available = connection.att_mtu - 2 mtu = self.get_mtu(connection)
pdu_space_available = mtu - 2
attributes = [] attributes = []
for attribute in ( for attribute in (
attribute for attribute in self.attributes if attribute for attribute in self.attributes if
@@ -472,7 +482,7 @@ class Server(EventEmitter):
# Check the attribute value size # Check the attribute value size
attribute_value = attribute.read_value(connection) attribute_value = attribute.read_value(connection)
max_attribute_size = min(connection.att_mtu - 4, 253) max_attribute_size = min(mtu - 4, 253)
if len(attribute_value) > max_attribute_size: if len(attribute_value) > max_attribute_size:
# We need to truncate # We need to truncate
attribute_value = attribute_value[:max_attribute_size] attribute_value = attribute_value[:max_attribute_size]
@@ -512,7 +522,7 @@ class Server(EventEmitter):
if attribute := self.get_attribute(request.attribute_handle): if attribute := self.get_attribute(request.attribute_handle):
# TODO: check permissions # TODO: check permissions
value = attribute.read_value(connection) value = attribute.read_value(connection)
value_size = min(connection.att_mtu - 1, len(value)) value_size = min(self.get_mtu(connection) - 1, len(value))
response = ATT_Read_Response( response = ATT_Read_Response(
attribute_value = value[:value_size] attribute_value = value[:value_size]
) )
@@ -531,6 +541,7 @@ class Server(EventEmitter):
if attribute := self.get_attribute(request.attribute_handle): if attribute := self.get_attribute(request.attribute_handle):
# TODO: check permissions # TODO: check permissions
mtu = self.get_mtu(connection)
value = attribute.read_value(connection) value = attribute.read_value(connection)
if request.value_offset > len(value): if request.value_offset > len(value):
response = ATT_Error_Response( response = ATT_Error_Response(
@@ -538,14 +549,14 @@ class Server(EventEmitter):
attribute_handle_in_error = request.attribute_handle, attribute_handle_in_error = request.attribute_handle,
error_code = ATT_INVALID_OFFSET_ERROR error_code = ATT_INVALID_OFFSET_ERROR
) )
elif len(value) <= connection.att_mtu - 1: elif len(value) <= mtu - 1:
response = ATT_Error_Response( response = ATT_Error_Response(
request_opcode_in_error = request.op_code, request_opcode_in_error = request.op_code,
attribute_handle_in_error = request.attribute_handle, attribute_handle_in_error = request.attribute_handle,
error_code = ATT_ATTRIBUTE_NOT_LONG_ERROR error_code = ATT_ATTRIBUTE_NOT_LONG_ERROR
) )
else: else:
part_size = min(connection.att_mtu - 1, len(value) - request.value_offset) part_size = min(mtu - 1, len(value) - request.value_offset)
response = ATT_Read_Blob_Response( response = ATT_Read_Blob_Response(
part_attribute_value = value[request.value_offset:request.value_offset + part_size] part_attribute_value = value[request.value_offset:request.value_offset + part_size]
) )
@@ -574,7 +585,8 @@ class Server(EventEmitter):
self.send_response(connection, response) self.send_response(connection, response)
return return
pdu_space_available = connection.att_mtu - 2 mtu = self.get_mtu(connection)
pdu_space_available = mtu - 2
attributes = [] attributes = []
for attribute in ( for attribute in (
attribute for attribute in self.attributes if attribute for attribute in self.attributes if
@@ -585,7 +597,7 @@ class Server(EventEmitter):
): ):
# Check the attribute value size # Check the attribute value size
attribute_value = attribute.read_value(connection) attribute_value = attribute.read_value(connection)
max_attribute_size = min(connection.att_mtu - 6, 251) max_attribute_size = min(mtu - 6, 251)
if len(attribute_value) > max_attribute_size: if len(attribute_value) > max_attribute_size:
# We need to truncate # We need to truncate
attribute_value = attribute_value[:max_attribute_size] attribute_value = attribute_value[:max_attribute_size]

File diff suppressed because it is too large Load Diff

View File

@@ -18,8 +18,6 @@
import logging import logging
from colors import color from colors import color
from bumble.smp import SMP_CID, SMP_Command
from .core import name_or_number from .core import name_or_number
from .gatt import ATT_PDU, ATT_CID from .gatt import ATT_PDU, ATT_CID
from .l2cap import ( from .l2cap import (
@@ -75,9 +73,6 @@ class PacketTracer:
if l2cap_pdu.cid == ATT_CID: if l2cap_pdu.cid == ATT_CID:
att_pdu = ATT_PDU.from_bytes(l2cap_pdu.payload) att_pdu = ATT_PDU.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(att_pdu) self.analyzer.emit(att_pdu)
elif l2cap_pdu.cid == SMP_CID:
smp_command = SMP_Command.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(smp_command)
elif l2cap_pdu.cid == L2CAP_SIGNALING_CID or l2cap_pdu.cid == L2CAP_LE_SIGNALING_CID: elif l2cap_pdu.cid == L2CAP_SIGNALING_CID or l2cap_pdu.cid == L2CAP_LE_SIGNALING_CID:
control_frame = L2CAP_Control_Frame.from_bytes(l2cap_pdu.payload) control_frame = L2CAP_Control_Frame.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(control_frame) self.analyzer.emit(control_frame)

View File

@@ -44,20 +44,25 @@ HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS = 1
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Connection: class Connection:
def __init__(self, host, handle, role, peer_address, transport): def __init__(self, host, handle, role, peer_address):
self.host = host self.host = host
self.handle = handle self.handle = handle
self.role = role self.role = role
self.peer_address = peer_address self.peer_address = peer_address
self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu) self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.transport = transport
def on_hci_acl_data_packet(self, packet): def on_hci_acl_data_packet(self, packet):
self.assembler.feed_packet(packet) self.assembler.feed_packet(packet)
def on_acl_pdu(self, pdu): def on_acl_pdu(self, pdu):
l2cap_pdu = L2CAP_PDU.from_bytes(pdu) l2cap_pdu = L2CAP_PDU.from_bytes(pdu)
self.host.on_l2cap_pdu(self, l2cap_pdu.cid, l2cap_pdu.payload)
if l2cap_pdu.cid == ATT_CID:
self.host.on_gatt_pdu(self, l2cap_pdu.payload)
elif l2cap_pdu.cid == SMP_CID:
self.host.on_smp_pdu(self, l2cap_pdu.payload)
else:
self.host.on_l2cap_pdu(self, l2cap_pdu.cid, l2cap_pdu.payload)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -79,8 +84,6 @@ class Host(EventEmitter):
self.local_version = None self.local_version = None
self.local_supported_commands = bytes(64) self.local_supported_commands = bytes(64)
self.local_le_features = 0 self.local_le_features = 0
self.suggested_max_tx_octets = 251 # Max allowed
self.suggested_max_tx_time = 2120 # Max allowed
self.command_semaphore = asyncio.Semaphore(1) self.command_semaphore = asyncio.Semaphore(1)
self.long_term_key_provider = None self.long_term_key_provider = None
self.link_key_provider = None self.link_key_provider = None
@@ -93,72 +96,54 @@ class Host(EventEmitter):
self.set_packet_sink(controller_sink) self.set_packet_sink(controller_sink)
async def reset(self): async def reset(self):
await self.send_command(HCI_Reset_Command(), check_result=True) await self.send_command(HCI_Reset_Command())
self.ready = True self.ready = True
response = await self.send_command(HCI_Read_Local_Supported_Commands_Command(), check_result=True) await self.send_command(HCI_Set_Event_Mask_Command(event_mask = bytes.fromhex('FFFFFFFFFFFFFFFF')))
self.local_supported_commands = response.return_parameters.supported_commands await self.send_command(HCI_LE_Set_Event_Mask_Command(le_event_mask = bytes.fromhex('FFFFF00000000000')))
if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND): response = await self.send_command(HCI_Read_Local_Supported_Commands_Command())
response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command(), check_result=True) if response.return_parameters.status == HCI_SUCCESS:
self.local_le_features = struct.unpack('<Q', response.return_parameters.le_features)[0] self.local_supported_commands = response.return_parameters.supported_commands
else:
logger.warn(f'HCI_Read_Local_Supported_Commands_Command failed: {response.return_parameters.status}')
if self.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND):
await self.send_command(HCI_Write_LE_Host_Support_Command(le_supported_host = 1, simultaneous_le_host = 0))
if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND): if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
response = await self.send_command(HCI_Read_Local_Version_Information_Command(), check_result=True) response = await self.send_command(HCI_Read_Local_Version_Information_Command())
self.local_version = response.return_parameters if response.return_parameters.status == HCI_SUCCESS:
self.local_version = response.return_parameters
await self.send_command(HCI_Set_Event_Mask_Command(event_mask = bytes.fromhex('FFFFFFFFFFFFFF3F'))) else:
logger.warn(f'HCI_Read_Local_Version_Information_Command failed: {response.return_parameters.status}')
if self.local_version is not None and self.local_version.hci_version <= HCI_VERSION_BLUETOOTH_CORE_4_0:
# Some older controllers don't like event masks with bits they don't understand
le_event_mask = bytes.fromhex('1F00000000000000')
else:
le_event_mask = bytes.fromhex('FFFFF00000000000')
await self.send_command(HCI_LE_Set_Event_Mask_Command(le_event_mask = le_event_mask))
if self.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(HCI_Read_Buffer_Size_Command(), check_result=True)
self.hc_acl_data_packet_length = response.return_parameters.hc_acl_data_packet_length
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_acl_data_packets
logger.debug(
f'HCI ACL flow control: hc_acl_data_packet_length={self.hc_acl_data_packet_length},'
f'hc_total_num_acl_data_packets={self.hc_total_num_acl_data_packets}'
)
if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND): if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command(), check_result=True) response = await self.send_command(HCI_LE_Read_Buffer_Size_Command())
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length if response.return_parameters.status == HCI_SUCCESS:
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
logger.debug(f'HCI LE ACL flow control: hc_le_acl_data_packet_length={response.return_parameters.hc_le_acl_data_packet_length}, hc_total_num_le_acl_data_packets={response.return_parameters.hc_total_num_le_acl_data_packets}')
else:
logger.warn(f'HCI_LE_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0:
# Read the non-LE-specific values
response = await self.send_command(HCI_Read_Buffer_Size_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.hc_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_le_acl_data_packet_length = self.hc_le_acl_data_packet_length or self.hc_acl_data_packet_length
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
self.hc_total_num_le_acl_data_packets = self.hc_total_num_le_acl_data_packets or self.hc_total_num_acl_data_packets
logger.debug(f'HCI LE ACL flow control: hc_le_acl_data_packet_length={self.hc_le_acl_data_packet_length}, hc_total_num_le_acl_data_packets={self.hc_total_num_le_acl_data_packets}')
else:
logger.warn(f'HCI_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
logger.debug( if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
f'HCI LE ACL flow control: hc_le_acl_data_packet_length={self.hc_le_acl_data_packet_length},' response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command())
f'hc_total_num_le_acl_data_packets={self.hc_total_num_le_acl_data_packets}' if response.return_parameters.status == HCI_SUCCESS:
) self.local_le_features = struct.unpack('<Q', response.return_parameters.le_features)[0]
else:
if ( logger.warn(f'HCI_LE_Read_Supported_Features_Command failed: {response.return_parameters.status}')
response.return_parameters.hc_le_acl_data_packet_length == 0 or
response.return_parameters.hc_total_num_le_acl_data_packets == 0
):
# LE and Classic share the same values
self.hc_le_acl_data_packet_length = self.hc_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = self.hc_total_num_acl_data_packets
if (
self.supports_command(HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND) and
self.supports_command(HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND)
):
response = await self.send_command(HCI_LE_Read_Suggested_Default_Data_Length_Command())
suggested_max_tx_octets = response.return_parameters.suggested_max_tx_octets
suggested_max_tx_time = response.return_parameters.suggested_max_tx_time
if (
suggested_max_tx_octets != self.suggested_max_tx_octets or
suggested_max_tx_time != self.suggested_max_tx_time
):
await self.send_command(HCI_LE_Write_Suggested_Default_Data_Length_Command(
suggested_max_tx_octets = self.suggested_max_tx_octets,
suggested_max_tx_time = self.suggested_max_tx_time
))
self.reset_done = True self.reset_done = True
@@ -178,13 +163,13 @@ class Host(EventEmitter):
def send_hci_packet(self, packet): def send_hci_packet(self, packet):
self.hci_sink.on_packet(packet.to_bytes()) self.hci_sink.on_packet(packet.to_bytes())
async def send_command(self, command, check_result=False): async def send_command(self, command):
logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}') logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}')
# Wait until we can send (only one pending command at a time) # Wait until we can send (only one pending command at a time)
async with self.command_semaphore: async with self.command_semaphore:
assert self.pending_command is None assert(self.pending_command is None)
assert self.pending_response is None assert(self.pending_response is None)
# Create a future value to hold the eventual response # Create a future value to hold the eventual response
self.pending_response = asyncio.get_running_loop().create_future() self.pending_response = asyncio.get_running_loop().create_future()
@@ -193,25 +178,11 @@ class Host(EventEmitter):
try: try:
self.send_hci_packet(command) self.send_hci_packet(command)
response = await self.pending_response response = await self.pending_response
# TODO: check error values
# Check the return parameters if required
if check_result:
if type(response.return_parameters) is int:
status = response.return_parameters
elif type(response.return_parameters) is bytes:
# return parameters first field is a one byte status code
status = response.return_parameters[0]
else:
status = response.return_parameters.status
if status != HCI_SUCCESS:
logger.warning(f'{command.name} failed ({HCI_Constant.error_name(status)})')
raise HCI_Error(status)
return response return response
except Exception as error: except Exception as error:
logger.warning(f'{color("!!! Exception while sending HCI packet:", "red")} {error}') logger.warning(f'{color("!!! Exception while sending HCI packet:", "red")} {error}')
raise error # raise error
finally: finally:
self.pending_command = None self.pending_command = None
self.pending_response = None self.pending_response = None
@@ -231,7 +202,6 @@ class Host(EventEmitter):
offset = 0 offset = 0
pb_flag = 0 pb_flag = 0
while bytes_remaining: while bytes_remaining:
# TODO: support different LE/Classic lengths
data_total_length = min(bytes_remaining, self.hc_le_acl_data_packet_length) data_total_length = min(bytes_remaining, self.hc_le_acl_data_packet_length)
acl_packet = HCI_AclDataPacket( acl_packet = HCI_AclDataPacket(
connection_handle = connection_handle, connection_handle = connection_handle,
@@ -254,7 +224,7 @@ class Host(EventEmitter):
logger.debug(f'{self.acl_packets_in_flight} ACL packets in flight, {len(self.acl_packet_queue)} in queue') logger.debug(f'{self.acl_packets_in_flight} ACL packets in flight, {len(self.acl_packet_queue)} in queue')
def check_acl_packet_queue(self): def check_acl_packet_queue(self):
# Send all we can (TODO: support different LE/Classic limits) # Send all we can
while len(self.acl_packet_queue) > 0 and self.acl_packets_in_flight < self.hc_total_num_le_acl_data_packets: while len(self.acl_packet_queue) > 0 and self.acl_packets_in_flight < self.hc_total_num_le_acl_data_packets:
packet = self.acl_packet_queue.pop() packet = self.acl_packet_queue.pop()
self.send_hci_packet(packet) self.send_hci_packet(packet)
@@ -329,6 +299,12 @@ class Host(EventEmitter):
if connection := self.connections.get(packet.connection_handle): if connection := self.connections.get(packet.connection_handle):
connection.on_hci_acl_data_packet(packet) connection.on_hci_acl_data_packet(packet)
def on_gatt_pdu(self, connection, pdu):
self.emit('gatt_pdu', connection.handle, pdu)
def on_smp_pdu(self, connection, pdu):
self.emit('smp_pdu', connection.handle, pdu)
def on_l2cap_pdu(self, connection, cid, pdu): def on_l2cap_pdu(self, connection, cid, pdu):
self.emit('l2cap_pdu', connection.handle, cid, pdu) self.emit('l2cap_pdu', connection.handle, cid, pdu)
@@ -369,12 +345,13 @@ class Host(EventEmitter):
# Classic only # Classic only
def on_hci_connection_request_event(self, event): def on_hci_connection_request_event(self, event):
# Notify the listeners # For now, just accept everything
self.emit( # TODO: delegate the decision
'connection_request', self.send_command_sync(
event.bd_addr, HCI_Accept_Connection_Request_Command(
event.class_of_device, bd_addr = event.bd_addr,
event.link_type, role = 0x01 # Remain the peripheral
)
) )
def on_hci_le_connection_complete_event(self, event): def on_hci_le_connection_complete_event(self, event):
@@ -385,13 +362,13 @@ class Host(EventEmitter):
connection = self.connections.get(event.connection_handle) connection = self.connections.get(event.connection_handle)
if connection is None: if connection is None:
connection = Connection(self, event.connection_handle, event.role, event.peer_address, BT_LE_TRANSPORT) connection = Connection(self, event.connection_handle, event.role, event.peer_address)
self.connections[event.connection_handle] = connection self.connections[event.connection_handle] = connection
# Notify the client # Notify the client
connection_parameters = ConnectionParameters( connection_parameters = ConnectionParameters(
event.connection_interval, event.conn_interval,
event.peripheral_latency, event.conn_latency,
event.supervision_timeout event.supervision_timeout
) )
self.emit( self.emit(
@@ -407,7 +384,7 @@ class Host(EventEmitter):
logger.debug(f'### CONNECTION FAILED: {event.status}') logger.debug(f'### CONNECTION FAILED: {event.status}')
# Notify the listeners # Notify the listeners
self.emit('connection_failure', BT_LE_TRANSPORT, event.peer_address, event.status) self.emit('connection_failure', event.status)
def on_hci_le_enhanced_connection_complete_event(self, event): def on_hci_le_enhanced_connection_complete_event(self, event):
# Just use the same implementation as for the non-enhanced event for now # Just use the same implementation as for the non-enhanced event for now
@@ -420,7 +397,7 @@ class Host(EventEmitter):
connection = self.connections.get(event.connection_handle) connection = self.connections.get(event.connection_handle)
if connection is None: if connection is None:
connection = Connection(self, event.connection_handle, BT_CENTRAL_ROLE, event.bd_addr, BT_BR_EDR_TRANSPORT) connection = Connection(self, event.connection_handle, BT_CENTRAL_ROLE, event.bd_addr)
self.connections[event.connection_handle] = connection self.connections[event.connection_handle] = connection
# Notify the client # Notify the client
@@ -437,7 +414,7 @@ class Host(EventEmitter):
logger.debug(f'### BR/EDR CONNECTION FAILED: {event.status}') logger.debug(f'### BR/EDR CONNECTION FAILED: {event.status}')
# Notify the client # Notify the client
self.emit('connection_failure', BT_BR_EDR_TRANSPORT, event.bd_addr, event.status) self.emit('connection_failure', event.connection_handle, event.status)
def on_hci_disconnection_complete_event(self, event): def on_hci_disconnection_complete_event(self, event):
# Find the connection # Find the connection
@@ -455,7 +432,7 @@ class Host(EventEmitter):
logger.debug(f'### DISCONNECTION FAILED: {event.status}') logger.debug(f'### DISCONNECTION FAILED: {event.status}')
# Notify the listeners # Notify the listeners
self.emit('disconnection_failure', event.connection_handle, event.status) self.emit('disconnection_failure', event.status)
def on_hci_le_connection_update_complete_event(self, event): def on_hci_le_connection_update_complete_event(self, event):
if (connection := self.connections.get(event.connection_handle)) is None: if (connection := self.connections.get(event.connection_handle)) is None:
@@ -465,8 +442,8 @@ class Host(EventEmitter):
# Notify the client # Notify the client
if event.status == HCI_SUCCESS: if event.status == HCI_SUCCESS:
connection_parameters = ConnectionParameters( connection_parameters = ConnectionParameters(
event.connection_interval, event.conn_interval,
event.peripheral_latency, event.conn_latency,
event.supervision_timeout event.supervision_timeout
) )
self.emit('connection_parameters_update', connection.handle, connection_parameters) self.emit('connection_parameters_update', connection.handle, connection_parameters)
@@ -487,10 +464,13 @@ class Host(EventEmitter):
def on_hci_le_advertising_report_event(self, event): def on_hci_le_advertising_report_event(self, event):
for report in event.reports: for report in event.reports:
self.emit('advertising_report', report) self.emit(
'advertising_report',
def on_hci_le_extended_advertising_report_event(self, event): report.address,
self.on_hci_le_advertising_report_event(event) report.data,
report.rssi,
report.event_type
)
def on_hci_le_remote_connection_parameter_request_event(self, event): def on_hci_le_remote_connection_parameter_request_event(self, event):
if event.connection_handle not in self.connections: if event.connection_handle not in self.connections:
@@ -506,8 +486,8 @@ class Host(EventEmitter):
interval_max = event.interval_max, interval_max = event.interval_max,
latency = event.latency, latency = event.latency,
timeout = event.timeout, timeout = event.timeout,
min_ce_length = 0, minimum_ce_length = 0,
max_ce_length = 0 maximum_ce_length = 0
) )
) )
@@ -642,9 +622,6 @@ class Host(EventEmitter):
def on_hci_user_passkey_request_event(self, event): def on_hci_user_passkey_request_event(self, event):
self.emit('authentication_user_passkey_request', event.bd_addr) self.emit('authentication_user_passkey_request', event.bd_addr)
def on_hci_user_passkey_notification_event(self, event):
self.emit('authentication_user_passkey_notification', event.bd_addr, event.passkey)
def on_hci_inquiry_complete_event(self, event): def on_hci_inquiry_complete_event(self, event):
self.emit('inquiry_complete') self.emit('inquiry_complete')
@@ -672,6 +649,3 @@ class Host(EventEmitter):
self.emit('remote_name_failure', event.bd_addr, event.status) self.emit('remote_name_failure', event.bd_addr, event.status)
else: else:
self.emit('remote_name', event.bd_addr, event.remote_name) self.emit('remote_name', event.bd_addr, event.remote_name)
def on_hci_remote_host_supported_features_notification_event(self, event):
self.emit('remote_host_supported_features', event.bd_addr, event.host_supported_features)

File diff suppressed because it is too large Load Diff

View File

@@ -21,7 +21,7 @@ import asyncio
from colors import color from colors import color
from pyee import EventEmitter from pyee import EventEmitter
from .core import BT_BR_EDR_TRANSPORT, InvalidStateError, ProtocolError, ConnectionError from .core import InvalidStateError, ProtocolError, ConnectionError
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Logging # Logging
@@ -634,12 +634,7 @@ class Multiplexer(EventEmitter):
if self.state == Multiplexer.OPENING: if self.state == Multiplexer.OPENING:
self.change_state(Multiplexer.CONNECTED) self.change_state(Multiplexer.CONNECTED)
if self.open_result: if self.open_result:
self.open_result.set_exception(ConnectionError( self.open_result.set_exception(ConnectionError(ConnectionError.CONNECTION_REFUSED))
ConnectionError.CONNECTION_REFUSED,
BT_BR_EDR_TRANSPORT,
self.l2cap_channel.connection.peer_address,
'rfcomm'
))
else: else:
logger.warn(f'unexpected state for DM: {self}') logger.warn(f'unexpected state for DM: {self}')

View File

@@ -44,7 +44,6 @@ logger = logging.getLogger(__name__)
# Constants # Constants
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
SMP_CID = 0x06 SMP_CID = 0x06
SMP_BR_CID = 0x07
SMP_PAIRING_REQUEST_COMMAND = 0x01 SMP_PAIRING_REQUEST_COMMAND = 0x01
SMP_PAIRING_RESPONSE_COMMAND = 0x02 SMP_PAIRING_RESPONSE_COMMAND = 0x02
@@ -153,8 +152,6 @@ SMP_CT2_AUTHREQ = 0b00100000
# Crypto salt # Crypto salt
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031') SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031')
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('00000000000000000000000000000000746D7032')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Utils # Utils
@@ -477,9 +474,6 @@ class PairingDelegate:
async def accept(self): async def accept(self):
return True return True
async def confirm(self):
return True
async def compare_numbers(self, number, digits=6): async def compare_numbers(self, number, digits=6):
return True return True
@@ -604,7 +598,6 @@ class Session:
self.pairing_config = pairing_config self.pairing_config = pairing_config
self.wait_before_continuing = None self.wait_before_continuing = None
self.completed = False self.completed = False
self.ctkd_task = None
# Decide if we're the initiator or the responder # Decide if we're the initiator or the responder
self.is_initiator = (connection.role == BT_CENTRAL_ROLE) self.is_initiator = (connection.role == BT_CENTRAL_ROLE)
@@ -640,16 +633,15 @@ class Session:
self.oob = False self.oob = False
# Set up addresses # Set up addresses
self_address = connection.self_address
peer_address = connection.peer_resolvable_address or connection.peer_address peer_address = connection.peer_resolvable_address or connection.peer_address
if self.is_initiator: if self.is_initiator:
self.ia = bytes(self_address) self.ia = bytes(manager.address)
self.iat = 1 if self_address.is_random else 0 self.iat = 1 if manager.address.is_random else 0
self.ra = bytes(peer_address) self.ra = bytes(peer_address)
self.rat = 1 if peer_address.is_random else 0 self.rat = 1 if peer_address.is_random else 0
else: else:
self.ra = bytes(self_address) self.ra = bytes(manager.address)
self.rat = 1 if self_address.is_random else 0 self.rat = 1 if manager.address.is_random else 0
self.ia = bytes(peer_address) self.ia = bytes(peer_address)
self.iat = 1 if peer_address.is_random else 0 self.iat = 1 if peer_address.is_random else 0
@@ -719,21 +711,6 @@ class Session:
return False return False
return True return True
def prompt_user_for_confirmation(self, next_steps):
async def prompt():
logger.debug('ask for confirmation')
try:
response = await self.pairing_config.delegate.confirm()
if response:
next_steps()
return
except Exception as error:
logger.warn(f'exception while confirm: {error}')
self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR)
asyncio.create_task(prompt())
def prompt_user_for_numeric_comparison(self, code, next_steps): def prompt_user_for_numeric_comparison(self, code, next_steps):
async def prompt(): async def prompt():
logger.debug(f'verification code: {code}') logger.debug(f'verification code: {code}')
@@ -900,21 +877,10 @@ class Session:
) )
) )
async def derive_ltk(self):
link_key = await self.manager.device.get_link_key(self.connection.peer_address)
assert link_key is not None
ilk = crypto.h7(
salt=SMP_CTKD_H7_BRLE_SALT,
w=link_key) if self.ct2 else crypto.h6(link_key, b'tmp2')
self.ltk = crypto.h6(ilk, b'brle')
def distribute_keys(self): def distribute_keys(self):
# Distribute the keys as required # Distribute the keys as required
if self.is_initiator: if self.is_initiator:
# CTKD: Derive LTK from LinkKey if not self.sc:
if self.connection.transport == BT_BR_EDR_TRANSPORT and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
self.ctkd_task = asyncio.create_task(self.derive_ltk())
elif not self.sc:
# Distribute the LTK, EDIV and RAND # Distribute the LTK, EDIV and RAND
if self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: if self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk)) self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk))
@@ -926,15 +892,15 @@ class Session:
SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk) SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk)
) )
self.send_command(SMP_Identity_Address_Information_Command( self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.connection.self_address.address_type, addr_type = self.manager.address.address_type,
bd_addr = self.connection.self_address bd_addr = self.manager.address
)) ))
# Distribute CSRK # Distribute CSRK
csrk = bytes(16) # FIXME: testing csrk = bytes(16) # FIXME: testing
if self.initiator_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: if self.initiator_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
# CTKD, calculate BR/EDR link key # CTKD, calculate BR/EDR link key
if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = crypto.h7( ilk = crypto.h7(
@@ -943,11 +909,8 @@ class Session:
self.link_key = crypto.h6(ilk, b'lebr') self.link_key = crypto.h6(ilk, b'lebr')
else: else:
# CTKD: Derive LTK from LinkKey
if self.connection.transport == BT_BR_EDR_TRANSPORT and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
self.ctkd_task = asyncio.create_task(self.derive_ltk())
# Distribute the LTK, EDIV and RAND # Distribute the LTK, EDIV and RAND
elif not self.sc: if not self.sc:
if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk)) self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk))
self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand)) self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand))
@@ -958,15 +921,15 @@ class Session:
SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk) SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk)
) )
self.send_command(SMP_Identity_Address_Information_Command( self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.connection.self_address.address_type, addr_type = self.manager.address.address_type,
bd_addr = self.connection.self_address bd_addr = self.manager.address
)) ))
# Distribute CSRK # Distribute CSRK
csrk = bytes(16) # FIXME: testing csrk = bytes(16) # FIXME: testing
if self.responder_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: if self.responder_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
# CTKD, calculate BR/EDR link key # CTKD, calculate BR/EDR link key
if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = crypto.h7( ilk = crypto.h7(
@@ -977,7 +940,7 @@ class Session:
def compute_peer_expected_distributions(self, key_distribution_flags): def compute_peer_expected_distributions(self, key_distribution_flags):
# Set our expectations for what to wait for in the key distribution phase # Set our expectations for what to wait for in the key distribution phase
self.peer_expected_distributions = [] self.peer_expected_distributions = []
if not self.sc and self.connection.transport == BT_LE_TRANSPORT: if not self.sc:
if (key_distribution_flags & SMP_ENC_KEY_DISTRIBUTION_FLAG != 0): if (key_distribution_flags & SMP_ENC_KEY_DISTRIBUTION_FLAG != 0):
self.peer_expected_distributions.append(SMP_Encryption_Information_Command) self.peer_expected_distributions.append(SMP_Encryption_Information_Command)
self.peer_expected_distributions.append(SMP_Master_Identification_Command) self.peer_expected_distributions.append(SMP_Master_Identification_Command)
@@ -1000,7 +963,12 @@ class Session:
self.peer_expected_distributions.remove(command_class) self.peer_expected_distributions.remove(command_class)
logger.debug(f'remaining distributions: {[c.__name__ for c in self.peer_expected_distributions]}') logger.debug(f'remaining distributions: {[c.__name__ for c in self.peer_expected_distributions]}')
if not self.peer_expected_distributions: if not self.peer_expected_distributions:
self.on_peer_key_distribution_complete() # The initiator can now send its keys
if self.is_initiator:
self.distribute_keys()
# Nothing left to expect, we're done
self.on_pairing()
else: else:
logger.warn(color(f'!!! unexpected key distribution command: {command_class.__name__}', 'red')) logger.warn(color(f'!!! unexpected key distribution command: {command_class.__name__}', 'red'))
self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR) self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR)
@@ -1021,28 +989,17 @@ class Session:
self.connection.remove_listener('connection_encryption_key_refresh', self.on_connection_encryption_key_refresh) self.connection.remove_listener('connection_encryption_key_refresh', self.on_connection_encryption_key_refresh)
self.manager.on_session_end(self) self.manager.on_session_end(self)
def on_peer_key_distribution_complete(self):
# The initiator can now send its keys
if self.is_initiator:
self.distribute_keys()
asyncio.create_task(self.on_pairing())
def on_connection_encryption_change(self): def on_connection_encryption_change(self):
if self.connection.is_encrypted: if self.connection.is_encrypted:
if self.is_responder: if self.is_responder:
# The responder distributes its keys first, the initiator later # The responder distributes its keys first, the initiator later
self.distribute_keys() self.distribute_keys()
# If we're not expecting key distributions from the peer, we're done
if not self.peer_expected_distributions:
self.on_peer_key_distribution_complete()
def on_connection_encryption_key_refresh(self): def on_connection_encryption_key_refresh(self):
# Do as if the connection had just been encrypted # Do as if the connection had just been encrypted
self.on_connection_encryption_change() self.on_connection_encryption_change()
async def on_pairing(self): def on_pairing(self):
logger.debug('pairing complete') logger.debug('pairing complete')
if self.completed: if self.completed:
@@ -1059,16 +1016,11 @@ class Session:
else: else:
peer_address = self.connection.peer_address peer_address = self.connection.peer_address
# Wait for link key fetch and key derivation
if self.ctkd_task is not None:
await self.ctkd_task
self.ctkd_task = None
# Create an object to hold the keys # Create an object to hold the keys
keys = PairingKeys() keys = PairingKeys()
keys.address_type = peer_address.address_type keys.address_type = peer_address.address_type
authenticated = self.pairing_method != self.JUST_WORKS authenticated = self.pairing_method != self.JUST_WORKS
if self.sc or self.connection.transport == BT_BR_EDR_TRANSPORT: if self.sc:
keys.ltk = PairingKeys.Key( keys.ltk = PairingKeys.Key(
value = self.ltk, value = self.ltk,
authenticated = authenticated authenticated = authenticated
@@ -1107,6 +1059,7 @@ class Session:
value = self.link_key, value = self.link_key,
authenticated = authenticated authenticated = authenticated
) )
self.manager.on_pairing(self, peer_address, keys) self.manager.on_pairing(self, peer_address, keys)
def on_pairing_failure(self, reason): def on_pairing_failure(self, reason):
@@ -1184,12 +1137,6 @@ class Session:
# Respond # Respond
self.send_pairing_response_command() self.send_pairing_response_command()
# Vol 3, Part C, 5.2.2.1.3
# CTKD over BR/EDR should happen after the connection has been encrypted,
# so when receiving pairing requests, responder should start distributing keys
if self.connection.transport == BT_BR_EDR_TRANSPORT and self.connection.is_encrypted and self.is_responder and accepted:
self.distribute_keys()
def on_smp_pairing_response_command(self, command): def on_smp_pairing_response_command(self, command):
if self.is_responder: if self.is_responder:
logger.warn(color('received pairing response as a responder', 'red')) logger.warn(color('received pairing response as a responder', 'red'))
@@ -1406,12 +1353,12 @@ class Session:
# Compute the 6-digit code # Compute the 6-digit code
code = crypto.g2(self.pka, self.pkb, self.na, self.nb) % 1000000 code = crypto.g2(self.pka, self.pkb, self.na, self.nb) % 1000000
# Ask for user confirmation if self.pairing_method == self.NUMERIC_COMPARISON:
self.wait_before_continuing = asyncio.get_running_loop().create_future() # Ask for user confirmation
if self.pairing_method == self.JUST_WORKS: self.wait_before_continuing = asyncio.get_running_loop().create_future()
self.prompt_user_for_confirmation(next_steps)
else:
self.prompt_user_for_numeric_comparison(code, next_steps) self.prompt_user_for_numeric_comparison(code, next_steps)
else:
next_steps()
else: else:
next_steps() next_steps()
@@ -1505,17 +1452,17 @@ class Manager(EventEmitter):
Implements the Initiator and Responder roles of the Security Manager Protocol Implements the Initiator and Responder roles of the Security Manager Protocol
''' '''
def __init__(self, device): def __init__(self, device, address):
super().__init__() super().__init__()
self.device = device self.device = device
self.address = address
self.sessions = {} self.sessions = {}
self._ecc_key = None self._ecc_key = None
self.pairing_config_factory = lambda connection: PairingConfig() self.pairing_config_factory = lambda connection: PairingConfig()
def send_command(self, connection, command): def send_command(self, connection, command):
logger.debug(f'>>> Sending SMP Command on connection [0x{connection.handle:04X}] {connection.peer_address}: {command}') logger.debug(f'>>> Sending SMP Command on connection [0x{connection.handle:04X}] {connection.peer_address}: {command}')
cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID connection.send_l2cap_pdu(SMP_CID, command.to_bytes())
connection.send_l2cap_pdu(cid, command.to_bytes())
def on_smp_pdu(self, connection, pdu): def on_smp_pdu(self, connection, pdu):
# Look for a session with this connection, and create one if none exists # Look for a session with this connection, and create one if none exists

View File

@@ -274,7 +274,7 @@ class PumpedPacketSource(ParserSource):
self.terminated.set_result(error) self.terminated.set_result(error)
break break
self.pump_task = asyncio.create_task(pump_packets()) self.pump_task = asyncio.get_running_loop().create_task(pump_packets())
def close(self): def close(self):
if self.pump_task: if self.pump_task:
@@ -304,7 +304,7 @@ class PumpedPacketSink:
logger.warn(f'exception while sending packet: {error}') logger.warn(f'exception while sending packet: {error}')
break break
self.pump_task = asyncio.create_task(pump_packets()) self.pump_task = asyncio.get_running_loop().create_task(pump_packets())
def close(self): def close(self):
if self.pump_task: if self.pump_task:

View File

@@ -36,51 +36,34 @@ logger = logging.getLogger(__name__)
async def open_usb_transport(spec): async def open_usb_transport(spec):
''' '''
Open a USB transport. Open a USB transport.
The moniker string has this syntax: The parameter string has this syntax:
either <index> or either <index> or <vendor>:<product>[/<serial-number>]
<vendor>:<product> or
<vendor>:<product>/<serial-number>] or
<vendor>:<product>#<index>
With <index> as the 0-based index to select amongst all the devices that appear With <index> as the 0-based index to select amongst all the devices that appear
to be supporting Bluetooth HCI (0 being the first one), or to be supporting Bluetooth HCI (0 being the first one), or
Where <vendor> and <product> are the vendor ID and product ID in hexadecimal. The Where <vendor> and <product> are the vendor ID and product ID in hexadecimal. The
/<serial-number> suffix or #<index> suffix max be specified when more than one device with /<serial-number> suffix max be specified when more than one device with the same
the same vendor and product identifiers are present. vendor and product identifiers are present.
In addition, if the moniker ends with the symbol "!", the device will be used in "forced" mode:
the first USB interface of the device will be used, regardless of the interface class/subclass.
This may be useful for some devices that use a custom class/subclass but may nonetheless work as-is.
Examples: Examples:
0 --> the first BT USB dongle 0 --> the first BT USB dongle
04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901 04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901
04b4:f901#2 --> the third USB device with vendor=04b4 and product=f901
04b4:f901/00E04C239987 --> the BT USB dongle with vendor=04b4 and product=f901 and serial number 00E04C239987 04b4:f901/00E04C239987 --> the BT USB dongle with vendor=04b4 and product=f901 and serial number 00E04C239987
usb:0B05:17CB! --> the BT USB dongle vendor=0B05 and product=17CB, in "forced" mode.
''' '''
USB_RECIPIENT_DEVICE = 0x00 USB_RECIPIENT_DEVICE = 0x00
USB_REQUEST_TYPE_CLASS = 0x01 << 5 USB_REQUEST_TYPE_CLASS = 0x01 << 5
USB_DEVICE_CLASS_DEVICE = 0x00 USB_ENDPOINT_EVENTS_IN = 0x81
USB_ENDPOINT_ACL_IN = 0x82
USB_ENDPOINT_ACL_OUT = 0x02
USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0 USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01 USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01 USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
USB_ENDPOINT_TRANSFER_TYPE_BULK = 0x02
USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT = 0x03
USB_ENDPOINT_IN = 0x80
USB_BT_HCI_CLASS_TUPLE = (
USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
USB_DEVICE_SUBCLASS_RF_CONTROLLER,
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
)
READ_SIZE = 1024 READ_SIZE = 1024
class UsbPacketSink: class UsbPacketSink:
def __init__(self, device, acl_out): def __init__(self, device):
self.device = device self.device = device
self.acl_out = acl_out
self.transfer = device.getTransfer() self.transfer = device.getTransfer()
self.packets = collections.deque() # Queue of packets waiting to be sent self.packets = collections.deque() # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
@@ -129,7 +112,7 @@ async def open_usb_transport(spec):
packet_type = packet[0] packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET: if packet_type == hci.HCI_ACL_DATA_PACKET:
self.transfer.setBulk( self.transfer.setBulk(
self.acl_out, USB_ENDPOINT_ACL_OUT,
packet[1:], packet[1:],
callback=self.on_packet_sent callback=self.on_packet_sent
) )
@@ -165,12 +148,10 @@ async def open_usb_transport(spec):
logger.debug('OUT transfer likely already completed') logger.debug('OUT transfer likely already completed')
class UsbPacketSource(asyncio.Protocol, ParserSource): class UsbPacketSource(asyncio.Protocol, ParserSource):
def __init__(self, context, device, acl_in, events_in): def __init__(self, context, device):
super().__init__() super().__init__()
self.context = context self.context = context
self.device = device self.device = device
self.acl_in = acl_in
self.events_in = events_in
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
self.queue = asyncio.Queue() self.queue = asyncio.Queue()
self.closed = False self.closed = False
@@ -187,7 +168,7 @@ async def open_usb_transport(spec):
# Set up transfer objects for input # Set up transfer objects for input
self.events_in_transfer = device.getTransfer() self.events_in_transfer = device.getTransfer()
self.events_in_transfer.setInterrupt( self.events_in_transfer.setInterrupt(
self.events_in, USB_ENDPOINT_EVENTS_IN,
READ_SIZE, READ_SIZE,
callback=self.on_packet_received, callback=self.on_packet_received,
user_data=hci.HCI_EVENT_PACKET user_data=hci.HCI_EVENT_PACKET
@@ -196,7 +177,7 @@ async def open_usb_transport(spec):
self.acl_in_transfer = device.getTransfer() self.acl_in_transfer = device.getTransfer()
self.acl_in_transfer.setBulk( self.acl_in_transfer.setBulk(
self.acl_in, USB_ENDPOINT_ACL_IN,
READ_SIZE, READ_SIZE,
callback=self.on_packet_received, callback=self.on_packet_received,
user_data=hci.HCI_ACL_DATA_PACKET user_data=hci.HCI_ACL_DATA_PACKET
@@ -209,7 +190,7 @@ async def open_usb_transport(spec):
def on_packet_received(self, transfer): def on_packet_received(self, transfer):
packet_type = transfer.getUserData() packet_type = transfer.getUserData()
status = transfer.getStatus() status = transfer.getStatus()
# logger.debug(f'<<< USB IN transfer callback: status={status} packet_type={packet_type} length={transfer.getActualLength()}') # logger.debug(f'<<< USB IN transfer callback: status={status} packet_type={packet_type}')
if status == usb1.TRANSFER_COMPLETED: if status == usb1.TRANSFER_COMPLETED:
packet = bytes([packet_type]) + transfer.getBuffer()[:transfer.getActualLength()] packet = bytes([packet_type]) + transfer.getBuffer()[:transfer.getActualLength()]
@@ -263,7 +244,7 @@ async def open_usb_transport(spec):
await self.event_loop_done await self.event_loop_done
class UsbTransport(Transport): class UsbTransport(Transport):
def __init__(self, context, device, interface, setting, source, sink): def __init__(self, context, device, interface, source, sink):
super().__init__(source, sink) super().__init__(source, sink)
self.context = context self.context = context
self.device = device self.device = device
@@ -272,10 +253,6 @@ async def open_usb_transport(spec):
# Get exclusive access # Get exclusive access
device.claimInterface(interface) device.claimInterface(interface)
# Set the alternate setting if not the default
if setting != 0:
device.setInterfaceAltSetting(interface, setting)
# The source and sink can now start # The source and sink can now start
source.start() source.start()
sink.start() sink.start()
@@ -292,60 +269,29 @@ async def open_usb_transport(spec):
context.open() context.open()
try: try:
found = None found = None
if spec.endswith('!'):
spec = spec[:-1]
forced_mode = True
else:
forced_mode = False
if ':' in spec: if ':' in spec:
vendor_id, product_id = spec.split(':') vendor_id, product_id = spec.split(':')
serial_number = None
device_index = 0
if '/' in product_id: if '/' in product_id:
product_id, serial_number = product_id.split('/') product_id, serial_number = product_id.split('/')
elif '#' in product_id: for device in context.getDeviceIterator(skip_on_error=True):
product_id, device_index_str = product_id.split('#') if (
device_index = int(device_index_str) device.getVendorID() == int(vendor_id, 16) and
device.getProductID() == int(product_id, 16) and
for device in context.getDeviceIterator(skip_on_error=True): device.getSerialNumber() == serial_number
try: ):
device_serial_number = device.getSerialNumber()
except usb1.USBError:
device_serial_number = None
if (
device.getVendorID() == int(vendor_id, 16) and
device.getProductID() == int(product_id, 16) and
(serial_number is None or serial_number == device_serial_number)
):
if device_index == 0:
found = device found = device
break break
device_index -= 1 device.close()
device.close() else:
found = context.getByVendorIDAndProductID(int(vendor_id, 16), int(product_id, 16), skip_on_error=True)
else: else:
# Look for a compatible device by index
def device_is_bluetooth_hci(device):
# Check if the device class indicates a match
if (device.getDeviceClass(), device.getDeviceSubClass(), device.getDeviceProtocol()) == \
USB_BT_HCI_CLASS_TUPLE:
return True
# If the device class is 'Device', look for a matching interface
if device.getDeviceClass() == USB_DEVICE_CLASS_DEVICE:
for configuration in device:
for interface in configuration:
for setting in interface:
if (setting.getClass(), setting.getSubClass(), setting.getProtocol()) == \
USB_BT_HCI_CLASS_TUPLE:
return True
return False
device_index = int(spec) device_index = int(spec)
for device in context.getDeviceIterator(skip_on_error=True): for device in context.getDeviceIterator(skip_on_error=True):
if device_is_bluetooth_hci(device): if (
device.getDeviceClass() == USB_DEVICE_CLASS_WIRELESS_CONTROLLER and
device.getDeviceSubClass() == USB_DEVICE_SUBCLASS_RF_CONTROLLER and
device.getDeviceProtocol() == USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
):
if device_index == 0: if device_index == 0:
found = device found = device
break break
@@ -357,63 +303,22 @@ async def open_usb_transport(spec):
raise ValueError('device not found') raise ValueError('device not found')
logger.debug(f'USB Device: {found}') logger.debug(f'USB Device: {found}')
# Look for the first interface with the right class and endpoints
def find_endpoints(device):
for (configuration_index, configuration) in enumerate(device):
interface = None
for interface in configuration:
setting = None
for setting in interface:
if (
not forced_mode and
(setting.getClass(), setting.getSubClass(), setting.getProtocol()) != USB_BT_HCI_CLASS_TUPLE
):
continue
events_in = None
acl_in = None
acl_out = None
for endpoint in setting:
attributes = endpoint.getAttributes()
address = endpoint.getAddress()
if attributes & 0x03 == USB_ENDPOINT_TRANSFER_TYPE_BULK:
if address & USB_ENDPOINT_IN and acl_in is None:
acl_in = address
elif acl_out is None:
acl_out = address
elif attributes & 0x03 == USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT:
if address & USB_ENDPOINT_IN and events_in is None:
events_in = address
# Return if we found all 3 endpoints
if acl_in is not None and acl_out is not None and events_in is not None:
return (
configuration_index + 1,
setting.getNumber(),
setting.getAlternateSetting(),
acl_in,
acl_out,
events_in
)
else:
logger.debug(f'skipping configuration {configuration_index + 1} / interface {setting.getNumber()}')
endpoints = find_endpoints(found)
if endpoints is None:
raise ValueError('no compatible interface found for device')
(configuration, interface, setting, acl_in, acl_out, events_in) = endpoints
logger.debug(
f'selected endpoints: configuration={configuration}, '
f'interface={interface}, '
f'setting={setting}, '
f'acl_in=0x{acl_in:02X}, '
f'acl_out=0x{acl_out:02X}, '
f'events_in=0x{events_in:02X}, '
)
device = found.open() device = found.open()
# Set the configuration if needed
try:
configuration = device.getConfiguration()
logger.debug(f'current configuration = {configuration}')
except usb1.USBError:
try:
logger.debug('setting configuration 1')
device.setConfiguration(1)
except usb1.USBError:
logger.debug('failed to set configuration 1')
# Use the first interface
interface = 0
# Detach the kernel driver if supported and needed # Detach the kernel driver if supported and needed
if usb1.hasCapability(usb1.CAP_SUPPORTS_DETACH_KERNEL_DRIVER): if usb1.hasCapability(usb1.CAP_SUPPORTS_DETACH_KERNEL_DRIVER):
try: try:
@@ -423,23 +328,9 @@ async def open_usb_transport(spec):
except usb1.USBError: except usb1.USBError:
pass pass
# Set the configuration if needed source = UsbPacketSource(context, device)
try: sink = UsbPacketSink(device)
current_configuration = device.getConfiguration() return UsbTransport(context, device, interface, source, sink)
logger.debug(f'current configuration = {current_configuration}')
except usb1.USBError:
current_configuration = 0
if current_configuration != configuration:
try:
logger.debug(f'setting configuration {configuration}')
device.setConfiguration(configuration)
except usb1.USBError:
logger.warning('failed to set configuration')
source = UsbPacketSource(context, device, acl_in, events_in)
sink = UsbPacketSink(device, acl_out)
return UsbTransport(context, device, interface, setting, source, sink)
except usb1.USBError as error: except usb1.USBError as error:
logger.warning(color(f'!!! failed to open USB device: {error}', 'red')) logger.warning(color(f'!!! failed to open USB device: {error}', 'red'))
context.close() context.close()

View File

@@ -18,7 +18,6 @@
import asyncio import asyncio
import logging import logging
import traceback import traceback
import collections
from functools import wraps from functools import wraps
from colors import color from colors import color
from pyee import EventEmitter from pyee import EventEmitter
@@ -141,95 +140,3 @@ class AsyncRunner:
return wrapper return wrapper
return decorator return decorator
# -----------------------------------------------------------------------------
class FlowControlAsyncPipe:
"""
Asyncio pipe with flow control. When writing to the pipe, the source is
paused (by calling a function passed in when the pipe is created) if the
amount of queued data exceeds a specified threshold.
"""
def __init__(self, pause_source, resume_source, write_to_sink=None, drain_sink=None, threshold=0):
self.pause_source = pause_source
self.resume_source = resume_source
self.write_to_sink = write_to_sink
self.drain_sink = drain_sink
self.threshold = threshold
self.queue = collections.deque() # Queue of packets
self.queued_bytes = 0 # Number of bytes in the queue
self.ready_to_pump = asyncio.Event()
self.paused = False
self.source_paused = False
self.pump_task = None
def start(self):
if self.pump_task is None:
self.pump_task = asyncio.create_task(self.pump())
self.check_pump()
def stop(self):
if self.pump_task is not None:
self.pump_task.cancel()
self.pump_task = None
def write(self, packet):
self.queued_bytes += len(packet)
self.queue.append(packet)
# Pause the source if we're over the threshold
if self.queued_bytes > self.threshold and not self.source_paused:
logger.debug(f'pausing source (queued={self.queued_bytes})')
self.pause_source()
self.source_paused = True
self.check_pump()
def pause(self):
if not self.paused:
self.paused = True
if not self.source_paused:
self.pause_source()
self.source_paused = True
self.check_pump()
def resume(self):
if self.paused:
self.paused = False
if self.source_paused:
self.resume_source()
self.source_paused = False
self.check_pump()
def can_pump(self):
return self.queue and not self.paused and self.write_to_sink is not None
def check_pump(self):
if self.can_pump():
self.ready_to_pump.set()
else:
self.ready_to_pump.clear()
async def pump(self):
while True:
# Wait until we can try to pump packets
await self.ready_to_pump.wait()
# Try to pump a packet
if self.can_pump():
packet = self.queue.pop()
self.write_to_sink(packet)
self.queued_bytes -= len(packet)
# Drain the sink if we can
if self.drain_sink:
await self.drain_sink()
# Check if we can accept more
if self.queued_bytes <= self.threshold and self.source_paused:
logger.debug(f'resuming source (queued={self.queued_bytes})')
self.source_paused = False
self.resume_source()
self.check_pump()

View File

@@ -45,10 +45,6 @@ nav:
- HCI Bridge: apps_and_tools/hci_bridge.md - HCI Bridge: apps_and_tools/hci_bridge.md
- Golden Gate Bridge: apps_and_tools/gg_bridge.md - Golden Gate Bridge: apps_and_tools/gg_bridge.md
- Show: apps_and_tools/show.md - Show: apps_and_tools/show.md
- GATT Dump: apps_and_tools/gatt_dump.md
- Pair: apps_and_tools/pair.md
- Unbond: apps_and_tools/unbond.md
- USB Probe: apps_and_tools/usb_probe.md
- Hardware: - Hardware:
- Overview: hardware/index.md - Overview: hardware/index.md
- Platforms: - Platforms:

View File

@@ -1,6 +1,6 @@
# This requirements file is for python3 # This requirements file is for python3
mkdocs == 1.4.0 mkdocs == 1.2.3
mkdocs-material == 8.5.6 mkdocs-material == 7.1.7
mkdocs-material-extensions == 1.0.3 mkdocs-material-extensions == 1.0.1
pymdown-extensions == 9.6 pymdown-extensions == 8.2
mkdocstrings-python == 0.7.1 mkdocstrings == 0.15.1

View File

@@ -1,50 +0,0 @@
USB PROBE TOOL
==============
This tool lists all the USB devices, with details about each device.
For each device, the different possible Bumble transport strings that can
refer to it are listed.
If the device is known to be a Bluetooth HCI device, its identifier is printed
in reverse colors, and the transport names in cyan color.
For other devices, regardless of their type, the transport names are printed
in red. Whether that device is actually a Bluetooth device or not depends on
whether it is a Bluetooth device that uses a non-standard Class, or some other
type of device (there's no way to tell).
## Usage
This command line tool may be invoked with no arguments, or with `--verbose`
for extra details.
When installed from PyPI, run as
```
$ bumble-usb-probe
```
or, for extra details, with the `--verbose` argument
```
$ bumble-usb-probe --v
```
When running from the source distribution:
```
$ python3 apps/usb-probe.py
```
or
```
$ python3 apps/usb-probe.py --verbose
```
!!! example
```
$ python3 apps/usb_probe.py
ID 0A12:0001
Bumble Transport Names: usb:0 or usb:0A12:0001
Bus/Device: 020/034
Class: Wireless Controller
Subclass/Protocol: 1/1 [Bluetooth]
Manufacturer: None
Product: USB2.0-BT
```

View File

@@ -1,86 +1,48 @@
:material-linux: LINUX PLATFORM :material-linux: LINUX PLATFORM
=============================== ===============================
Using Bumble With Physical Bluetooth Controllers In addition to all the standard functionality available from the project by running the python tools and/or writing your own apps by leveraging the API, it is also possible on Linux hosts to interface the Bumble stack with the native BlueZ stack, and with Bluetooth controllers.
------------------------------------------------
A Bumble application can interface with a local Bluetooth controller on a Linux host. Using Bumble With BlueZ
The 3 main types of physical Bluetooth controllers are: -----------------------
* Bluetooth USB Dongle A Bumble virtual controller can be attached to the BlueZ stack.
* HCI over UART (via a serial port) Attaching a controller to BlueZ can be done by either simulating a UART HCI interface, or by using the VHCI driver interface if available.
* Kernel-managed Bluetooth HCI (HCI Sockets) In both cases, the controller can run locally on the Linux host, or remotely on a different host, with a bridge between the remote controller and the local BlueZ host, which may be useful when the BlueZ stack is running on an embedded system, or a host on which running the Bumble controller is not convenient.
!!! tip "Conflicts with the kernel and BlueZ" ### Using VHCI
If your use a USB dongle that is recognized by your kernel as a supported Bluetooth device, it is
likely that the kernel driver will claim that USB device and attach it to the BlueZ stack.
If you want to claim ownership of it to use with Bumble, you will need to set the state of the corresponding HCI interface as `DOWN`.
HCI interfaces are numbered, starting from 0 (i.e `hci0`, `hci1`, ...).
For example, to bring `hci0` down: With the [VHCI transport](../transports/vhci.md) you can attach a Bumble virtual controller to the BlueZ stack. Once attached, the controller will appear just like any other controller, and thus can be used with the standard BlueZ tools.
!!! example "Attaching a virtual controller"
With the example app `run_controller.py`:
``` ```
$ sudo hciconfig hci0 down PYTHONPATH=. python3 examples/run_controller.py F6:F7:F8:F9:FA:FB examples/device1.json vhci
```
You should see a 'Virtual Bus' controller. For example:
```
$ hciconfig
hci0: Type: Primary Bus: Virtual
BD Address: F6:F7:F8:F9:FA:FB ACL MTU: 27:64 SCO MTU: 0:0
UP RUNNING
RX bytes:0 acl:0 sco:0 events:43 errors:0
TX bytes:274 acl:0 sco:0 commands:43 errors:0
``` ```
You can use the `hciconfig` command with no arguments to get a list of HCI interfaces seen by And scanning for devices should show the virtual 'Bumble' device that's running as part of the `run_controller.py` example app:
the kernel.
Also, if `bluetoothd` is running on your system, it will likely re-claim the interface after you
close it, so you may need to bring the interface back `UP` before using it again, or to disable
`bluetoothd` altogether (see the section further below about BlueZ and `bluetoothd`).
### Using a USB Dongle
See the [USB Transport page](../transports/usb.md) for general information on how to use HCI USB controllers.
!!! tip "USB Permissions"
By default, when running as a regular user, you won't have the permission to use
arbitrary USB devices.
You can change the permissions for a specific USB device based on its bus number and
device number (you can use `lsusb` to find the Bus and Device numbers for your Bluetooth
dongle).
Example:
``` ```
$ sudo chmod o+w /dev/bus/usb/001/004 pi@raspberrypi:~ $ sudo hcitool -i hci2 lescan
LE Scan ...
F0:F1:F2:F3:F4:F5 Bumble
``` ```
This will change the permissions for Device 4 on Bus 1.
Note that the USB Bus number and Device number may change depending on where you plug the USB
dongle and what other USB devices and hubs are also plugged in.
If you need to make the permission changes permanent across reboots, you can create a `udev`
rule for your specific Bluetooth dongle. Visit [this Arch Linux Wiki page](https://wiki.archlinux.org/title/udev) for a
good overview of how you may do that.
### Using HCI over UART
See the [Serial Transport page](../transports/serial.md) for general information on how to use HCI over a UART (serial port).
### Using HCI Sockets ### Using HCI Sockets
HCI sockets provide a way to send/receive HCI packets to/from a Bluetooth controller managed by the kernel. HCI sockets provide a way to send/receive HCI packets to/from a Bluetooth controller managed by the kernel.
See the [HCI Socket Transport page](../transports/hci_socket.md) for details on the `hci-socket` tansport syntax. The HCI device referenced by an `hci-socket` transport (`hciX`, where `X` is an integer, with `hci0` being the first controller device, and so on) must be in the `DOWN` state before it can be opened as a transport.
You can bring a HCI controller `UP` or `DOWN` with `hciconfig`.
The HCI device referenced by an `hci-socket` transport (`hci<X>`, where `<X>` is an integer, with `hci0` being the first controller device, and so on) must be in the `DOWN` state before it can be opened as a transport.
You can bring a HCI controller `UP` or `DOWN` with `hciconfig hci<X> up` and `hciconfig hci<X> up`.
!!! tip "HCI Socket Permissions"
By default, when running as a regular user, you won't have the permission to use
an HCI socket to a Bluetooth controller (you may see an exception like `PermissionError: [Errno 1] Operation not permitted`).
If you want to run without using `sudo`, you need to manage the capabilities by adding the appropriate entries in `/etc/security/capability.conf` to grant a user or group the `cap_net_admin` capability.
See [this manpage](https://manpages.ubuntu.com/manpages/bionic/man5/capability.conf.5.html) for details.
Alternatively, if you are just experimenting temporarily, the `capsh` command may be useful in order
to execute a single command with enhanced permissions, as in this example:
```
$ sudo capsh --caps="cap_net_admin+eip cap_setpcap,cap_setuid,cap_setgid+ep" --keep=1 --user=$USER --addamb=cap_net_admin -- -c "<path/to/executable> <executable-args>"
```
Where `<path/to/executable>` is the path to your `python3` executable or to one of the Bumble bundled command-line applications.
!!! tip "List all available controllers" !!! tip "List all available controllers"
The command The command
``` ```
@@ -110,16 +72,29 @@ You can bring a HCI controller `UP` or `DOWN` with `hciconfig hci<X> up` and `hc
``` ```
$ hciconfig hci0 down $ hciconfig hci0 down
``` ```
(or `hci<X>` with `<X>` being the index of the controller device you want to use), but a simpler solution is to just stop the `bluetoothd` daemon, with a command like: (or `hciX` with `X` being the index of the controller device you want to use), but a simpler solution is to just stop the `bluetoothd` daemon, with a command like:
``` ```
$ sudo systemctl stop bluetooth.service $ sudo systemctl stop bluetooth.service
``` ```
You can always re-start the daemon with You can always re-start the daemon with
``` ```
$ sudo systemctl start bluetooth.service $ sudo systemctl start bluetooth.service
```
Bumble on the Raspberry Pi ### Using a Simulated UART HCI
--------------------------
### Bridge to a Remote Controller
Using Bumble With Bluetooth Controllers
---------------------------------------
A Bumble application can interface with a local Bluetooth controller.
If your Bluetooth controller is a standard HCI USB controller, see the [USB Transport page](../transports/usb.md) for details on how to use HCI USB controllers.
If your Bluetooth controller is a standard HCI UART controller, see the [Serial Transport page](../transports/serial.md).
Alternatively, a Bumble Host object can communicate with one of the platform's controllers via an HCI Socket.
`<details to be filled in>`
### Raspberry Pi 4 :fontawesome-brands-raspberry-pi: ### Raspberry Pi 4 :fontawesome-brands-raspberry-pi:
@@ -127,10 +102,9 @@ You can use the Bluetooth controller either via the kernel, or directly to the d
#### Via The Kernel #### Via The Kernel
Use an HCI Socket transport (see section above) Use an HCI Socket transport
#### Directly #### Directly
In order to use the Bluetooth controller directly on a Raspberry Pi 4 board, you need to ensure that it isn't being used by the BlueZ stack (which it probably is by default). In order to use the Bluetooth controller directly on a Raspberry Pi 4 board, you need to ensure that it isn't being used by the BlueZ stack (which it probably is by default).
``` ```
@@ -162,47 +136,3 @@ should detach the controller from the stack, after which you can use the HCI UAR
python3 run_scanner.py serial:/dev/serial1,3000000 python3 run_scanner.py serial:/dev/serial1,3000000
``` ```
Using Bumble With BlueZ
-----------------------
In addition to all the standard functionality available from the project by running the python tools and/or writing your own apps by leveraging the API, it is also possible on Linux hosts to interface the Bumble stack with the native BlueZ stack, and with Bluetooth controllers.
A Bumble virtual controller can be attached to the BlueZ stack.
Attaching a controller to BlueZ can be done by either simulating a UART HCI interface, or by using the VHCI driver interface if available.
In both cases, the controller can run locally on the Linux host, or remotely on a different host, with a bridge between the remote controller and the local BlueZ host, which may be useful when the BlueZ stack is running on an embedded system, or a host on which running the Bumble controller is not convenient.
### Using VHCI
With the [VHCI transport](../transports/vhci.md) you can attach a Bumble virtual controller to the BlueZ stack. Once attached, the controller will appear just like any other controller, and thus can be used with the standard BlueZ tools.
!!! example "Attaching a virtual controller"
With the example app `run_controller.py`:
```
python3 examples/run_controller.py F6:F7:F8:F9:FA:FB examples/device1.json vhci
```
You should see a 'Virtual Bus' controller. For example:
```
$ hciconfig
hci0: Type: Primary Bus: Virtual
BD Address: F6:F7:F8:F9:FA:FB ACL MTU: 27:64 SCO MTU: 0:0
UP RUNNING
RX bytes:0 acl:0 sco:0 events:43 errors:0
TX bytes:274 acl:0 sco:0 commands:43 errors:0
```
And scanning for devices should show the virtual 'Bumble' device that's running as part of the `run_controller.py` example app:
```
pi@raspberrypi:~ $ sudo hcitool -i hci2 lescan
LE Scan ...
F0:F1:F2:F3:F4:F5 Bumble
```
```
### Using a Simulated UART HCI
### Bridge to a Remote Controller

View File

@@ -5,9 +5,8 @@ The Android emulator transport either connects, as a host, to a "Root Canal" vir
("host" mode), or attaches a virtual controller to the Android Bluetooth host stack ("controller" mode). ("host" mode), or attaches a virtual controller to the Android Bluetooth host stack ("controller" mode).
## Moniker ## Moniker
The moniker syntax for an Android Emulator transport is: `android-emulator:[mode=<host|controller>][<hostname>:<port>]`, where The moniker syntax for an Android Emulator transport is: `android-emulator:[mode=<host|controller>][mode=<host|controller>]`.
the `mode` parameter can specify running as a host or a controller, and `<hostname>:<port>` can specify a host name (or IP address) and TCP port number on which to reach the gRPC server for the emulator. Both the `mode=<host|controller>` and `mode=<host|controller>` parameters are optional (so the moniker `android-emulator` by itself is a valid moniker, which will create a transport in `host` mode, connected to `localhost` on the default gRPC port for the emulator)
Both the `mode=<host|controller>` and `<hostname>:<port>` parameters are optional (so the moniker `android-emulator` by itself is a valid moniker, which will create a transport in `host` mode, connected to `localhost` on the default gRPC port for the emulator).
!!! example Example !!! example Example
`android-emulator` `android-emulator`

View File

@@ -4,65 +4,16 @@ USB TRANSPORT
The USB transport interfaces with a local Bluetooth USB dongle. The USB transport interfaces with a local Bluetooth USB dongle.
## Moniker ## Moniker
The moniker for a USB transport is either: The moniker for a USB transport is either `usb:<index>` or `usb:<vendor>:<product>`
with `<index>` as the 0-based index to select amongst all the devices that appear to be supporting Bluetooth HCI (0 being the first one), or where `<vendor>` and `<product>` are a vendor ID and product ID in hexadecimal.
* `usb:<index>` !!! example
* `usb:<vendor>:<product>`
* `usb:<vendor>:<product>/<serial-number>`
* `usb:<vendor>:<product>#<index>`
with `<index>` as a 0-based index (0 being the first one) to select amongst all the matching devices when there are more than one.
In the `usb:<index>` form, matching devices are the ones supporting Bluetooth HCI, as declared by their Class, Subclass and Protocol.
In the `usb:<vendor>:<product>#<index>` form, matching devices are the ones with the specified `<vendor>` and `<product>` identification.
`<vendor>` and `<product>` are a vendor ID and product ID in hexadecimal.
In addition, if the moniker ends with the symbol "!", the device will be used in "forced" mode:
the first USB interface of the device will be used, regardless of the interface class/subclass.
This may be useful for some devices that use a custom class/subclass but may nonetheless work as-is.
!!! examples
`usb:04b4:f901` `usb:04b4:f901`
The USB dongle with `<vendor>` equal to `04b4` and `<product>` equal to `f901` Use the USB dongle with `vendor` equal to `04b4` and `product` equal to `f901`
`usb:0` `usb:0`
The first Bluetooth HCI dongle that's declared as such by Class/Subclass/Protocol Use the first Bluetooth dongle
`usb:04b4:f901/0016A45B05D8`
The USB dongle with `<vendor>` equal to `04b4`, `<product>` equal to `f901` and `<serial>` equal to `0016A45B05D8`
`usb:04b4:f901/#1`
The second USB dongle with `<vendor>` equal to `04b4` and `<product>` equal to `f901`
`usb:0B05:17CB!`
The BT USB dongle vendor=0B05 and product=17CB, in "forced" mode.
## Alternative ## Alternative
The library includes two different implementations of the USB transport, implemented using different python bindings for `libusb`. The library includes two different implementations of the USB transport, implemented using different python bindings for `libusb`.
Using the transport prefix `pyusb:` instead of `usb:` selects the implementation based on [PyUSB](https://pypi.org/project/pyusb/), using the synchronous API of `libusb`, whereas the default implementation is based on [libusb1](https://pypi.org/project/libusb1/), using the asynchronous API of `libusb`. In order to use the alternative PyUSB-based implementation, you need to ensure that you have installed that python module, as it isn't installed by default as a dependency of Bumble. Using the transport prefix `pyusb:` instead of `usb:` selects the implementation based on [PyUSB](https://pypi.org/project/pyusb/), using the synchronous API of `libusb`, whereas the default implementation is based on [libusb1](https://pypi.org/project/libusb1/), using the asynchronous API of `libusb`. In order to use the alternative PyUSB-based implementation, you need to ensure that you have installed that python module, as it isn't installed by default as a dependency of Bumble.
## Listing Available USB Devices
### With `usb_probe`
You can use the [`usb_probe`](../apps_and_tools/usb_probe.md) tool to list all the USB devices attached to your host computer.
The tool will also show the `usb:XXX` transport name(s) you can use to reference each device.
### With `lsusb`
On Linux and macOS, the `lsusb` tool serves a similar purpose to Bumble's own `usb_probe` tool (without the Bumble specifics)
#### Installing lsusb
On Mac: `brew install lsusb`
On Linux: `sudo apt-get install usbutils`
#### Using lsusb
```
$ lsusb
Bus 004 Device 001: ID 1d6b:0003 Linux Foundation 3.0 root hub
Bus 003 Device 014: ID 0b05:17cb ASUSTek Computer, Inc. Broadcom BCM20702A0 Bluetooth
```
The device id for the Bluetooth interface in this case is `0b05:17cb`.

View File

@@ -1,5 +0,0 @@
{
"name": "Bumble Aid Left",
"address": "F1:F2:F3:F4:F5:F6",
"keystore": "JsonKeyStore"
}

View File

@@ -1,5 +0,0 @@
{
"name": "Bumble Aid Right",
"address": "F7:F8:F9:FA:FB:FC",
"keystore": "JsonKeyStore"
}

View File

@@ -34,8 +34,8 @@ from bumble.gatt import (
Characteristic, Characteristic,
CharacteristicValue, CharacteristicValue,
GATT_DEVICE_INFORMATION_SERVICE, GATT_DEVICE_INFORMATION_SERVICE,
GATT_HUMAN_INTERFACE_DEVICE_SERVICE, GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE,
GATT_BATTERY_SERVICE, GATT_DEVICE_BATTERY_SERVICE,
GATT_BATTERY_LEVEL_CHARACTERISTIC, GATT_BATTERY_LEVEL_CHARACTERISTIC,
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
GATT_REPORT_CHARACTERISTIC, GATT_REPORT_CHARACTERISTIC,
@@ -126,8 +126,8 @@ async def keyboard_host(device, peer_address):
connection = await device.connect(peer_address) connection = await device.connect(peer_address)
await connection.pair() await connection.pair()
peer = Peer(connection) peer = Peer(connection)
await peer.discover_service(GATT_HUMAN_INTERFACE_DEVICE_SERVICE) await peer.discover_service(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)
hid_services = peer.get_services_by_uuid(GATT_HUMAN_INTERFACE_DEVICE_SERVICE) hid_services = peer.get_services_by_uuid(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)
if not hid_services: if not hid_services:
print(color('!!! No HID service', 'red')) print(color('!!! No HID service', 'red'))
return return
@@ -221,7 +221,7 @@ async def keyboard_device(device, command):
] ]
), ),
Service( Service(
GATT_HUMAN_INTERFACE_DEVICE_SERVICE, GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE,
[ [
Characteristic( Characteristic(
GATT_PROTOCOL_MODE_CHARACTERISTIC, GATT_PROTOCOL_MODE_CHARACTERISTIC,
@@ -252,7 +252,7 @@ async def keyboard_device(device, command):
] ]
), ),
Service( Service(
GATT_BATTERY_SERVICE, GATT_DEVICE_BATTERY_SERVICE,
[ [
Characteristic( Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC, GATT_BATTERY_LEVEL_CHARACTERISTIC,
@@ -273,7 +273,7 @@ async def keyboard_device(device, command):
AdvertisingData([ AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Keyboard', 'utf-8')), (AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Keyboard', 'utf-8')),
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, (AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(GATT_HUMAN_INTERFACE_DEVICE_SERVICE)), bytes(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)),
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)), (AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)),
(AdvertisingData.FLAGS, bytes([0x05])) (AdvertisingData.FLAGS, bytes([0x05]))
]) ])

View File

@@ -29,31 +29,18 @@ from bumble.transport import open_transport_or_link
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main():
if len(sys.argv) < 3: if len(sys.argv) != 3:
print('Usage: run_advertiser.py <config-file> <transport-spec> [type] [address]') print('Usage: run_advertiser.py <config-file> <transport-spec>')
print('example: run_advertiser.py device1.json usb:0') print('example: run_advertiser.py device1.json link-relay:ws://localhost:8888/test')
return return
if len(sys.argv) >= 4:
advertising_type = AdvertisingType(int(sys.argv[3]))
else:
advertising_type = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE
if advertising_type.is_directed:
if len(sys.argv) < 5:
print('<address> required for directed advertising')
return
target = Address(sys.argv[4])
else:
target = None
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
print('<<< connected') print('<<< connected')
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
await device.power_on() await device.power_on()
await device.start_advertising(advertising_type=advertising_type, target=target) await device.start_advertising()
await hci_source.wait_for_termination() await hci_source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -1,161 +0,0 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import struct
import sys
import os
import logging
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.hci import UUID
from bumble.gatt import (
Service,
Characteristic,
CharacteristicValue
)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
ASHA_SERVICE = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid')
ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID('6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties')
ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC = UUID('f0d4de7e-4a88-476c-9d9f-1937b0996cc0', 'AudioControlPoint')
ASHA_AUDIO_STATUS_CHARACTERISTIC = UUID('38663f1a-e711-4cac-b641-326b56404837', 'AudioStatus')
ASHA_VOLUME_CHARACTERISTIC = UUID('00e4ca9e-ab14-41e4-8823-f9e70c7e91df', 'Volume')
ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID('2d410339-82b6-42aa-b34e-e2e01df8cc1a', 'LE_PSM_OUT')
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 4:
print('Usage: python run_asha_sink.py <device-config> <transport-spec> <audio-file>')
print('example: python run_asha_sink.py device1.json usb:0 audio_out.g722')
return
audio_out = open(sys.argv[3], 'wb')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
# Handler for audio control commands
def on_audio_control_point_write(connection, value):
print('--- AUDIO CONTROL POINT Write:', value.hex())
opcode = value[0]
if opcode == 1:
# Start
audio_type = ('Unknown', 'Ringtone', 'Phone Call', 'Media')[value[2]]
print(f'### START: codec={value[1]}, audio_type={audio_type}, volume={value[3]}, otherstate={value[4]}')
elif opcode == 2:
print('### STOP')
elif opcode == 3:
print(f'### STATUS: connected={value[1]}')
# Respond with a status
asyncio.create_task(device.notify_subscribers(audio_status_characteristic, force=True))
# Handler for volume control
def on_volume_write(connection, value):
print('--- VOLUME Write:', value[0])
# Register an L2CAP CoC server
def on_coc(channel):
def on_data(data):
print('<<< Voice data received:', data.hex())
audio_out.write(data)
channel.sink = on_data
psm = device.register_l2cap_channel_server(0, on_coc, 8)
print(f'### LE_PSM_OUT = {psm}')
# Add the ASHA service to the GATT server
read_only_properties_characteristic = Characteristic(
ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
bytes([
0x01, # Version
0x00, # Device Capabilities [Left, Monaural]
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, # HiSyncId
0x01, # Feature Map [LE CoC audio output streaming supported]
0x00, 0x00, # Render Delay
0x00, 0x00, # RFU
0x02, 0x00 # Codec IDs [G.722 at 16 kHz]
])
)
audio_control_point_characteristic = Characteristic(
ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_audio_control_point_write)
)
audio_status_characteristic = Characteristic(
ASHA_AUDIO_STATUS_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.READABLE,
bytes([0])
)
volume_characteristic = Characteristic(
ASHA_VOLUME_CHARACTERISTIC,
Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_volume_write)
)
le_psm_out_characteristic = Characteristic(
ASHA_LE_PSM_OUT_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
struct.pack('<H', psm)
)
device.add_service(Service(
ASHA_SERVICE,
[
read_only_properties_characteristic,
audio_control_point_characteristic,
audio_status_characteristic,
volume_characteristic,
le_psm_out_characteristic
]
))
# Set the advertising data
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(device.name, 'utf-8')),
(AdvertisingData.FLAGS, bytes([0x06])),
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, bytes(ASHA_SERVICE)),
(AdvertisingData.SERVICE_DATA_16_BIT_UUID, bytes(ASHA_SERVICE) + bytes([
0x01, # Protocol Version
0x00, # Capability
0x01, 0x02, 0x03, 0x04 # Truncated HiSyncID
]))
])
)
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -30,7 +30,7 @@ from bumble.sdp import Client as SDP_Client, SDP_PUBLIC_BROWSE_ROOT, SDP_ALL_ATT
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main():
if len(sys.argv) < 3: if len(sys.argv) < 3:
print('Usage: run_classic_connect.py <device-config> <transport-spec> <bluetooth-addresses..>') print('Usage: run_classic_connect.py <device-config> <transport-spec> <bluetooth-address>')
print('example: run_classic_connect.py classic1.json usb:04b4:f901 E1:CA:72:48:C4:E8') print('example: run_classic_connect.py classic1.json usb:04b4:f901 E1:CA:72:48:C4:E8')
return return
@@ -43,7 +43,8 @@ async def main():
device.classic_enabled = True device.classic_enabled = True
await device.power_on() await device.power_on()
async def connect(target_address): # Connect to a peer
target_address = sys.argv[3]
print(f'=== Connecting to {target_address}...') print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
print(f'=== Connected to {connection.peer_address}!') print(f'=== Connected to {connection.peer_address}!')
@@ -75,10 +76,6 @@ async def main():
await sdp_client.disconnect() await sdp_client.disconnect()
await hci_source.wait_for_termination() await hci_source.wait_for_termination()
# Connect to a peer
target_addresses = sys.argv[3:]
await asyncio.wait([asyncio.create_task(connect(target_address)) for target_address in target_addresses])
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main()) asyncio.run(main())

View File

@@ -29,15 +29,15 @@ from bumble.transport import open_transport_or_link
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class ScannerListener(Device.Listener): class ScannerListener(Device.Listener):
def on_advertisement(self, advertisement): def on_advertisement(self, address, ad_data, rssi, connectable):
address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type] address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type]
address_color = 'yellow' if advertisement.is_connectable else 'red' address_color = 'yellow' if connectable else 'red'
if address_type_string.startswith('P'): if address_type_string.startswith('P'):
type_color = 'green' type_color = 'green'
else: else:
type_color = 'cyan' type_color = 'cyan'
print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]: RSSI={advertisement.rssi}, {advertisement.data}') print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]: RSSI={rssi}, {ad_data}')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -49,17 +49,11 @@ class Listener(Device.Listener, Connection.Listener):
) )
# -----------------------------------------------------------------------------
# Alternative way to listen for subscriptions
# -----------------------------------------------------------------------------
def on_my_characteristic_subscription(peer, enabled):
print(f'### My characteristic from {peer}: {"enabled" if enabled else "disabled"}')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main():
if len(sys.argv) < 3: if len(sys.argv) < 3:
print('Usage: run_notifier.py <device-config> <transport-spec>') print('Usage: run_gatt_server.py <device-config> <transport-spec>')
print('example: run_notifier.py device1.json usb:0') print('example: run_gatt_server.py device1.json usb:0')
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
@@ -89,7 +83,6 @@ async def main():
Characteristic.READABLE, Characteristic.READABLE,
bytes([0x42]) bytes([0x42])
) )
characteristic3.on('subscription', on_my_characteristic_subscription)
custom_service = Service( custom_service = Service(
'50DB505C-8AC4-4738-8448-3B1D9CC09CC5', '50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
[characteristic1, characteristic2, characteristic3] [characteristic1, characteristic2, characteristic3]

View File

@@ -98,7 +98,6 @@ async def list_rfcomm_channels(device, connection):
await sdp_client.disconnect() await sdp_client.disconnect()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class TcpServerProtocol(asyncio.Protocol): class TcpServerProtocol(asyncio.Protocol):
def __init__(self, rfcomm_session): def __init__(self, rfcomm_session):
@@ -174,7 +173,7 @@ async def main():
print('*** Encryption on') print('*** Encryption on')
# Create a client and start it # Create a client and start it
print('@@@ Starting RFCOMM client...') print('@@@ Starting to RFCOMM client...')
rfcomm_client = Client(device, connection) rfcomm_client = Client(device, connection)
rfcomm_mux = await rfcomm_client.start() rfcomm_mux = await rfcomm_client.start()
print('@@@ Started') print('@@@ Started')
@@ -193,7 +192,7 @@ async def main():
if len(sys.argv) == 6: if len(sys.argv) == 6:
# A TCP port was specified, start listening # A TCP port was specified, start listening
tcp_port = int(sys.argv[5]) tcp_port = int(sys.argv[5])
asyncio.create_task(tcp_server(tcp_port, session)) asyncio.get_running_loop().create_task(tcp_server(tcp_port, session))
await hci_source.wait_for_termination() await hci_source.wait_for_termination()

View File

@@ -40,24 +40,24 @@ async def main():
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
@device.on('advertisement') @device.on('advertisement')
def _(advertisement): def _(address, ad_data, rssi, connectable):
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[advertisement.address.address_type] address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type]
address_color = 'yellow' if advertisement.is_connectable else 'red' address_color = 'yellow' if connectable else 'red'
address_qualifier = '' address_qualifier = ''
if address_type_string.startswith('P'): if address_type_string.startswith('P'):
type_color = 'cyan' type_color = 'cyan'
else: else:
if advertisement.address.is_static: if address.is_static:
type_color = 'green' type_color = 'green'
address_qualifier = '(static)' address_qualifier = '(static)'
elif advertisement.address.is_resolvable: elif address.is_resolvable:
type_color = 'magenta' type_color = 'magenta'
address_qualifier = '(resolvable)' address_qualifier = '(resolvable)'
else: else:
type_color = 'white' type_color = 'white'
separator = '\n ' separator = '\n '
print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{advertisement.rssi}{separator}{advertisement.data.to_string(separator)}') print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{rssi}{separator}{ad_data.to_string(separator)}')
await device.power_on() await device.power_on()
await device.start_scanning(filter_duplicates=filter_duplicates) await device.start_scanning(filter_duplicates=filter_duplicates)

View File

@@ -48,28 +48,24 @@ install_requires =
[options.entry_points] [options.entry_points]
console_scripts = console_scripts =
bumble-console = bumble.apps.console:main bumble-console = bumble.apps.console:main
bumble-controller-info = bumble.apps.controller_info:main
bumble-gatt-dump = bumble.apps.gatt_dump:main bumble-gatt-dump = bumble.apps.gatt_dump:main
bumble-hci-bridge = bumble.apps.hci_bridge:main bumble-hci-bridge = bumble.apps.hci_bridge:main
bumble-l2cap-bridge = bumble.apps.l2cap_bridge:main
bumble-pair = bumble.apps.pair:main bumble-pair = bumble.apps.pair:main
bumble-scan = bumble.apps.scan:main bumble-scan = bumble.apps.scan:main
bumble-show = bumble.apps.show:main bumble-show = bumble.apps.show:main
bumble-unbond = bumble.apps.unbond:main bumble-unbond = bumble.apps.unbond:main
bumble-usb-probe = bumble.apps.usb_probe:main
bumble-link-relay = bumble.apps.link_relay.link_relay:main bumble-link-relay = bumble.apps.link_relay.link_relay:main
[options.extras_require] [options.extras_require]
build = build =
build >= 0.7 build >= 0.7
test = test =
pytest >= 6.2 pytest >= 6.2
pytest-asyncio >= 0.17 pytest-asyncio >= 0.17
coverage >= 6.4
development = development =
invoke >= 1.4 invoke >= 1.4
nox >= 2022 nox >= 2022
documentation = documentation =
mkdocs >= 1.4.0 mkdocs >= 1.2.3
mkdocs-material >= 8.5.6 mkdocs-material >= 8.1.9
mkdocstrings[python] >= 0.19.0 mkdocstrings[python] >= 0.19.0

View File

@@ -24,19 +24,19 @@ def test_ad_data():
ad = AdvertisingData.from_bytes(data) ad = AdvertisingData.from_bytes(data)
ad_bytes = bytes(ad) ad_bytes = bytes(ad)
assert(data == ad_bytes) assert(data == ad_bytes)
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) is None) assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME) is None)
assert(ad.get(AdvertisingData.TX_POWER_LEVEL, raw=True) == bytes([123])) assert(ad.get(AdvertisingData.TX_POWER_LEVEL) == bytes([123]))
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, return_all=True, raw=True) == []) assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, return_all=True) == [])
assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True, raw=True) == [bytes([123])]) assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True) == [bytes([123])])
data2 = bytes([2, AdvertisingData.TX_POWER_LEVEL, 234]) data2 = bytes([2, AdvertisingData.TX_POWER_LEVEL, 234])
ad.append(data2) ad.append(data2)
ad_bytes = bytes(ad) ad_bytes = bytes(ad)
assert(ad_bytes == data + data2) assert(ad_bytes == data + data2)
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) is None) assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME) is None)
assert(ad.get(AdvertisingData.TX_POWER_LEVEL, raw=True) == bytes([123])) assert(ad.get(AdvertisingData.TX_POWER_LEVEL) == bytes([123]))
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, return_all=True, raw=True) == []) assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, return_all=True) == [])
assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True, raw=True) == [bytes([123]), bytes([234])]) assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True) == [bytes([123]), bytes([234])])
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -1,188 +0,0 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
from types import LambdaType
import pytest
from bumble.core import BT_BR_EDR_TRANSPORT
from bumble.device import Connection, Device
from bumble.host import Host
from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND, HCI_COMMAND_STATUS_PENDING, HCI_CREATE_CONNECTION_COMMAND, HCI_SUCCESS,
Address, HCI_Command_Complete_Event, HCI_Command_Status_Event, HCI_Connection_Complete_Event, HCI_Connection_Request_Event, HCI_Packet
)
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
class Sink:
def __init__(self, flow):
self.flow = flow
next(self.flow)
def on_packet(self, packet):
self.flow.send(packet)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_device_connect_parallel():
d0 = Device(host=Host(None, None))
d1 = Device(host=Host(None, None))
d2 = Device(host=Host(None, None))
# enable classic
d0.classic_enabled = True
d1.classic_enabled = True
d2.classic_enabled = True
# set public addresses
d0.public_address = Address('F0:F1:F2:F3:F4:F5', address_type=Address.PUBLIC_DEVICE_ADDRESS)
d1.public_address = Address('F5:F4:F3:F2:F1:F0', address_type=Address.PUBLIC_DEVICE_ADDRESS)
d2.public_address = Address('F5:F4:F3:F3:F4:F5', address_type=Address.PUBLIC_DEVICE_ADDRESS)
def d0_flow():
packet = HCI_Packet.from_bytes((yield))
assert packet.name == 'HCI_CREATE_CONNECTION_COMMAND'
assert packet.bd_addr == d1.public_address
d0.host.on_hci_packet(HCI_Command_Status_Event(
status = HCI_COMMAND_STATUS_PENDING,
num_hci_command_packets = 1,
command_opcode = HCI_CREATE_CONNECTION_COMMAND
))
d1.host.on_hci_packet(HCI_Connection_Request_Event(
bd_addr = d0.public_address,
class_of_device = 0,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE
))
packet = HCI_Packet.from_bytes((yield))
assert packet.name == 'HCI_CREATE_CONNECTION_COMMAND'
assert packet.bd_addr == d2.public_address
d0.host.on_hci_packet(HCI_Command_Status_Event(
status = HCI_COMMAND_STATUS_PENDING,
num_hci_command_packets = 1,
command_opcode = HCI_CREATE_CONNECTION_COMMAND
))
d2.host.on_hci_packet(HCI_Connection_Request_Event(
bd_addr = d0.public_address,
class_of_device = 0,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE
))
assert (yield) == None
def d1_flow():
packet = HCI_Packet.from_bytes((yield))
assert packet.name == 'HCI_ACCEPT_CONNECTION_REQUEST_COMMAND'
d1.host.on_hci_packet(HCI_Command_Complete_Event(
num_hci_command_packets = 1,
command_opcode = HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
return_parameters = b"\x00"
))
d1.host.on_hci_packet(HCI_Connection_Complete_Event(
status = HCI_SUCCESS,
connection_handle = 0x100,
bd_addr = d0.public_address,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE,
encryption_enabled = True,
))
d0.host.on_hci_packet(HCI_Connection_Complete_Event(
status = HCI_SUCCESS,
connection_handle = 0x100,
bd_addr = d1.public_address,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE,
encryption_enabled = True,
))
assert (yield) == None
def d2_flow():
packet = HCI_Packet.from_bytes((yield))
assert packet.name == 'HCI_ACCEPT_CONNECTION_REQUEST_COMMAND'
d2.host.on_hci_packet(HCI_Command_Complete_Event(
num_hci_command_packets = 1,
command_opcode = HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
return_parameters = b"\x00"
))
d2.host.on_hci_packet(HCI_Connection_Complete_Event(
status = HCI_SUCCESS,
connection_handle = 0x101,
bd_addr = d0.public_address,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE,
encryption_enabled = True,
))
d0.host.on_hci_packet(HCI_Connection_Complete_Event(
status = HCI_SUCCESS,
connection_handle = 0x101,
bd_addr = d2.public_address,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE,
encryption_enabled = True,
))
assert (yield) == None
d0.host.set_packet_sink(Sink(d0_flow()))
d1.host.set_packet_sink(Sink(d1_flow()))
d2.host.set_packet_sink(Sink(d2_flow()))
[c01, c02, a10, a20, a01] = await asyncio.gather(*[
asyncio.create_task(d0.connect(d1.public_address, transport=BT_BR_EDR_TRANSPORT)),
asyncio.create_task(d0.connect(d2.public_address, transport=BT_BR_EDR_TRANSPORT)),
asyncio.create_task(d1.accept(peer_address=d0.public_address)),
asyncio.create_task(d2.accept()),
asyncio.create_task(d0.accept(peer_address=d1.public_address)),
])
assert type(c01) == Connection
assert type(c02) == Connection
assert type(a10) == Connection
assert type(a20) == Connection
assert type(a01) == Connection
assert c01.handle == a10.handle and c01.handle == 0x100
assert c02.handle == a20.handle and c02.handle == 0x101
assert a01 == c01
# -----------------------------------------------------------------------------
async def run_test_device():
await test_device_connect_parallel()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run_test_device())

View File

@@ -22,7 +22,6 @@ import struct
import pytest import pytest
from bumble.controller import Controller from bumble.controller import Controller
from bumble.gatt_client import CharacteristicProxy
from bumble.link import LocalLink from bumble.link import LocalLink
from bumble.device import Device, Peer from bumble.device import Device, Peer
from bumble.host import Host from bumble.host import Host
@@ -54,29 +53,29 @@ def basic_check(x):
parsed = ATT_PDU.from_bytes(pdu) parsed = ATT_PDU.from_bytes(pdu)
x_str = str(x) x_str = str(x)
parsed_str = str(parsed) parsed_str = str(parsed)
assert x_str == parsed_str assert(x_str == parsed_str)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_UUID(): def test_UUID():
u = UUID.from_16_bits(0x7788) u = UUID.from_16_bits(0x7788)
assert str(u) == 'UUID-16:7788' assert(str(u) == 'UUID-16:7788')
u = UUID.from_32_bits(0x11223344) u = UUID.from_32_bits(0x11223344)
assert str(u) == 'UUID-32:11223344' assert(str(u) == 'UUID-32:11223344')
u = UUID('61A3512C-09BE-4DDC-A6A6-0B03667AAFC6') u = UUID('61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' assert(str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
v = UUID(str(u)) v = UUID(str(u))
assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' assert(str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
w = UUID.from_bytes(v.to_bytes()) w = UUID.from_bytes(v.to_bytes())
assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' assert(str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
u1 = UUID.from_16_bits(0x1234) u1 = UUID.from_16_bits(0x1234)
b1 = u1.to_bytes(force_128 = True) b1 = u1.to_bytes(force_128 = True)
u2 = UUID.from_bytes(b1) u2 = UUID.from_bytes(b1)
assert u1 == u2 assert(u1 == u2)
u3 = UUID.from_16_bits(0x180a) u3 = UUID.from_16_bits(0x180a)
assert str(u3) == 'UUID-16:180A (Device Information)' assert(str(u3) == 'UUID-16:180A (Device Information)')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -99,133 +98,6 @@ def test_ATT_Read_By_Group_Type_Request():
basic_check(pdu) basic_check(pdu)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_characteristic_encoding():
class Foo(Characteristic):
def encode_value(self, value):
return bytes([value])
def decode_value(self, value_bytes):
return value_bytes[0]
c = Foo(GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.READ, Characteristic.READABLE, 123)
x = c.read_value(None)
assert x == bytes([123])
c.write_value(None, bytes([122]))
assert c.value == 122
class FooProxy(CharacteristicProxy):
def __init__(self, characteristic):
super().__init__(
characteristic.client,
characteristic.handle,
characteristic.end_group_handle,
characteristic.uuid,
characteristic.properties
)
def encode_value(self, value):
return bytes([value])
def decode_value(self, value_bytes):
return value_bytes[0]
[client, server] = LinkedDevices().devices[:2]
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123])
)
service = Service(
'3A657F47-D34F-46B3-B1EC-698E29B6B829',
[characteristic]
)
server.add_service(service)
await client.power_on()
await server.power_on()
connection = await client.connect(server.random_address)
peer = Peer(connection)
await peer.discover_services()
await peer.discover_characteristics()
c = peer.get_characteristics_by_uuid(characteristic.uuid)
assert len(c) == 1
c = c[0]
cp = FooProxy(c)
v = await cp.read_value()
assert v == 123
await cp.write_value(124)
await async_barrier()
assert characteristic.value == bytes([124])
v = await cp.read_value()
assert v == 124
await cp.write_value(125, with_response=True)
await async_barrier()
assert characteristic.value == bytes([125])
cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2]))
await cd.write_value(100, with_response=True)
await async_barrier()
assert characteristic.value == bytes([50])
last_change = None
def on_change(value):
nonlocal last_change
last_change = value
await c.subscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change == characteristic.value
last_change = None
await server.notify_subscribers(characteristic, value=bytes([125]))
await async_barrier()
assert last_change == bytes([125])
last_change = None
await c.unsubscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change is None
await cp.subscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change == characteristic.value[0]
last_change = None
await server.notify_subscribers(characteristic, value=bytes([126]))
await async_barrier()
assert last_change == 126
last_change = None
await cp.unsubscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change is None
cd = DelegatedCharacteristicAdapter(c, decode=lambda x: x[0])
await cd.subscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change == characteristic.value[0]
last_change = None
await cd.unsubscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change is None
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_CharacteristicAdapter(): def test_CharacteristicAdapter():
# Check that the CharacteristicAdapter base class is transparent # Check that the CharacteristicAdapter base class is transparent
@@ -234,21 +106,21 @@ def test_CharacteristicAdapter():
a = CharacteristicAdapter(c) a = CharacteristicAdapter(c)
value = a.read_value(None) value = a.read_value(None)
assert value == v assert(value == v)
v = bytes([3, 4, 5]) v = bytes([3, 4, 5])
a.write_value(None, v) a.write_value(None, v)
assert c.value == v assert(c.value == v)
# Simple delegated adapter # Simple delegated adapter
a = DelegatedCharacteristicAdapter(c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x))) a = DelegatedCharacteristicAdapter(c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x)))
value = a.read_value(None) value = a.read_value(None)
assert value == bytes(reversed(v)) assert(value == bytes(reversed(v)))
v = bytes([3, 4, 5]) v = bytes([3, 4, 5])
a.write_value(None, v) a.write_value(None, v)
assert a.value == bytes(reversed(v)) assert(a.value == bytes(reversed(v)))
# Packed adapter with single element format # Packed adapter with single element format
v = 1234 v = 1234
@@ -257,10 +129,10 @@ def test_CharacteristicAdapter():
a = PackedCharacteristicAdapter(c, '>H') a = PackedCharacteristicAdapter(c, '>H')
value = a.read_value(None) value = a.read_value(None)
assert value == pv assert(value == pv)
c.value = None c.value = None
a.write_value(None, pv) a.write_value(None, pv)
assert a.value == v assert(a.value == v)
# Packed adapter with multi-element format # Packed adapter with multi-element format
v1 = 1234 v1 = 1234
@@ -270,10 +142,10 @@ def test_CharacteristicAdapter():
a = PackedCharacteristicAdapter(c, '>HH') a = PackedCharacteristicAdapter(c, '>HH')
value = a.read_value(None) value = a.read_value(None)
assert value == pv assert(value == pv)
c.value = None c.value = None
a.write_value(None, pv) a.write_value(None, pv)
assert a.value == (v1, v2) assert(a.value == (v1, v2))
# Mapped adapter # Mapped adapter
v1 = 1234 v1 = 1234
@@ -284,10 +156,10 @@ def test_CharacteristicAdapter():
a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2')) a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2'))
value = a.read_value(None) value = a.read_value(None)
assert value == pv assert(value == pv)
c.value = None c.value = None
a.write_value(None, pv) a.write_value(None, pv)
assert a.value == mapped assert(a.value == mapped)
# UTF-8 adapter # UTF-8 adapter
v = 'Hello π' v = 'Hello π'
@@ -296,10 +168,10 @@ def test_CharacteristicAdapter():
a = UTF8CharacteristicAdapter(c) a = UTF8CharacteristicAdapter(c)
value = a.read_value(None) value = a.read_value(None)
assert value == ev assert(value == ev)
c.value = None c.value = None
a.write_value(None, ev) a.write_value(None, ev)
assert a.value == v assert(a.value == v)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -307,25 +179,24 @@ def test_CharacteristicValue():
b = bytes([1, 2, 3]) b = bytes([1, 2, 3])
c = CharacteristicValue(read=lambda _: b) c = CharacteristicValue(read=lambda _: b)
x = c.read(None) x = c.read(None)
assert x == b assert(x == b)
result = [] result = []
c = CharacteristicValue(write=lambda connection, value: result.append((connection, value))) c = CharacteristicValue(write=lambda connection, value: result.append((connection, value)))
z = object() z = object()
c.write(z, b) c.write(z, b)
assert result == [(z, b)] assert(result == [(z, b)])
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class LinkedDevices: class TwoDevices:
def __init__(self): def __init__(self):
self.connections = [None, None, None] self.connections = [None, None]
self.link = LocalLink() self.link = LocalLink()
self.controllers = [ self.controllers = [
Controller('C1', link = self.link), Controller('C1', link = self.link),
Controller('C2', link = self.link), Controller('C2', link = self.link)
Controller('C3', link = self.link)
] ]
self.devices = [ self.devices = [
Device( Device(
@@ -333,16 +204,12 @@ class LinkedDevices:
host = Host(self.controllers[0], AsyncPipeSink(self.controllers[0])) host = Host(self.controllers[0], AsyncPipeSink(self.controllers[0]))
), ),
Device( Device(
address = 'F1:F2:F3:F4:F5:F6', address = 'F5:F4:F3:F2:F1:F0',
host = Host(self.controllers[1], AsyncPipeSink(self.controllers[1])) host = Host(self.controllers[1], AsyncPipeSink(self.controllers[1]))
),
Device(
address = 'F2:F3:F4:F5:F6:F7',
host = Host(self.controllers[2], AsyncPipeSink(self.controllers[2]))
) )
] ]
self.paired = [None, None, None] self.paired = [None, None]
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -355,7 +222,7 @@ async def async_barrier():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_read_write(): async def test_read_write():
[client, server] = LinkedDevices().devices[:2] [client, server] = TwoDevices().devices
characteristic1 = Characteristic( characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806', 'FDB159DB-036C-49E3-B3DB-6325AC750806',
@@ -398,41 +265,41 @@ async def test_read_write():
await peer.discover_services() await peer.discover_services()
await peer.discover_characteristics() await peer.discover_characteristics()
c = peer.get_characteristics_by_uuid(characteristic1.uuid) c = peer.get_characteristics_by_uuid(characteristic1.uuid)
assert len(c) == 1 assert(len(c) == 1)
c1 = c[0] c1 = c[0]
c = peer.get_characteristics_by_uuid(characteristic2.uuid) c = peer.get_characteristics_by_uuid(characteristic2.uuid)
assert len(c) == 1 assert(len(c) == 1)
c2 = c[0] c2 = c[0]
v1 = await peer.read_value(c1) v1 = await peer.read_value(c1)
assert v1 == b'' assert(v1 == b'')
b = bytes([1, 2, 3]) b = bytes([1, 2, 3])
await peer.write_value(c1, b) await peer.write_value(c1, b)
await async_barrier() await async_barrier()
assert characteristic1.value == b assert(characteristic1.value == b)
v1 = await peer.read_value(c1) v1 = await peer.read_value(c1)
assert v1 == b assert(v1 == b)
assert type(characteristic1._last_value is tuple) assert(type(characteristic1._last_value) is tuple)
assert len(characteristic1._last_value) == 2 assert(len(characteristic1._last_value) == 2)
assert str(characteristic1._last_value[0].peer_address) == str(client.random_address) assert(str(characteristic1._last_value[0].peer_address) == str(client.random_address))
assert characteristic1._last_value[1] == b assert(characteristic1._last_value[1] == b)
bb = bytes([3, 4, 5, 6]) bb = bytes([3, 4, 5, 6])
characteristic1.value = bb characteristic1.value = bb
v1 = await peer.read_value(c1) v1 = await peer.read_value(c1)
assert v1 == bb assert(v1 == bb)
await peer.write_value(c2, b) await peer.write_value(c2, b)
await async_barrier() await async_barrier()
assert type(characteristic2._last_value is tuple) assert(type(characteristic2._last_value) is tuple)
assert len(characteristic2._last_value) == 2 assert(len(characteristic2._last_value) == 2)
assert str(characteristic2._last_value[0].peer_address) == str(client.random_address) assert(str(characteristic2._last_value[0].peer_address) == str(client.random_address))
assert characteristic2._last_value[1] == b assert(characteristic2._last_value[1] == b)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_read_write2(): async def test_read_write2():
[client, server] = LinkedDevices().devices[:2] [client, server] = TwoDevices().devices
v = bytes([0x11, 0x22, 0x33, 0x44]) v = bytes([0x11, 0x22, 0x33, 0x44])
characteristic1 = Characteristic( characteristic1 = Characteristic(
@@ -457,32 +324,32 @@ async def test_read_write2():
await peer.discover_services() await peer.discover_services()
c = peer.get_services_by_uuid(service1.uuid) c = peer.get_services_by_uuid(service1.uuid)
assert len(c) == 1 assert(len(c) == 1)
s = c[0] s = c[0]
await s.discover_characteristics() await s.discover_characteristics()
c = s.get_characteristics_by_uuid(characteristic1.uuid) c = s.get_characteristics_by_uuid(characteristic1.uuid)
assert len(c) == 1 assert(len(c) == 1)
c1 = c[0] c1 = c[0]
v1 = await c1.read_value() v1 = await c1.read_value()
assert v1 == v assert(v1 == v)
a1 = PackedCharacteristicAdapter(c1, '>I') a1 = PackedCharacteristicAdapter(c1, '>I')
v1 = await a1.read_value() v1 = await a1.read_value()
assert v1 == struct.unpack('>I', v)[0] assert(v1 == struct.unpack('>I', v)[0])
b = bytes([0x55, 0x66, 0x77, 0x88]) b = bytes([0x55, 0x66, 0x77, 0x88])
await a1.write_value(struct.unpack('>I', b)[0]) await a1.write_value(struct.unpack('>I', b)[0])
await async_barrier() await async_barrier()
assert characteristic1.value == b assert(characteristic1.value == b)
v1 = await a1.read_value() v1 = await a1.read_value()
assert v1 == struct.unpack('>I', b)[0] assert(v1 == struct.unpack('>I', b)[0])
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_subscribe_notify(): async def test_subscribe_notify():
[client, server] = LinkedDevices().devices[:2] [client, server] = TwoDevices().devices
characteristic1 = Characteristic( characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806', 'FDB159DB-036C-49E3-B3DB-6325AC750806',
@@ -543,13 +410,13 @@ async def test_subscribe_notify():
await peer.discover_services() await peer.discover_services()
await peer.discover_characteristics() await peer.discover_characteristics()
c = peer.get_characteristics_by_uuid(characteristic1.uuid) c = peer.get_characteristics_by_uuid(characteristic1.uuid)
assert len(c) == 1 assert(len(c) == 1)
c1 = c[0] c1 = c[0]
c = peer.get_characteristics_by_uuid(characteristic2.uuid) c = peer.get_characteristics_by_uuid(characteristic2.uuid)
assert len(c) == 1 assert(len(c) == 1)
c2 = c[0] c2 = c[0]
c = peer.get_characteristics_by_uuid(characteristic3.uuid) c = peer.get_characteristics_by_uuid(characteristic3.uuid)
assert len(c) == 1 assert(len(c) == 1)
c3 = c[0] c3 = c[0]
c1._called = False c1._called = False
@@ -562,32 +429,23 @@ async def test_subscribe_notify():
c1.on('update', on_c1_update) c1.on('update', on_c1_update)
await peer.subscribe(c1) await peer.subscribe(c1)
await async_barrier() await async_barrier()
assert server._last_subscription[1] == characteristic1 assert(server._last_subscription[1] == characteristic1)
assert server._last_subscription[2] assert(server._last_subscription[2])
assert not server._last_subscription[3] assert(not server._last_subscription[3])
assert characteristic1._last_subscription[1] assert(characteristic1._last_subscription[1])
assert not characteristic1._last_subscription[2] assert(not characteristic1._last_subscription[2])
await server.indicate_subscribers(characteristic1) await server.indicate_subscribers(characteristic1)
await async_barrier() await async_barrier()
assert not c1._called assert(not c1._called)
await server.notify_subscribers(characteristic1) await server.notify_subscribers(characteristic1)
await async_barrier() await async_barrier()
assert c1._called assert(c1._called)
assert c1._last_update == characteristic1.value assert(c1._last_update == characteristic1.value)
c1._called = False
c1._last_update = None
c1_value = characteristic1.value
await server.notify_subscribers(characteristic1, bytes([0, 1, 2]))
await async_barrier()
assert c1._called
assert c1._last_update == bytes([0, 1, 2])
assert characteristic1.value == c1_value
c1._called = False c1._called = False
await peer.unsubscribe(c1) await peer.unsubscribe(c1)
await server.notify_subscribers(characteristic1) await server.notify_subscribers(characteristic1)
assert not c1._called assert(not c1._called)
c2._called = False c2._called = False
c2._last_update = None c2._last_update = None
@@ -600,109 +458,51 @@ async def test_subscribe_notify():
await async_barrier() await async_barrier()
await server.notify_subscriber(characteristic2._last_subscription[0], characteristic2) await server.notify_subscriber(characteristic2._last_subscription[0], characteristic2)
await async_barrier() await async_barrier()
assert not c2._called assert(not c2._called)
await server.indicate_subscriber(characteristic2._last_subscription[0], characteristic2) await server.indicate_subscriber(characteristic2._last_subscription[0], characteristic2)
await async_barrier() await async_barrier()
assert c2._called assert(c2._called)
assert c2._last_update == characteristic2.value assert(c2._last_update == characteristic2.value)
c2._called = False c2._called = False
await peer.unsubscribe(c2, on_c2_update) await peer.unsubscribe(c2, on_c2_update)
await server.indicate_subscriber(characteristic2._last_subscription[0], characteristic2) await server.indicate_subscriber(characteristic2._last_subscription[0], characteristic2)
await async_barrier() await async_barrier()
assert not c2._called assert(not c2._called)
c3._called = False
c3._called_2 = False
c3._called_3 = False
c3._last_update = None
c3._last_update_2 = None
c3._last_update_3 = None
def on_c3_update(value): def on_c3_update(value):
c3._called = True c3._called = True
c3._last_update = value c3._last_update = value
def on_c3_update_2(value): # for notify def on_c3_update_2(value):
c3._called_2 = True c3._called_2 = True
c3._last_update_2 = value c3._last_update_2 = value
def on_c3_update_3(value): # for indicate
c3._called_3 = True
c3._last_update_3 = value
c3.on('update', on_c3_update) c3.on('update', on_c3_update)
await peer.subscribe(c3, on_c3_update_2) await peer.subscribe(c3, on_c3_update_2)
await async_barrier() await async_barrier()
await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3) await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3)
await async_barrier() await async_barrier()
assert c3._called assert(c3._called)
assert c3._last_update == characteristic3.value assert(c3._last_update == characteristic3.value)
assert c3._called_2 assert(c3._called_2)
assert c3._last_update_2 == characteristic3.value assert(c3._last_update_2 == characteristic3.value)
assert not c3._called_3
c3._called = False
c3._called_2 = False
c3._called_3 = False
await peer.unsubscribe(c3)
await peer.subscribe(c3, on_c3_update_3, prefer_notify=False)
await async_barrier()
characteristic3.value = bytes([1, 2, 3]) characteristic3.value = bytes([1, 2, 3])
await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3) await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3)
await async_barrier() await async_barrier()
assert c3._called assert(c3._called)
assert c3._last_update == characteristic3.value assert(c3._last_update == characteristic3.value)
assert not c3._called_2 assert(c3._called_2)
assert c3._called_3 assert(c3._last_update_2 == characteristic3.value)
assert c3._last_update_3 == characteristic3.value
c3._called = False c3._called = False
c3._called_2 = False c3._called_2 = False
c3._called_3 = False
await peer.unsubscribe(c3) await peer.unsubscribe(c3)
await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3) await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3)
await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3) await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3)
await async_barrier() await async_barrier()
assert not c3._called assert(not c3._called)
assert not c3._called_2 assert(not c3._called_2)
assert not c3._called_3
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_mtu_exchange():
[d1, d2, d3] = LinkedDevices().devices[:3]
d3.gatt_server.max_mtu = 100
d3_connections = []
@d3.on('connection')
def on_d3_connection(connection):
d3_connections.append(connection)
await d1.power_on()
await d2.power_on()
await d3.power_on()
d1_connection = await d1.connect(d3.random_address)
assert len(d3_connections) == 1
assert d3_connections[0] is not None
d2_connection = await d2.connect(d3.random_address)
assert len(d3_connections) == 2
assert d3_connections[1] is not None
d1_peer = Peer(d1_connection)
d2_peer = Peer(d2_connection)
d1_client_mtu = await d1_peer.request_mtu(220)
assert d1_client_mtu == 100
assert d1_connection.att_mtu == 100
d2_client_mtu = await d2_peer.request_mtu(50)
assert d2_client_mtu == 50
assert d2_connection.att_mtu == 50
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -710,9 +510,6 @@ async def async_main():
await test_read_write() await test_read_write()
await test_read_write2() await test_read_write2()
await test_subscribe_notify() await test_subscribe_notify()
await test_characteristic_encoding()
await test_mtu_exchange()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -27,8 +27,8 @@ def basic_check(x):
parsed_str = str(parsed) parsed_str = str(parsed)
print(x_str) print(x_str)
parsed_bytes = parsed.to_bytes() parsed_bytes = parsed.to_bytes()
assert x_str == parsed_str assert(x_str == parsed_str)
assert packet == parsed_bytes assert(packet == parsed_bytes)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -49,19 +49,19 @@ def test_HCI_LE_Connection_Complete_Event():
role = 1, role = 1,
peer_address_type = 1, peer_address_type = 1,
peer_address = address, peer_address = address,
connection_interval = 3, conn_interval = 3,
peripheral_latency = 4, conn_latency = 4,
supervision_timeout = 5, supervision_timeout = 5,
central_clock_accuracy = 6 master_clock_accuracy = 6
) )
basic_check(event) basic_check(event)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Advertising_Report_Event(): def test_HCI_LE_Advertising_Report_Event():
address = Address('00:11:22:33:44:55/P') address = Address('00:11:22:33:44:55')
report = HCI_LE_Advertising_Report_Event.Report( report = HCI_Object(
HCI_LE_Advertising_Report_Event.Report.FIELDS, HCI_LE_Advertising_Report_Event.REPORT_FIELDS,
event_type = HCI_LE_Advertising_Report_Event.ADV_IND, event_type = HCI_LE_Advertising_Report_Event.ADV_IND,
address_type = Address.PUBLIC_DEVICE_ADDRESS, address_type = Address.PUBLIC_DEVICE_ADDRESS,
address = address, address = address,
@@ -87,8 +87,8 @@ def test_HCI_LE_Connection_Update_Complete_Event():
event = HCI_LE_Connection_Update_Complete_Event( event = HCI_LE_Connection_Update_Complete_Event(
status = HCI_SUCCESS, status = HCI_SUCCESS,
connection_handle = 0x007, connection_handle = 0x007,
connection_interval = 10, conn_interval = 10,
peripheral_latency = 3, conn_latency = 3,
supervision_timeout = 5 supervision_timeout = 5
) )
basic_check(event) basic_check(event)
@@ -133,7 +133,7 @@ def test_HCI_Command_Complete_Event():
) )
basic_check(event) basic_check(event)
event = HCI_Packet.from_bytes(event.to_bytes()) event = HCI_Packet.from_bytes(event.to_bytes())
assert event.return_parameters == 7 assert(event.return_parameters == 7)
# With a simple status as an integer status # With a simple status as an integer status
event = HCI_Command_Complete_Event( event = HCI_Command_Complete_Event(
@@ -142,7 +142,7 @@ def test_HCI_Command_Complete_Event():
return_parameters = 9 return_parameters = 9
) )
basic_check(event) basic_check(event)
assert event.return_parameters == 9 assert(event.return_parameters == 9)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -283,32 +283,12 @@ def test_HCI_LE_Create_Connection_Command():
peer_address_type = 1, peer_address_type = 1,
peer_address = Address('00:11:22:33:44:55'), peer_address = Address('00:11:22:33:44:55'),
own_address_type = 2, own_address_type = 2,
connection_interval_min = 7, conn_interval_min = 7,
connection_interval_max = 8, conn_interval_max = 8,
max_latency = 9, conn_latency = 9,
supervision_timeout = 10, supervision_timeout = 10,
min_ce_length = 11, minimum_ce_length = 11,
max_ce_length = 12 maximum_ce_length = 12
)
basic_check(command)
# -----------------------------------------------------------------------------
def test_HCI_LE_Extended_Create_Connection_Command():
command = HCI_LE_Extended_Create_Connection_Command(
initiator_filter_policy = 0,
own_address_type = 0,
peer_address_type = 1,
peer_address = Address('00:11:22:33:44:55'),
initiating_phys = 3,
scan_intervals = (10, 11),
scan_windows = (12, 13),
connection_interval_mins = (14, 15),
connection_interval_maxs = (16, 17),
max_latencies = (18, 19),
supervision_timeouts = (20, 21),
min_ce_lengths = (100, 101),
max_ce_lengths = (102, 103)
) )
basic_check(command) basic_check(command)
@@ -334,13 +314,13 @@ def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Update_Command(): def test_HCI_LE_Connection_Update_Command():
command = HCI_LE_Connection_Update_Command( command = HCI_LE_Connection_Update_Command(
connection_handle = 0x0002, connection_handle = 0x0002,
connection_interval_min = 10, conn_interval_min = 10,
connection_interval_max = 20, conn_interval_max = 20,
max_latency = 7, conn_latency = 7,
supervision_timeout = 3, supervision_timeout = 3,
min_ce_length = 100, minimum_ce_length = 100,
max_ce_length = 200 maximum_ce_length = 200
) )
basic_check(command) basic_check(command)
@@ -368,7 +348,7 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
command = HCI_LE_Set_Extended_Scan_Parameters_Command( command = HCI_LE_Set_Extended_Scan_Parameters_Command(
own_address_type=Address.RANDOM_DEVICE_ADDRESS, own_address_type=Address.RANDOM_DEVICE_ADDRESS,
scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY, scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY,
scanning_phys=(1 << HCI_LE_1M_PHY_BIT | 1 << HCI_LE_CODED_PHY_BIT | 1 << 4), scanning_phys=(1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_1M_PHY | 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_CODED_PHY | 1 << 4),
scan_types=[ scan_types=[
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING, HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING, HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
@@ -383,20 +363,20 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_address(): def test_address():
a = Address('C4:F2:17:1A:1D:BB') a = Address('C4:F2:17:1A:1D:BB')
assert not a.is_public assert(not a.is_public)
assert a.is_random assert(a.is_random)
assert a.address_type == Address.RANDOM_DEVICE_ADDRESS assert(a.address_type == Address.RANDOM_DEVICE_ADDRESS)
assert not a.is_resolvable assert(not a.is_resolvable)
assert not a.is_resolved assert(not a.is_resolved)
assert a.is_static assert(a.is_static)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_custom(): def test_custom():
data = bytes([0x77, 0x02, 0x01, 0x03]) data = bytes([0x77, 0x02, 0x01, 0x03])
packet = HCI_CustomPacket(data) packet = HCI_CustomPacket(data)
assert packet.hci_packet_type == 0x77 assert(packet.hci_packet_type == 0x77)
assert packet.payload == data assert(packet.payload == data)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -428,7 +408,6 @@ def run_test_commands():
test_HCI_LE_Set_Scan_Parameters_Command() test_HCI_LE_Set_Scan_Parameters_Command()
test_HCI_LE_Set_Scan_Enable_Command() test_HCI_LE_Set_Scan_Enable_Command()
test_HCI_LE_Create_Connection_Command() test_HCI_LE_Create_Connection_Command()
test_HCI_LE_Extended_Create_Connection_Command()
test_HCI_LE_Add_Device_To_Filter_Accept_List_Command() test_HCI_LE_Add_Device_To_Filter_Accept_List_Command()
test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command() test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command()
test_HCI_LE_Connection_Update_Command() test_HCI_LE_Connection_Update_Command()

View File

@@ -62,57 +62,6 @@ def test_import():
assert utils assert utils
# -----------------------------------------------------------------------------
def test_app_imports():
from bumble.apps.console import main
assert main
from bumble.apps.controller_info import main
assert main
from bumble.apps.controllers import main
assert main
from bumble.apps.gatt_dump import main
assert main
from bumble.apps.gg_bridge import main
assert main
from bumble.apps.hci_bridge import main
assert main
from bumble.apps.pair import main
assert main
from bumble.apps.scan import main
assert main
from bumble.apps.show import main
assert main
from bumble.apps.unbond import main
assert main
from bumble.apps.usb_probe import main
assert main
# -----------------------------------------------------------------------------
def test_profiles_imports():
from bumble.profiles import (
battery_service,
device_information_service,
heart_rate_service
)
assert battery_service
assert device_information_service
assert heart_rate_service
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
if __name__ == '__main__': if __name__ == '__main__':
test_import() test_import()
test_app_imports()
test_profiles_imports()

View File

@@ -1,284 +0,0 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import random
import pytest
from bumble.controller import Controller
from bumble.link import LocalLink
from bumble.device import Device
from bumble.host import Host
from bumble.transport import AsyncPipeSink
from bumble.core import ProtocolError
from bumble.l2cap import (
L2CAP_Connection_Request
)
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
class TwoDevices:
def __init__(self):
self.connections = [None, None]
self.link = LocalLink()
self.controllers = [
Controller('C1', link = self.link),
Controller('C2', link = self.link)
]
self.devices = [
Device(
address = 'F0:F1:F2:F3:F4:F5',
host = Host(self.controllers[0], AsyncPipeSink(self.controllers[0]))
),
Device(
address = 'F5:F4:F3:F2:F1:F0',
host = Host(self.controllers[1], AsyncPipeSink(self.controllers[1]))
)
]
self.paired = [None, None]
def on_connection(self, which, connection):
self.connections[which] = connection
def on_paired(self, which, keys):
self.paired[which] = keys
# -----------------------------------------------------------------------------
async def setup_connection():
# Create two devices, each with a controller, attached to the same link
two_devices = TwoDevices()
# Attach listeners
two_devices.devices[0].on('connection', lambda connection: two_devices.on_connection(0, connection))
two_devices.devices[1].on('connection', lambda connection: two_devices.on_connection(1, connection))
# Start
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
# Connect the two devices
await two_devices.devices[0].connect(two_devices.devices[1].random_address)
# Check the post conditions
assert two_devices.connections[0] is not None
assert two_devices.connections[1] is not None
return two_devices
# -----------------------------------------------------------------------------
def test_helpers():
psm = L2CAP_Connection_Request.serialize_psm(0x01)
assert psm == bytes([0x01, 0x00])
psm = L2CAP_Connection_Request.serialize_psm(0x1023)
assert psm == bytes([0x23, 0x10])
psm = L2CAP_Connection_Request.serialize_psm(0x242311)
assert psm == bytes([0x11, 0x23, 0x24])
(offset, psm) = L2CAP_Connection_Request.parse_psm(bytes([0x00, 0x01, 0x00, 0x44]), 1)
assert offset == 3
assert psm == 0x01
(offset, psm) = L2CAP_Connection_Request.parse_psm(bytes([0x00, 0x23, 0x10, 0x44]), 1)
assert offset == 3
assert psm == 0x1023
(offset, psm) = L2CAP_Connection_Request.parse_psm(bytes([0x00, 0x11, 0x23, 0x24, 0x44]), 1)
assert offset == 4
assert psm == 0x242311
rq = L2CAP_Connection_Request(psm = 0x01, source_cid = 0x44)
brq = bytes(rq)
srq = L2CAP_Connection_Request.from_bytes(brq)
assert srq.psm == rq.psm
assert srq.source_cid == rq.source_cid
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_basic_connection():
devices = await setup_connection()
psm = 1234
# Check that if there's no one listening, we can't connect
with pytest.raises(ProtocolError):
l2cap_channel = await devices.connections[0].open_l2cap_channel(psm)
# Now add a listener
incoming_channel = None
received = []
def on_coc(channel):
nonlocal incoming_channel
incoming_channel = channel
def on_data(data):
received.append(data)
channel.sink = on_data
devices.devices[1].register_l2cap_channel_server(psm, on_coc)
l2cap_channel = await devices.connections[0].open_l2cap_channel(psm)
messages = (
bytes([1, 2, 3]),
bytes([4, 5, 6]),
bytes(10000)
)
for message in messages:
l2cap_channel.write(message)
await asyncio.sleep(0)
await l2cap_channel.drain()
# Test closing
closed = [False, False]
closed_event = asyncio.Event()
def on_close(which, event):
closed[which] = True
if event:
event.set()
l2cap_channel.on('close', lambda: on_close(0, None))
incoming_channel.on('close', lambda: on_close(1, closed_event))
await l2cap_channel.disconnect()
assert closed == [True, True]
await closed_event.wait()
sent_bytes = b''.join(messages)
received_bytes = b''.join(received)
assert sent_bytes == received_bytes
# -----------------------------------------------------------------------------
async def transfer_payload(max_credits, mtu, mps):
devices = await setup_connection()
received = []
def on_coc(channel):
def on_data(data):
received.append(data)
channel.sink = on_data
psm = devices.devices[1].register_l2cap_channel_server(
psm = 0,
server = on_coc,
max_credits = max_credits,
mtu = mtu,
mps = mps
)
l2cap_channel = await devices.connections[0].open_l2cap_channel(psm)
messages = [
bytes([1, 2, 3, 4, 5, 6, 7]) * x
for x in (3, 10, 100, 789)
]
for message in messages:
l2cap_channel.write(message)
await asyncio.sleep(0)
if random.randint(0, 5) == 1:
await l2cap_channel.drain()
await l2cap_channel.drain()
await l2cap_channel.disconnect()
sent_bytes = b''.join(messages)
received_bytes = b''.join(received)
assert sent_bytes == received_bytes
@pytest.mark.asyncio
async def test_transfer():
for max_credits in (1, 10, 100, 10000):
for mtu in (50, 255, 256, 1000):
for mps in (50, 255, 256, 1000):
# print(max_credits, mtu, mps)
await transfer_payload(max_credits, mtu, mps)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_bidirectional_transfer():
devices = await setup_connection()
client_received = []
server_received = []
server_channel = None
def on_server_coc(channel):
nonlocal server_channel
server_channel = channel
def on_server_data(data):
server_received.append(data)
channel.sink = on_server_data
def on_client_data(data):
client_received.append(data)
psm = devices.devices[1].register_l2cap_channel_server(psm=0, server=on_server_coc)
client_channel = await devices.connections[0].open_l2cap_channel(psm)
client_channel.sink = on_client_data
messages = [
bytes([1, 2, 3, 4, 5, 6, 7]) * x
for x in (3, 10, 100)
]
for message in messages:
client_channel.write(message)
await client_channel.drain()
await asyncio.sleep(0)
server_channel.write(message)
await server_channel.drain()
await client_channel.disconnect()
message_bytes = b''.join(messages)
client_received_bytes = b''.join(client_received)
server_received_bytes = b''.join(server_received)
assert client_received_bytes == message_bytes
assert server_received_bytes == message_bytes
# -----------------------------------------------------------------------------
async def run():
test_helpers()
await test_basic_connection()
await test_transfer()
await test_bidirectional_transfer()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run())

View File

@@ -246,7 +246,8 @@ IO_CAP = [
SC = [False, True] SC = [False, True]
MITM = [False, True] MITM = [False, True]
# Key distribution is a 4-bit bitmask # Key distribution is a 4-bit bitmask
KEY_DIST = range(16) # IdKey is necessary for current SMP structure
KEY_DIST = [i for i in range(16) if (i & SMP_ID_KEY_DISTRIBUTION_FLAG)]
@pytest.mark.asyncio @pytest.mark.asyncio
@pytest.mark.parametrize('io_cap, sc, mitm, key_dist', @pytest.mark.parametrize('io_cap, sc, mitm, key_dist',

View File

@@ -21,9 +21,9 @@ from bumble.transport import PacketParser
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class ScannerListener(Device.Listener): class ScannerListener(Device.Listener):
def on_advertisement(self, advertisement): def on_advertisement(self, address, ad_data, rssi, connectable):
address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type] address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type]
print(f'>>> {advertisement.address} [{address_type_string}]: RSSI={advertisement.rssi}, {advertisement.ad_data}') print(f'>>> {address} [{address_type_string}]: RSSI={rssi}, {ad_data}')
class HciSource: class HciSource: