Compare commits

...

20 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
eb8556ccf6 gbg/extended scanning (#47)
Squashed:
* add extended report class
* more HCI commands
* add AdvertisingType
* add phy options
* fix tests
2022-10-19 10:06:00 -07:00
Octavian Purdila
4d96b821bc Merge pull request #44 from google/tavip/fix-address-resolution
Fix address resolution handling
2022-10-12 10:09:33 -07:00
Gilles Boccon-Gibod
78b36d2049 Merge pull request #45 from google/gbg/add-missing-app
add controller-info CLI app to setup
2022-10-11 22:21:08 -07:00
Gilles Boccon-Gibod
3e0cad1456 add controller-info CLI app to setup 2022-10-11 22:15:23 -07:00
Octavian Purdila
b4de38cdc3 Fix address resolution handling
In one of the refactors the command address_resolution field was
changed to address_reslution_enable but the controller code was not
updated.
2022-10-11 22:53:42 +00:00
Gilles Boccon-Gibod
68d9fbc159 Merge pull request #42 from google/gbg/improve-linux-doc
Refactor and improve the doc for Bumble on Linux
2022-10-11 14:35:14 -07:00
Gilles Boccon-Gibod
a916b7a21a Merge pull request #43 from google/gbg/proxy-write-with-response
support with_response on adapters
2022-10-11 07:41:28 -07:00
Gilles Boccon-Gibod
6ff52df8bd better/safer Linux recommendations 2022-10-10 20:11:55 -07:00
Gilles Boccon-Gibod
7fa2eb7658 support with_response on adapters 2022-10-10 12:11:51 -07:00
Gilles Boccon-Gibod
86618e52ef Refactor and improve the doc for Bumble on Linux 2022-10-09 12:56:06 -07:00
Gilles Boccon-Gibod
fbb46dd736 Merge pull request #41 from google/gbg/cli-scripts
use arg-less main() functions in all scripts
2022-10-07 16:16:35 -07:00
Gilles Boccon-Gibod
d1e119f176 use arg-less main() functions in all scripts 2022-10-07 13:56:42 -07:00
Gilles Boccon-Gibod
2fc7a0bf04 Merge pull request #39 from google/gbg/usb-descriptors
improve USB device detection logic
2022-10-06 15:39:32 -07:00
Gilles Boccon-Gibod
d6c4644b23 reorder the order of printing 2022-10-06 10:40:28 -07:00
Gilles Boccon-Gibod
073757d5dd Merge pull request #40 from google/gbg/gatt-mtu
maintain the att mtu only at the connection level
2022-10-05 13:53:47 -07:00
Gilles Boccon-Gibod
20dedbd923 maintain the att mtu only at the connection level 2022-10-04 20:04:43 -07:00
Octavian Purdila
df1962e8da apps/usb_probe.py: handle libusb1 exceptions
Some USB device properties are only accessible if the user has the
appropriate permissions. Handle libusb1 errors to graciously skip
showing details for these devices.
2022-10-04 23:38:13 +00:00
Gilles Boccon-Gibod
0edd6b731f Merge pull request #37 from google/gbg/gatt-notify-with-value
add support for notifying with a transient value
2022-10-04 10:33:04 -07:00
Gilles Boccon-Gibod
d2227f017f improve USB device detection logic 2022-10-04 09:59:48 -07:00
Gilles Boccon-Gibod
80569bc9f3 add support for notifying with a transient value 2022-09-06 12:42:35 -07:00
30 changed files with 2691 additions and 784 deletions

View File

@@ -28,11 +28,16 @@ import click
from collections import OrderedDict
import colors
from bumble.core import UUID, AdvertisingData
from bumble.device import Device, Connection, Peer
from bumble.core import UUID, AdvertisingData, TimeoutError, BT_LE_TRANSPORT
from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic
from bumble.hci import (
HCI_LE_1M_PHY,
HCI_LE_2M_PHY,
HCI_LE_CODED_PHY,
)
from prompt_toolkit import Application
from prompt_toolkit.history import FileHistory
@@ -43,6 +48,7 @@ from prompt_toolkit.styles import Style
from prompt_toolkit.filters import Condition
from prompt_toolkit.widgets import TextArea, Frame
from prompt_toolkit.widgets.toolbars import FormattedTextToolbar
from prompt_toolkit.data_structures import Point
from prompt_toolkit.layout import (
Layout,
HSplit,
@@ -51,17 +57,20 @@ from prompt_toolkit.layout import (
Float,
FormattedTextControl,
FloatContainer,
ConditionalContainer
ConditionalContainer,
Dimension
)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
DEFAULT_PROMPT_HEIGHT = 20
DEFAULT_RSSI_BAR_WIDTH = 20
DISPLAY_MIN_RSSI = -100
DISPLAY_MAX_RSSI = -30
BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
DEFAULT_RSSI_BAR_WIDTH = 20
DEFAULT_CONNECTION_TIMEOUT = 30.0
DISPLAY_MIN_RSSI = -100
DISPLAY_MAX_RSSI = -30
RSSI_MONITOR_INTERVAL = 5.0 # Seconds
# -----------------------------------------------------------------------------
# Globals
@@ -69,16 +78,57 @@ DISPLAY_MAX_RSSI = -30
App = None
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def le_phy_name(phy_id):
return {
HCI_LE_1M_PHY: '1M',
HCI_LE_2M_PHY: '2M',
HCI_LE_CODED_PHY: 'CODED'
}.get(phy_id, HCI_Constant.le_phy_name(phy_id))
def rssi_bar(rssi):
blocks = ['', '', '', '', '', '', '', '']
bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
bar_width = min(max(bar_width, 0), 1)
bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
bar_blocks = ('' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
return f'{rssi:4} {bar_blocks}'
def parse_phys(phys):
if phys.lower() == '*':
return None
else:
phy_list = []
elements = phys.lower().split(',')
for element in elements:
if element == '1m':
phy_list.append(HCI_LE_1M_PHY)
elif element == '2m':
phy_list.append(HCI_LE_2M_PHY)
elif element == 'coded':
phy_list.append(HCI_LE_CODED_PHY)
else:
raise ValueError('invalid PHY name')
return phy_list
# -----------------------------------------------------------------------------
# Console App
# -----------------------------------------------------------------------------
class ConsoleApp:
def __init__(self):
self.known_addresses = set()
self.known_addresses = set()
self.known_attributes = []
self.device = None
self.connected_peer = None
self.top_tab = 'scan'
self.device = None
self.connected_peer = None
self.top_tab = 'scan'
self.monitor_rssi = False
self.connection_rssi = None
style = Style.from_dict({
'output-field': 'bg:#000044 #ffffff',
@@ -106,6 +156,10 @@ class ConsoleApp:
'on': None,
'off': None
},
'rssi': {
'on': None,
'off': None
},
'show': {
'scan': None,
'services': None,
@@ -120,10 +174,17 @@ class ConsoleApp:
'services': None,
'attributes': None
},
'request-mtu': None,
'read': LiveCompleter(self.known_attributes),
'write': LiveCompleter(self.known_attributes),
'subscribe': LiveCompleter(self.known_attributes),
'unsubscribe': LiveCompleter(self.known_attributes),
'set-phy': {
'1m': None,
'2m': None,
'coded': None
},
'set-default-phy': None,
'quit': None,
'exit': None
})
@@ -139,14 +200,16 @@ class ConsoleApp:
self.input_field.accept_handler = self.accept_input
self.output_height = 7
self.output_height = Dimension(min=7, max=7, weight=1)
self.output_lines = []
self.output = FormattedTextControl()
self.output = FormattedTextControl(get_cursor_position=lambda: Point(0, max(0, len(self.output_lines) - 1)))
self.output_max_lines = 20
self.scan_results_text = FormattedTextControl()
self.services_text = FormattedTextControl()
self.attributes_text = FormattedTextControl()
self.log_text = FormattedTextControl()
self.log_height = 20
self.log_text = FormattedTextControl(get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1)))
self.log_height = Dimension(min=7, weight=4)
self.log_max_lines = 100
self.log_lines = []
container = HSplit([
@@ -163,11 +226,10 @@ class ConsoleApp:
filter=Condition(lambda: self.top_tab == 'attributes')
),
ConditionalContainer(
Frame(Window(self.log_text), title='Log'),
Frame(Window(self.log_text, height=self.log_height), title='Log'),
filter=Condition(lambda: self.top_tab == 'log')
),
Frame(Window(self.output), height=self.output_height),
# HorizontalLine(),
Frame(Window(self.output, height=self.output_height)),
FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'),
self.input_field
])
@@ -199,6 +261,8 @@ class ConsoleApp:
)
async def run_async(self, device_config, transport):
rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop())
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
if device_config:
self.device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
@@ -210,6 +274,8 @@ class ConsoleApp:
# Run the UI
await self.ui.run_async()
rssi_monitoring_task.cancel()
def add_known_address(self, address):
self.known_addresses.add(address)
@@ -224,22 +290,33 @@ class ConsoleApp:
connection_state = 'NONE'
encryption_state = ''
att_mtu = ''
rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi)
if self.device:
if self.device.is_connecting:
connection_state = 'CONNECTING'
elif self.connected_peer:
connection = self.connected_peer.connection
connection_parameters = f'{connection.parameters.connection_interval}/{connection.parameters.connection_latency}/{connection.parameters.supervision_timeout}'
connection_state = f'{connection.peer_address} {connection_parameters} {connection.data_length}'
connection_parameters = f'{connection.parameters.connection_interval}/{connection.parameters.peripheral_latency}/{connection.parameters.supervision_timeout}'
if connection.transport == BT_LE_TRANSPORT:
phy_state = f' RX={le_phy_name(connection.phy.rx_phy)}/TX={le_phy_name(connection.phy.tx_phy)}'
else:
phy_state = ''
connection_state = f'{connection.peer_address} {connection_parameters} {connection.data_length}{phy_state}'
encryption_state = 'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED'
att_mtu = f'ATT_MTU: {connection.att_mtu}'
return [
('ansigreen', f' SCAN: {scanning} '),
('', ' '),
('ansiblue', f' CONNECTION: {connection_state} '),
('', ' '),
('ansimagenta', f' {encryption_state} ')
('ansimagenta', f' {encryption_state} '),
('', ' '),
('ansicyan', f' {att_mtu} '),
('', ' '),
('ansiyellow', f' {rssi} ')
]
def show_error(self, title, details = None):
@@ -286,7 +363,7 @@ class ConsoleApp:
def append_to_output(self, line, invalidate=True):
if type(line) is str:
line = [('', line)]
self.output_lines = self.output_lines[-(self.output_height - 3):]
self.output_lines = self.output_lines[-self.output_max_lines:]
self.output_lines.append(line)
formatted_text = []
for line in self.output_lines:
@@ -298,7 +375,7 @@ class ConsoleApp:
def append_to_log(self, lines, invalidate=True):
self.log_lines.extend(lines.split('\n'))
self.log_lines = self.log_lines[-(self.log_height - 3):]
self.log_lines = self.log_lines[-self.log_max_lines:]
self.log_text.text = ANSI('\n'.join(self.log_lines))
if invalidate:
self.ui.invalidate()
@@ -351,6 +428,12 @@ class ConsoleApp:
if characteristic.handle == attribute_handle:
return characteristic
async def rssi_monitor_loop(self):
while True:
if self.monitor_rssi and self.connected_peer:
self.connection_rssi = await self.connected_peer.connection.get_rssi()
await asyncio.sleep(RSSI_MONITOR_INTERVAL)
async def command(self, command):
try:
(keyword, *params) = command.strip().split(' ')
@@ -379,39 +462,73 @@ class ConsoleApp:
else:
self.show_error('unsupported arguments for scan command')
async def do_rssi(self, params):
if len(params) == 0:
# Toggle monitoring
self.monitor_rssi = not self.monitor_rssi
elif params[0] == 'on':
self.monitor_rssi = True
elif params[0] == 'off':
self.monitor_rssi = False
else:
self.show_error('unsupported arguments for rssi command')
async def do_connect(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected connect <address>')
if len(params) != 1 and len(params) != 2:
self.show_error('invalid syntax', 'expected connect <address> [phys]')
return
if len(params) == 1:
phys = None
else:
phys = parse_phys(params[1])
if phys is None:
connection_parameters_preferences = None
else:
connection_parameters_preferences = {
phy: ConnectionParametersPreferences()
for phy in phys
}
self.append_to_output('connecting...')
await self.device.connect(params[0])
self.top_tab = 'services'
try:
await self.device.connect(
params[0],
connection_parameters_preferences=connection_parameters_preferences,
timeout=DEFAULT_CONNECTION_TIMEOUT
)
self.top_tab = 'services'
except TimeoutError:
self.show_error('connection timed out')
async def do_disconnect(self, params):
if not self.connected_peer:
self.show_error('not connected')
return
if self.device.connecting:
await self.device.cancel_connection()
else:
if not self.connected_peer:
self.show_error('not connected')
return
await self.connected_peer.connection.disconnect()
await self.connected_peer.connection.disconnect()
async def do_update_parameters(self, params):
if len(params) != 1 or len(params[0].split('/')) != 3:
self.show_error('invalid syntax', 'expected update-parameters <interval-min>-<interval-max>/<latency>/<supervision>')
self.show_error('invalid syntax', 'expected update-parameters <interval-min>-<interval-max>/<max-latency>/<supervision>')
return
if not self.connected_peer:
self.show_error('not connected')
return
connection_intervals, connection_latency, supervision_timeout = params[0].split('/')
connection_intervals, max_latency, supervision_timeout = params[0].split('/')
connection_interval_min, connection_interval_max = [int(x) for x in connection_intervals.split('-')]
connection_latency = int(connection_latency)
max_latency = int(max_latency)
supervision_timeout = int(supervision_timeout)
await self.connected_peer.connection.update_parameters(
connection_interval_min,
connection_interval_max,
connection_latency,
max_latency,
supervision_timeout
)
@@ -442,6 +559,25 @@ class ConsoleApp:
self.top_tab = params[0]
self.ui.invalidate()
async def do_get_phy(self, params):
if not self.connected_peer:
self.show_error('not connected')
return
phy = await self.connected_peer.connection.get_phy()
self.append_to_output(f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, TX={HCI_Constant.le_phy_name(phy[1])}')
async def do_request_mtu(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected request-mtu <mtu>')
return
if not self.connected_peer:
self.show_error('not connected')
return
await self.connected_peer.request_mtu(int(params[0]))
async def do_discover(self, params):
if not params:
self.show_error('invalid syntax', 'expected discover services|attributes')
@@ -454,14 +590,14 @@ class ConsoleApp:
await self.discover_attributes()
async def do_read(self, params):
if not self.connected_peer:
self.show_error('not connected')
return
if len(params) != 1:
self.show_error('invalid syntax', 'expected read <attribute>')
return
if not self.connected_peer:
self.show_error('not connected')
return
characteristic = self.find_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
@@ -530,6 +666,42 @@ class ConsoleApp:
await characteristic.unsubscribe()
async def do_set_phy(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected set-phy <tx_rx_phys>|<tx_phys>/<rx_phys>')
return
if not self.connected_peer:
self.show_error('not connected')
return
if '/' in params[0]:
tx_phys, rx_phys = params[0].split('/')
else:
tx_phys = params[0]
rx_phys = tx_phys
await self.connected_peer.connection.set_phy(
tx_phys=parse_phys(tx_phys),
rx_phys=parse_phys(rx_phys)
)
async def do_set_default_phy(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected set-default-phy <tx_rx_phys>|<tx_phys>/<rx_phys>')
return
if '/' in params[0]:
tx_phys, rx_phys = params[0].split('/')
else:
tx_phys = params[0]
rx_phys = tx_phys
await self.device.set_default_phy(
tx_phys=parse_phys(tx_phys),
rx_phys=parse_phys(rx_phys)
)
async def do_exit(self, params):
self.ui.exit()
@@ -548,12 +720,14 @@ class DeviceListener(Device.Listener, Connection.Listener):
@AsyncRunner.run_in_task()
async def on_connection(self, connection):
self.app.connected_peer = Peer(connection)
self.app.connection_rssi = None
self.app.append_to_output(f'connected to {self.app.connected_peer}')
connection.listener = self
def on_disconnection(self, reason):
self.app.append_to_output(f'disconnected from {self.app.connected_peer}, reason: {HCI_Constant.error_name(reason)}')
self.app.connected_peer = None
self.app.connection_rssi = None
def on_connection_parameters_update(self):
self.app.append_to_output(f'connection parameters update: {self.app.connected_peer.connection.parameters}')
@@ -570,16 +744,16 @@ class DeviceListener(Device.Listener, Connection.Listener):
def on_connection_data_length_change(self):
self.app.append_to_output(f'connection data length change: {self.app.connected_peer.connection.data_length}')
def on_advertisement(self, address, ad_data, rssi, connectable):
entry_key = f'{address}/{address.address_type}'
def on_advertisement(self, advertisement):
entry_key = f'{advertisement.address}/{advertisement.address.address_type}'
entry = self.scan_results.get(entry_key)
if entry:
entry.ad_data = ad_data
entry.rssi = rssi
entry.connectable = connectable
entry.ad_data = advertisement.data
entry.rssi = advertisement.rssi
entry.connectable = advertisement.is_connectable
else:
self.app.add_known_address(str(address))
self.scan_results[entry_key] = ScanResult(address, address.address_type, ad_data, rssi, connectable)
self.app.add_known_address(str(advertisement.address))
self.scan_results[entry_key] = ScanResult(advertisement.address, advertisement.address.address_type, advertisement.data, advertisement.rssi, advertisement.is_connectable)
self.app.show_scan_results(self.scan_results)
@@ -616,12 +790,7 @@ class ScanResult:
name = ''
# RSSI bar
blocks = ['', '', '', '', '', '', '', '']
bar_width = (self.rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
bar_width = min(max(bar_width, 0), 1)
bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
bar_blocks = ('' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
bar_string = f'{self.rssi} {bar_blocks}'
bar_string = rssi_bar(self.rssi)
bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string))
return f'{address_color(str(self.address))} [{type_color(address_type_string)}] {bar_string} {bar_padding} {name}'

View File

@@ -25,15 +25,21 @@ from bumble.company_ids import COMPANY_IDENTIFIERS
from bumble.core import name_or_number
from bumble.hci import (
map_null_terminated_utf8_string,
HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_SUCCESS,
HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_VERSION_NAMES,
LMP_VERSION_NAMES,
HCI_Command,
HCI_Read_BD_ADDR_Command,
HCI_READ_BD_ADDR_COMMAND,
HCI_Read_BD_ADDR_Command,
HCI_READ_LOCAL_NAME_COMMAND,
HCI_Read_Local_Name_Command,
HCI_READ_LOCAL_NAME_COMMAND
HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
HCI_LE_Read_Maximum_Data_Length_Command,
HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND,
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command,
HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND,
HCI_LE_Read_Maximum_Advertising_Data_Length_Command
)
from bumble.host import Host
from bumble.transport import open_transport_or_link
@@ -57,6 +63,39 @@ async def get_classic_info(host):
# -----------------------------------------------------------------------------
async def get_le_info(host):
print()
if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
response = await host.send_command(HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('LE Number Of Supported Advertising Sets:', 'yellow'),
response.return_parameters.num_supported_advertising_sets,
'\n'
)
if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
response = await host.send_command(HCI_LE_Read_Maximum_Advertising_Data_Length_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('LE Maximum Advertising Data Length:', 'yellow'),
response.return_parameters.max_advertising_data_length,
'\n'
)
if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
response = await host.send_command(HCI_LE_Read_Maximum_Data_Length_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('Maximum Data Length:', 'yellow'),
(
f'tx:{response.return_parameters.supported_max_tx_octets}/'
f'{response.return_parameters.supported_max_tx_time}, '
f'rx:{response.return_parameters.supported_max_rx_octets}/'
f'{response.return_parameters.supported_max_rx_time}'
),
'\n'
)
print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features:
print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature))

View File

@@ -25,8 +25,8 @@ from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.keys import JsonKeyStore
from bumble.smp import AddressResolver
from bumble.hci import HCI_LE_Advertising_Report_Event
from bumble.core import AdvertisingData
from bumble.device import Advertisement
from bumble.hci import HCI_Constant, HCI_LE_1M_PHY, HCI_LE_CODED_PHY
# -----------------------------------------------------------------------------
@@ -48,16 +48,19 @@ class AdvertisementPrinter:
self.min_rssi = min_rssi
self.resolver = resolver
def print_advertisement(self, address, address_color, ad_data, rssi):
if self.min_rssi is not None and rssi < self.min_rssi:
def print_advertisement(self, advertisement):
address = advertisement.address
address_color = 'yellow' if advertisement.is_connectable else 'red'
if self.min_rssi is not None and advertisement.rssi < self.min_rssi:
return
address_qualifier = ''
resolution_qualifier = ''
if self.resolver and address.is_resolvable:
resolved = self.resolver.resolve(address)
if self.resolver and advertisement.address.is_resolvable:
resolved = self.resolver.resolve(advertisement.address)
if resolved is not None:
resolution_qualifier = f'(resolved from {address})'
resolution_qualifier = f'(resolved from {advertisement.address})'
address = resolved
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type]
@@ -74,18 +77,30 @@ class AdvertisementPrinter:
type_color = 'blue'
address_qualifier = '(non-resolvable)'
rssi_bar = make_rssi_bar(rssi)
separator = '\n '
print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}RSSI:{rssi:4} {rssi_bar}{separator}{ad_data.to_string(separator)}\n')
rssi_bar = make_rssi_bar(advertisement.rssi)
if not advertisement.is_legacy:
phy_info = (
f'PHY: {HCI_Constant.le_phy_name(advertisement.primary_phy)}/'
f'{HCI_Constant.le_phy_name(advertisement.secondary_phy)} '
f'{separator}'
)
else:
phy_info = ''
def on_advertisement(self, address, ad_data, rssi, connectable):
address_color = 'yellow' if connectable else 'red'
self.print_advertisement(address, address_color, ad_data, rssi)
print(
f'>>> {color(address, address_color)} '
f'[{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}'
f'{phy_info}'
f'RSSI:{advertisement.rssi:4} {rssi_bar}{separator}'
f'{advertisement.data.to_string(separator)}\n')
def on_advertising_report(self, address, ad_data, rssi, event_type):
print(f'{color("EVENT", "green")}: {HCI_LE_Advertising_Report_Event.event_type_name(event_type)}')
ad_data = AdvertisingData.from_bytes(ad_data)
self.print_advertisement(address, 'yellow', ad_data, rssi)
def on_advertisement(self, advertisement):
self.print_advertisement(advertisement)
def on_advertising_report(self, report):
print(f'{color("EVENT", "green")}: {report.event_type_string()}')
self.print_advertisement(Advertisement.from_advertising_report(report))
# -----------------------------------------------------------------------------
@@ -94,6 +109,7 @@ async def scan(
passive,
scan_interval,
scan_window,
phy,
filter_duplicates,
raw,
keystore_file,
@@ -126,11 +142,18 @@ async def scan(
device.on('advertisement', printer.on_advertisement)
await device.power_on()
if phy is None:
scanning_phys = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY]
else:
scanning_phys = [{'1m': HCI_LE_1M_PHY, 'coded': HCI_LE_CODED_PHY}[phy]]
await device.start_scanning(
active=(not passive),
scan_interval=scan_interval,
scan_window=scan_window,
filter_duplicates=filter_duplicates
filter_duplicates=filter_duplicates,
scanning_phys=scanning_phys
)
await hci_source.wait_for_termination()
@@ -142,14 +165,15 @@ async def scan(
@click.option('--passive', is_flag=True, default=False, help='Perform passive scanning')
@click.option('--scan-interval', type=int, default=60, help='Scan interval')
@click.option('--scan-window', type=int, default=60, help='Scan window')
@click.option('--phy', type=click.Choice(['1m', 'coded']), help='Only scan on the specified PHY')
@click.option('--filter-duplicates', type=bool, default=True, help='Filter duplicates at the controller level')
@click.option('--raw', is_flag=True, default=False, help='Listen for raw advertising reports instead of processed ones')
@click.option('--keystore-file', help='Keystore file to use when resolving addresses')
@click.option('--device-config', help='Device config file for the scanning device')
@click.argument('transport')
def main(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport):
def main(min_rssi, passive, scan_interval, scan_window, phy, filter_duplicates, raw, keystore_file, device_config, transport):
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
asyncio.run(scan(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport))
asyncio.run(scan(min_rssi, passive, scan_interval, scan_window, phy, filter_duplicates, raw, keystore_file, device_config, transport))
# -----------------------------------------------------------------------------

View File

@@ -90,7 +90,7 @@ class SnoopPacketReader:
@click.command()
@click.option('--format', type=click.Choice(['h4', 'snoop']), default='h4', help='Format of the input file')
@click.argument('filename')
def show(format, filename):
def main(format, filename):
input = open(filename, 'rb')
if format == 'h4':
packet_reader = PacketReader(input)
@@ -117,4 +117,4 @@ def show(format, filename):
# -----------------------------------------------------------------------------
if __name__ == '__main__':
show()
main()

View File

@@ -28,6 +28,8 @@
# -----------------------------------------------------------------------------
import os
import logging
import sys
import click
import usb1
from colors import color
@@ -35,6 +37,7 @@ from colors import color
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
USB_DEVICE_CLASS_DEVICE = 0x00
USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
@@ -75,9 +78,81 @@ USB_DEVICE_CLASSES = {
0xFF: 'Vendor Specific'
}
USB_ENDPOINT_IN = 0x80
USB_ENDPOINT_TYPES = ['CONTROL', 'ISOCHRONOUS', 'BULK', 'INTERRUPT']
USB_BT_HCI_CLASS_TUPLE = (
USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
USB_DEVICE_SUBCLASS_RF_CONTROLLER,
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
)
# -----------------------------------------------------------------------------
def main():
def show_device_details(device):
for configuration in device:
print(f' Configuration {configuration.getConfigurationValue()}')
for interface in configuration:
for setting in interface:
alternateSetting = setting.getAlternateSetting()
suffix = f'/{alternateSetting}' if interface.getNumSettings() > 1 else ''
(class_string, subclass_string) = get_class_info(
setting.getClass(),
setting.getSubClass(),
setting.getProtocol()
)
details = f'({class_string}, {subclass_string})'
print(f' Interface: {setting.getNumber()}{suffix} {details}')
for endpoint in setting:
endpoint_type = USB_ENDPOINT_TYPES[endpoint.getAttributes() & 3]
endpoint_direction = 'OUT' if (endpoint.getAddress() & USB_ENDPOINT_IN == 0) else 'IN'
print(f' Endpoint 0x{endpoint.getAddress():02X}: {endpoint_type} {endpoint_direction}')
# -----------------------------------------------------------------------------
def get_class_info(cls, subclass, protocol):
class_info = USB_DEVICE_CLASSES.get(cls)
protocol_string = ''
if class_info is None:
class_string = f'0x{cls:02X}'
else:
if type(class_info) is tuple:
class_string = class_info[0]
subclass_info = class_info[1].get(subclass)
if subclass_info:
protocol_string = subclass_info.get(protocol)
if protocol_string is not None:
protocol_string = f' [{protocol_string}]'
else:
class_string = class_info
subclass_string = f'{subclass}/{protocol}{protocol_string}'
return (class_string, subclass_string)
# -----------------------------------------------------------------------------
def is_bluetooth_hci(device):
# Check if the device class indicates a match
if (device.getDeviceClass(), device.getDeviceSubClass(), device.getDeviceProtocol()) == USB_BT_HCI_CLASS_TUPLE:
return True
# If the device class is 'Device', look for a matching interface
if device.getDeviceClass() == USB_DEVICE_CLASS_DEVICE:
for configuration in device:
for interface in configuration:
for setting in interface:
if (setting.getClass(), setting.getSubClass(), setting.getProtocol()) == USB_BT_HCI_CLASS_TUPLE:
return True
return False
# -----------------------------------------------------------------------------
@click.command()
@click.option('--verbose', is_flag=True, default=False, help='Print more details')
def main(verbose):
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
with usb1.USBContext() as context:
@@ -91,23 +166,28 @@ def main():
device_id = (device.getVendorID(), device.getProductID())
device_is_bluetooth_hci = (
device_class == USB_DEVICE_CLASS_WIRELESS_CONTROLLER and
device_subclass == USB_DEVICE_SUBCLASS_RF_CONTROLLER and
device_protocol == USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
(device_class_string, device_subclass_string) = get_class_info(
device_class,
device_subclass,
device_protocol
)
device_class_details = ''
device_class_info = USB_DEVICE_CLASSES.get(device_class)
if device_class_info is not None:
if type(device_class_info) is tuple:
device_class = device_class_info[0]
device_subclass_info = device_class_info[1].get(device_subclass)
if device_subclass_info:
device_class_details = f' [{device_subclass_info.get(device_protocol)}]'
else:
device_class = device_class_info
try:
device_serial_number = device.getSerialNumber()
except usb1.USBError:
device_serial_number = None
try:
device_manufacturer = device.getManufacturer()
except usb1.USBError:
device_manufacturer = None
try:
device_product = device.getProduct()
except usb1.USBError:
device_product = None
device_is_bluetooth_hci = is_bluetooth_hci(device)
if device_is_bluetooth_hci:
bluetooth_device_count += 1
fg_color = 'black'
@@ -123,33 +203,35 @@ def main():
if device_is_bluetooth_hci:
bumble_transport_names.append(f'usb:{bluetooth_device_count - 1}')
serial_number_collision = False
if device_id in devices:
for device_serial in devices[device_id]:
if device_serial == device.getSerialNumber():
serial_number_collision = True
if device_id not in devices:
bumble_transport_names.append(basic_transport_name)
else:
bumble_transport_names.append(f'{basic_transport_name}#{len(devices[device_id])}')
if device.getSerialNumber() and not serial_number_collision:
bumble_transport_names.append(f'{basic_transport_name}/{device.getSerialNumber()}')
if device_serial_number is not None:
if device_id not in devices or device_serial_number not in devices[device_id]:
bumble_transport_names.append(f'{basic_transport_name}/{device_serial_number}')
# Print the results
print(color(f'ID {device.getVendorID():04X}:{device.getProductID():04X}', fg=fg_color, bg=bg_color))
if bumble_transport_names:
print(color(' Bumble Transport Names:', 'blue'), ' or '.join(color(x, 'cyan' if device_is_bluetooth_hci else 'red') for x in bumble_transport_names))
print(color(' Bus/Device: ', 'green'), f'{device.getBusNumber():03}/{device.getDeviceAddress():03}')
if device.getSerialNumber():
print(color(' Serial: ', 'green'), device.getSerialNumber())
print(color(' Class: ', 'green'), device_class)
print(color(' Subclass/Protocol: ', 'green'), f'{device_subclass}/{device_protocol}{device_class_details}')
print(color(' Manufacturer: ', 'green'), device.getManufacturer())
print(color(' Product: ', 'green'), device.getProduct())
print(color(' Class: ', 'green'), device_class_string)
print(color(' Subclass/Protocol: ', 'green'), device_subclass_string)
if device_serial_number is not None:
print(color(' Serial: ', 'green'), device_serial_number)
if device_manufacturer is not None:
print(color(' Manufacturer: ', 'green'), device_manufacturer)
if device_product is not None:
print(color(' Product: ', 'green'), device_product)
if verbose:
show_device_details(device)
print()
devices.setdefault(device_id, []).append(device.getSerialNumber())
devices.setdefault(device_id, []).append(device_serial_number)
# -----------------------------------------------------------------------------

View File

@@ -700,16 +700,26 @@ class Attribute(EventEmitter):
else:
self.value = value
def encode_value(self, value):
return value
def decode_value(self, value_bytes):
return value_bytes
def read_value(self, connection):
if read := getattr(self.value, 'read', None):
try:
return read(connection)
value = read(connection)
except ATT_Error as error:
raise ATT_Error(error_code=error.error_code, att_handle=self.handle)
else:
return self.value
value = self.value
return self.encode_value(value)
def write_value(self, connection, value_bytes):
value = self.decode_value(value_bytes)
def write_value(self, connection, value):
if write := getattr(self.value, 'write', None):
try:
write(connection, value)
@@ -721,7 +731,11 @@ class Attribute(EventEmitter):
self.emit('write', connection, value)
def __repr__(self):
if len(self.value) > 0:
if type(self.value) is bytes:
value_str = self.value.hex()
else:
value_str = str(self.value)
if value_str:
value_string = f', value={self.value.hex()}'
else:
value_string = ''

View File

@@ -76,7 +76,7 @@ class Controller:
self.supported_commands = bytes.fromhex('2000800000c000000000e40000002822000000000000040000f7ffff7f00000030f0f9ff01008004000000000000000000000000000000000000000000000000')
self.le_features = bytes.fromhex('ff49010000000000')
self.le_states = bytes.fromhex('ffff3fffff030000')
self.avertising_channel_tx_power = 0
self.advertising_channel_tx_power = 0
self.filter_accept_list_size = 8
self.resolving_list_size = 8
self.supported_max_tx_octets = 27
@@ -259,15 +259,15 @@ class Controller:
# Then say that the connection has completed
self.send_hci_packet(HCI_LE_Connection_Complete_Event(
status = HCI_SUCCESS,
connection_handle = connection.handle,
role = connection.role,
peer_address_type = peer_address_type,
peer_address = peer_address,
conn_interval = 10, # FIXME
conn_latency = 0, # FIXME
supervision_timeout = 10, # FIXME
master_clock_accuracy = 7 # FIXME
status = HCI_SUCCESS,
connection_handle = connection.handle,
role = connection.role,
peer_address_type = peer_address_type,
peer_address = peer_address,
connection_interval = 10, # FIXME
peripheral_latency = 0, # FIXME
supervision_timeout = 10, # FIXME
central_clock_accuracy = 7 # FIXME
))
def on_link_central_disconnected(self, peer_address, reason):
@@ -313,15 +313,15 @@ class Controller:
# Say that the connection has completed
self.send_hci_packet(HCI_LE_Connection_Complete_Event(
status = status,
connection_handle = connection.handle if connection else 0,
role = BT_CENTRAL_ROLE,
peer_address_type = le_create_connection_command.peer_address_type,
peer_address = le_create_connection_command.peer_address,
conn_interval = le_create_connection_command.conn_interval_min,
conn_latency = le_create_connection_command.conn_latency,
supervision_timeout = le_create_connection_command.supervision_timeout,
master_clock_accuracy = 0
status = status,
connection_handle = connection.handle if connection else 0,
role = BT_CENTRAL_ROLE,
peer_address_type = le_create_connection_command.peer_address_type,
peer_address = le_create_connection_command.peer_address,
connection_interval = le_create_connection_command.connection_interval_min,
peripheral_latency = le_create_connection_command.max_latency,
supervision_timeout = le_create_connection_command.supervision_timeout,
central_clock_accuracy = 0
))
def on_link_peripheral_disconnection_complete(self, disconnection_command, status):
@@ -583,13 +583,15 @@ class Controller:
'''
See Bluetooth spec Vol 2, Part E - 7.4.1 Read Local Version Information Command
'''
return struct.pack('<BBHBHH',
HCI_SUCCESS,
self.hci_version,
self.hci_revision,
self.lmp_version,
self.manufacturer_name,
self.lmp_subversion)
return struct.pack(
'<BBHBHH',
HCI_SUCCESS,
self.hci_version,
self.hci_revision,
self.lmp_version,
self.manufacturer_name,
self.lmp_subversion
)
def on_hci_read_local_supported_commands_command(self, command):
'''
@@ -650,7 +652,7 @@ class Controller:
'''
See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Channel Tx Power Command
'''
return bytes([HCI_SUCCESS, self.avertising_channel_tx_power])
return bytes([HCI_SUCCESS, self.advertising_channel_tx_power])
def on_hci_le_set_advertising_data_command(self, command):
'''
@@ -857,9 +859,9 @@ class Controller:
See Bluetooth spec Vol 2, Part E - 7.8.44 LE Set Address Resolution Enable Command
'''
ret = HCI_SUCCESS
if command.address_resolution == 1:
if command.address_resolution_enable == 1:
self.le_address_resolution = True
elif command.address_resolution == 0:
elif command.address_resolution_enable == 0:
self.le_address_resolution = False
else:
ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR
@@ -876,12 +878,26 @@ class Controller:
'''
See Bluetooth spec Vol 2, Part E - 7.8.46 LE Read Maximum Data Length Command
'''
return struct.pack('<BHHHH',
HCI_SUCCESS,
self.supported_max_tx_octets,
self.supported_max_tx_time,
self.supported_max_rx_octets,
self.supported_max_rx_time)
return struct.pack(
'<BHHHH',
HCI_SUCCESS,
self.supported_max_tx_octets,
self.supported_max_tx_time,
self.supported_max_rx_octets,
self.supported_max_rx_time
)
def on_hci_le_read_phy_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.47 LE Read PHY command
'''
return struct.pack(
'<BHBB',
HCI_SUCCESS,
command.connection_handle,
HCI_LE_1M_PHY,
HCI_LE_1M_PHY
)
def on_hci_le_set_default_phy_command(self, command):
'''
@@ -893,3 +909,4 @@ class Controller:
'rx_phys': command.rx_phys
}
return bytes([HCI_SUCCESS])

View File

@@ -831,13 +831,17 @@ class AdvertisingData:
# Connection Parameters
# -----------------------------------------------------------------------------
class ConnectionParameters:
def __init__(self, connection_interval, connection_latency, supervision_timeout):
def __init__(self, connection_interval, peripheral_latency, supervision_timeout):
self.connection_interval = connection_interval
self.connection_latency = connection_latency
self.peripheral_latency = peripheral_latency
self.supervision_timeout = supervision_timeout
def __str__(self):
return f'ConnectionParameters(connection_interval={self.connection_interval}, connection_latency={self.connection_latency}, supervision_timeout={self.supervision_timeout}'
return (
f'ConnectionParameters(connection_interval={self.connection_interval}, '
f'peripheral_latency={self.peripheral_latency}, '
f'supervision_timeout={self.supervision_timeout}'
)
# -----------------------------------------------------------------------------

File diff suppressed because it is too large Load Diff

View File

@@ -303,6 +303,7 @@ class CharacteristicAdapter:
'''
def __init__(self, characteristic):
self.wrapped_characteristic = characteristic
self.subscribers = {} # Map from subscriber to proxy subscriber
if (
asyncio.iscoroutinefunction(characteristic.read_value) and
@@ -317,11 +318,21 @@ class CharacteristicAdapter:
if hasattr(self.wrapped_characteristic, 'subscribe'):
self.subscribe = self.wrapped_subscribe
if hasattr(self.wrapped_characteristic, 'unsubscribe'):
self.unsubscribe = self.wrapped_unsubscribe
def __getattr__(self, name):
return getattr(self.wrapped_characteristic, name)
def __setattr__(self, name, value):
if name in {'wrapped_characteristic', 'read_value', 'write_value', 'subscribe'}:
if name in {
'wrapped_characteristic',
'subscribers',
'read_value',
'write_value',
'subscribe',
'unsubscribe'
}:
super().__setattr__(name, value)
else:
setattr(self.wrapped_characteristic, name, value)
@@ -335,8 +346,11 @@ class CharacteristicAdapter:
async def read_decoded_value(self):
return self.decode_value(await self.wrapped_characteristic.read_value())
async def write_decoded_value(self, value):
return await self.wrapped_characteristic.write_value(self.encode_value(value))
async def write_decoded_value(self, value, with_response=False):
return await self.wrapped_characteristic.write_value(
self.encode_value(value),
with_response
)
def encode_value(self, value):
return value
@@ -345,9 +359,26 @@ class CharacteristicAdapter:
return value
def wrapped_subscribe(self, subscriber=None):
return self.wrapped_characteristic.subscribe(
None if subscriber is None else lambda value: subscriber(self.decode_value(value))
)
if subscriber is not None:
if subscriber in self.subscribers:
# We already have a proxy subscriber
subscriber = self.subscribers[subscriber]
else:
# Create and register a proxy that will decode the value
original_subscriber = subscriber
def on_change(value):
original_subscriber(self.decode_value(value))
self.subscribers[subscriber] = on_change
subscriber = on_change
return self.wrapped_characteristic.subscribe(subscriber)
def wrapped_unsubscribe(self, subscriber=None):
if subscriber in self.subscribers:
subscriber = self.subscribers.pop(subscriber)
return self.wrapped_characteristic.unsubscribe(subscriber)
def __str__(self):
wrapped = str(self.wrapped_characteristic)

View File

@@ -58,10 +58,16 @@ class AttributeProxy(EventEmitter):
self.type = attribute_type
async def read_value(self, no_long_read=False):
return await self.client.read_value(self.handle, no_long_read)
return self.decode_value(await self.client.read_value(self.handle, no_long_read))
async def write_value(self, value, with_response=False):
return await self.client.write_value(self.handle, value, with_response)
return await self.client.write_value(self.handle, self.encode_value(value), with_response)
def encode_value(self, value):
return value
def decode_value(self, value_bytes):
return value_bytes
def __str__(self):
return f'Attribute(handle=0x{self.handle:04X}, type={self.uuid})'
@@ -98,6 +104,7 @@ class CharacteristicProxy(AttributeProxy):
self.properties = properties
self.descriptors = []
self.descriptors_discovered = False
self.subscribers = {} # Map from subscriber to proxy subscriber
def get_descriptor(self, descriptor_type):
for descriptor in self.descriptors:
@@ -108,9 +115,25 @@ class CharacteristicProxy(AttributeProxy):
return await self.client.discover_descriptors(self)
async def subscribe(self, subscriber=None):
if subscriber is not None:
if subscriber in self.subscribers:
# We already have a proxy subscriber
subscriber = self.subscribers[subscriber]
else:
# Create and register a proxy that will decode the value
original_subscriber = subscriber
def on_change(value):
original_subscriber(self.decode_value(value))
self.subscribers[subscriber] = on_change
subscriber = on_change
return await self.client.subscribe(self, subscriber)
async def unsubscribe(self, subscriber=None):
if subscriber in self.subscribers:
subscriber = self.subscribers.pop(subscriber)
return await self.client.unsubscribe(self, subscriber)
def __str__(self):
@@ -140,7 +163,6 @@ class ProfileServiceProxy:
class Client:
def __init__(self, connection):
self.connection = connection
self.mtu = ATT_DEFAULT_MTU
self.mtu_exchange_done = False
self.request_semaphore = asyncio.Semaphore(1)
self.pending_request = None
@@ -162,8 +184,8 @@ class Client:
# Wait until we can send (only one pending command at a time for the connection)
response = None
async with self.request_semaphore:
assert(self.pending_request is None)
assert(self.pending_response is None)
assert self.pending_request is None
assert self.pending_response is None
# Create a future value to hold the eventual response
self.pending_response = asyncio.get_running_loop().create_future()
@@ -194,7 +216,7 @@ class Client:
# We can only send one request per connection
if self.mtu_exchange_done:
return
return self.connection.att_mtu
# Send the request
self.mtu_exchange_done = True
@@ -207,8 +229,10 @@ class Client:
response
)
self.mtu = max(ATT_DEFAULT_MTU, response.server_rx_mtu)
return self.mtu
# Compute the final MTU
self.connection.att_mtu = min(mtu, response.server_rx_mtu)
return self.connection.att_mtu
def get_services_by_uuid(self, uuid):
return [service for service in self.services if service.uuid == uuid]
@@ -570,12 +594,18 @@ class Client:
subscribers = subscriber_set.get(characteristic.handle, [])
if subscriber in subscribers:
subscribers.remove(subscriber)
# Cleanup if we removed the last one
if not subscribers:
subscriber_set.remove(characteristic.handle)
else:
# Remove all subscribers for this attribute from the sets!
self.notification_subscribers.pop(characteristic.handle, None)
self.indication_subscribers.pop(characteristic.handle, None)
await self.write_value(cccd, b'\x00\x00', with_response=True)
if not self.notification_subscribers and not self.indication_subscribers:
# No more subscribers left
await self.write_value(cccd, b'\x00\x00', with_response=True)
async def read_value(self, attribute, no_long_read=False):
'''
@@ -600,7 +630,7 @@ class Client:
# If the value is the max size for the MTU, try to read more unless the caller
# specifically asked not to do that
attribute_value = response.attribute_value
if not no_long_read and len(attribute_value) == self.mtu - 1:
if not no_long_read and len(attribute_value) == self.connection.att_mtu - 1:
logger.debug('using READ BLOB to get the rest of the value')
offset = len(attribute_value)
while True:
@@ -622,7 +652,7 @@ class Client:
part = response.part_attribute_value
attribute_value += part
if len(part) < self.mtu - 1:
if len(part) < self.connection.att_mtu - 1:
break
offset += len(part)

View File

@@ -40,6 +40,12 @@ from .gatt import *
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
GATT_SERVER_DEFAULT_MAX_MTU = 517
# -----------------------------------------------------------------------------
# GATT Server
# -----------------------------------------------------------------------------
@@ -49,9 +55,8 @@ class Server(EventEmitter):
self.device = device
self.attributes = [] # Attributes, ordered by increasing handle values
self.attributes_by_handle = {} # Map for fast attribute access by handle
self.max_mtu = 23 # FIXME: 517 # The max MTU we're willing to negotiate
self.max_mtu = GATT_SERVER_DEFAULT_MAX_MTU # The max MTU we're willing to negotiate
self.subscribers = {} # Map of subscriber states by connection handle and attribute handle
self.mtus = {} # Map of ATT MTU values by connection handle
self.indication_semaphores = defaultdict(lambda: asyncio.Semaphore(1))
self.pending_confirmations = defaultdict(lambda: None)
@@ -169,7 +174,7 @@ class Server(EventEmitter):
logger.debug(f'GATT Response from server: [0x{connection.handle:04X}] {response}')
self.send_gatt_pdu(connection.handle, response.to_bytes())
async def notify_subscriber(self, connection, attribute, force=False):
async def notify_subscriber(self, connection, attribute, value=None, force=False):
# Check if there's a subscriber
if not force:
subscribers = self.subscribers.get(connection.handle)
@@ -184,13 +189,12 @@ class Server(EventEmitter):
logger.debug(f'not notifying, cccd={cccd.hex()}')
return
# Get the value
value = attribute.read_value(connection)
# Get or encode the value
value = attribute.read_value(connection) if value is None else attribute.encode_value(value)
# Truncate if needed
mtu = self.get_mtu(connection)
if len(value) > mtu - 3:
value = value[:mtu - 3]
if len(value) > connection.att_mtu - 3:
value = value[:connection.att_mtu - 3]
# Notify
notification = ATT_Handle_Value_Notification(
@@ -198,27 +202,9 @@ class Server(EventEmitter):
attribute_value = value
)
logger.debug(f'GATT Notify from server: [0x{connection.handle:04X}] {notification}')
self.send_gatt_pdu(connection.handle, notification.to_bytes())
self.send_gatt_pdu(connection.handle, bytes(notification))
async def notify_subscribers(self, attribute, force=False):
# Get all the connections for which there's at least one subscription
connections = [
connection for connection in [
self.device.lookup_connection(connection_handle)
for (connection_handle, subscribers) in self.subscribers.items()
if force or subscribers.get(attribute.handle)
]
if connection is not None
]
# Notify for each connection
if connections:
await asyncio.wait([
self.notify_subscriber(connection, attribute, force)
for connection in connections
])
async def indicate_subscriber(self, connection, attribute, force=False):
async def indicate_subscriber(self, connection, attribute, value=None, force=False):
# Check if there's a subscriber
if not force:
subscribers = self.subscribers.get(connection.handle)
@@ -233,13 +219,12 @@ class Server(EventEmitter):
logger.debug(f'not indicating, cccd={cccd.hex()}')
return
# Get the value
value = attribute.read_value(connection)
# Get or encode the value
value = attribute.read_value(connection) if value is None else attribute.encode_value(value)
# Truncate if needed
mtu = self.get_mtu(connection)
if len(value) > mtu - 3:
value = value[:mtu - 3]
if len(value) > connection.att_mtu - 3:
value = value[:connection.att_mtu - 3]
# Indicate
indication = ATT_Handle_Value_Indication(
@@ -264,27 +249,32 @@ class Server(EventEmitter):
finally:
self.pending_confirmations[connection.handle] = None
async def indicate_subscribers(self, attribute):
async def notify_or_indicate_subscribers(self, indicate, attribute, value=None, force=False):
# Get all the connections for which there's at least one subscription
connections = [
connection for connection in [
self.device.lookup_connection(connection_handle)
for (connection_handle, subscribers) in self.subscribers.items()
if subscribers.get(attribute.handle)
if force or subscribers.get(attribute.handle)
]
if connection is not None
]
# Indicate for each connection
# Indicate or notify for each connection
if connections:
coroutine = self.indicate_subscriber if indicate else self.notify_subscriber
await asyncio.wait([
self.indicate_subscriber(connection, attribute)
asyncio.create_task(coroutine(connection, attribute, value, force))
for connection in connections
])
async def notify_subscribers(self, attribute, value=None, force=False):
return await self.notify_or_indicate_subscribers(False, attribute, value, force)
async def indicate_subscribers(self, attribute, value=None, force=False):
return await self.notify_or_indicate_subscribers(True, attribute, value, force)
def on_disconnection(self, connection):
if connection.handle in self.mtus:
del self.mtus[connection.handle]
if connection.handle in self.subscribers:
del self.subscribers[connection.handle]
if connection.handle in self.indication_semaphores:
@@ -325,9 +315,6 @@ class Server(EventEmitter):
# Just ignore
logger.warning(f'{color("--- Ignoring GATT Request from [0x{connection.handle:04X}]:", "red")} {att_pdu}')
def get_mtu(self, connection):
return self.mtus.get(connection.handle, ATT_DEFAULT_MTU)
#######################################################
# ATT handlers
#######################################################
@@ -347,12 +334,16 @@ class Server(EventEmitter):
'''
See Bluetooth spec Vol 3, Part F - 3.4.2.1 Exchange MTU Request
'''
mtu = max(ATT_DEFAULT_MTU, min(self.max_mtu, request.client_rx_mtu))
self.mtus[connection.handle] = mtu
self.send_response(connection, ATT_Exchange_MTU_Response(server_rx_mtu = mtu))
self.send_response(connection, ATT_Exchange_MTU_Response(server_rx_mtu = self.max_mtu))
# Notify the device
self.device.on_connection_att_mtu_update(connection.handle, mtu)
# Compute the final MTU
if request.client_rx_mtu >= ATT_DEFAULT_MTU:
mtu = min(self.max_mtu, request.client_rx_mtu)
# Notify the device
self.device.on_connection_att_mtu_update(connection.handle, mtu)
else:
logger.warning('invalid client_rx_mtu received, MTU not changed')
def on_att_find_information_request(self, connection, request):
'''
@@ -369,7 +360,7 @@ class Server(EventEmitter):
return
# Build list of returned attributes
pdu_space_available = self.get_mtu(connection) - 2
pdu_space_available = connection.att_mtu - 2
attributes = []
uuid_size = 0
for attribute in (
@@ -420,7 +411,7 @@ class Server(EventEmitter):
'''
# Build list of returned attributes
pdu_space_available = self.get_mtu(connection) - 2
pdu_space_available = connection.att_mtu - 2
attributes = []
for attribute in (
attribute for attribute in self.attributes if
@@ -468,8 +459,7 @@ class Server(EventEmitter):
See Bluetooth spec Vol 3, Part F - 3.4.4.1 Read By Type Request
'''
mtu = self.get_mtu(connection)
pdu_space_available = mtu - 2
pdu_space_available = connection.att_mtu - 2
attributes = []
for attribute in (
attribute for attribute in self.attributes if
@@ -482,7 +472,7 @@ class Server(EventEmitter):
# Check the attribute value size
attribute_value = attribute.read_value(connection)
max_attribute_size = min(mtu - 4, 253)
max_attribute_size = min(connection.att_mtu - 4, 253)
if len(attribute_value) > max_attribute_size:
# We need to truncate
attribute_value = attribute_value[:max_attribute_size]
@@ -522,7 +512,7 @@ class Server(EventEmitter):
if attribute := self.get_attribute(request.attribute_handle):
# TODO: check permissions
value = attribute.read_value(connection)
value_size = min(self.get_mtu(connection) - 1, len(value))
value_size = min(connection.att_mtu - 1, len(value))
response = ATT_Read_Response(
attribute_value = value[:value_size]
)
@@ -541,7 +531,6 @@ class Server(EventEmitter):
if attribute := self.get_attribute(request.attribute_handle):
# TODO: check permissions
mtu = self.get_mtu(connection)
value = attribute.read_value(connection)
if request.value_offset > len(value):
response = ATT_Error_Response(
@@ -549,14 +538,14 @@ class Server(EventEmitter):
attribute_handle_in_error = request.attribute_handle,
error_code = ATT_INVALID_OFFSET_ERROR
)
elif len(value) <= mtu - 1:
elif len(value) <= connection.att_mtu - 1:
response = ATT_Error_Response(
request_opcode_in_error = request.op_code,
attribute_handle_in_error = request.attribute_handle,
error_code = ATT_ATTRIBUTE_NOT_LONG_ERROR
)
else:
part_size = min(mtu - 1, len(value) - request.value_offset)
part_size = min(connection.att_mtu - 1, len(value) - request.value_offset)
response = ATT_Read_Blob_Response(
part_attribute_value = value[request.value_offset:request.value_offset + part_size]
)
@@ -585,8 +574,7 @@ class Server(EventEmitter):
self.send_response(connection, response)
return
mtu = self.get_mtu(connection)
pdu_space_available = mtu - 2
pdu_space_available = connection.att_mtu - 2
attributes = []
for attribute in (
attribute for attribute in self.attributes if
@@ -597,7 +585,7 @@ class Server(EventEmitter):
):
# Check the attribute value size
attribute_value = attribute.read_value(connection)
max_attribute_size = min(mtu - 6, 251)
max_attribute_size = min(connection.att_mtu - 6, 251)
if len(attribute_value) > max_attribute_size:
# We need to truncate
attribute_value = attribute_value[:max_attribute_size]

File diff suppressed because it is too large Load Diff

View File

@@ -76,7 +76,7 @@ class Host(EventEmitter):
self.hc_total_num_acl_data_packets = HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS
self.acl_packet_queue = collections.deque()
self.acl_packets_in_flight = 0
self.local_version = HCI_VERSION_BLUETOOTH_CORE_4_0
self.local_version = None
self.local_supported_commands = bytes(64)
self.local_le_features = 0
self.command_semaphore = asyncio.Semaphore(1)
@@ -91,32 +91,23 @@ class Host(EventEmitter):
self.set_packet_sink(controller_sink)
async def reset(self):
await self.send_command(HCI_Reset_Command())
await self.send_command(HCI_Reset_Command(), check_result=True)
self.ready = True
response = await self.send_command(HCI_Read_Local_Supported_Commands_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.local_supported_commands = response.return_parameters.supported_commands
else:
logger.warn(f'HCI_Read_Local_Supported_Commands_Command failed: {response.return_parameters.status}')
response = await self.send_command(HCI_Read_Local_Supported_Commands_Command(), check_result=True)
self.local_supported_commands = response.return_parameters.supported_commands
if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.local_le_features = struct.unpack('<Q', response.return_parameters.le_features)[0]
else:
logger.warn(f'HCI_LE_Read_Supported_Features_Command failed: {response.return_parameters.status}')
response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command(), check_result=True)
self.local_le_features = struct.unpack('<Q', response.return_parameters.le_features)[0]
if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
response = await self.send_command(HCI_Read_Local_Version_Information_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.local_version = response.return_parameters
else:
logger.warn(f'HCI_Read_Local_Version_Information_Command failed: {response.return_parameters.status}')
response = await self.send_command(HCI_Read_Local_Version_Information_Command(), check_result=True)
self.local_version = response.return_parameters
await self.send_command(HCI_Set_Event_Mask_Command(event_mask = bytes.fromhex('FFFFFFFFFFFFFF3F')))
if self.local_version.hci_version <= HCI_VERSION_BLUETOOTH_CORE_4_0:
if self.local_version is not None and self.local_version.hci_version <= HCI_VERSION_BLUETOOTH_CORE_4_0:
# Some older controllers don't like event masks with bits they don't understand
le_event_mask = bytes.fromhex('1F00000000000000')
else:
@@ -124,20 +115,14 @@ class Host(EventEmitter):
await self.send_command(HCI_LE_Set_Event_Mask_Command(le_event_mask = le_event_mask))
if self.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(HCI_Read_Buffer_Size_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.hc_acl_data_packet_length = response.return_parameters.hc_acl_data_packet_length
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_acl_data_packets
else:
logger.warn(f'HCI_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
response = await self.send_command(HCI_Read_Buffer_Size_Command(), check_result=True)
self.hc_acl_data_packet_length = response.return_parameters.hc_acl_data_packet_length
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_acl_data_packets
if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
else:
logger.warn(f'HCI_LE_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command(), check_result=True)
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0:
# LE and Classic share the same values
@@ -171,7 +156,7 @@ class Host(EventEmitter):
def send_hci_packet(self, packet):
self.hci_sink.on_packet(packet.to_bytes())
async def send_command(self, command):
async def send_command(self, command, check_result=False):
logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}')
# Wait until we can send (only one pending command at a time)
@@ -186,11 +171,22 @@ class Host(EventEmitter):
try:
self.send_hci_packet(command)
response = await self.pending_response
# TODO: check error values
# Check the return parameters if required
if check_result:
if type(response.return_parameters) is int:
status = response.return_parameters
else:
status = response.return_parameters.status
if status != HCI_SUCCESS:
logger.warning(f'{command.name} failed ({HCI_Constant.error_name(status)})')
raise HCI_Error(status)
return response
except Exception as error:
logger.warning(f'{color("!!! Exception while sending HCI packet:", "red")} {error}')
# raise error
raise error
finally:
self.pending_command = None
self.pending_response = None
@@ -370,8 +366,8 @@ class Host(EventEmitter):
# Notify the client
connection_parameters = ConnectionParameters(
event.conn_interval,
event.conn_latency,
event.connection_interval,
event.peripheral_latency,
event.supervision_timeout
)
self.emit(
@@ -387,7 +383,7 @@ class Host(EventEmitter):
logger.debug(f'### CONNECTION FAILED: {event.status}')
# Notify the listeners
self.emit('connection_failure', event.status)
self.emit('connection_failure', event.connection_handle, event.status)
def on_hci_le_enhanced_connection_complete_event(self, event):
# Just use the same implementation as for the non-enhanced event for now
@@ -435,7 +431,7 @@ class Host(EventEmitter):
logger.debug(f'### DISCONNECTION FAILED: {event.status}')
# Notify the listeners
self.emit('disconnection_failure', event.status)
self.emit('disconnection_failure', event.connection_handle, event.status)
def on_hci_le_connection_update_complete_event(self, event):
if (connection := self.connections.get(event.connection_handle)) is None:
@@ -445,8 +441,8 @@ class Host(EventEmitter):
# Notify the client
if event.status == HCI_SUCCESS:
connection_parameters = ConnectionParameters(
event.conn_interval,
event.conn_latency,
event.connection_interval,
event.peripheral_latency,
event.supervision_timeout
)
self.emit('connection_parameters_update', connection.handle, connection_parameters)
@@ -467,13 +463,10 @@ class Host(EventEmitter):
def on_hci_le_advertising_report_event(self, event):
for report in event.reports:
self.emit(
'advertising_report',
report.address,
report.data,
report.rssi,
report.event_type
)
self.emit('advertising_report', report)
def on_hci_le_extended_advertising_report_event(self, event):
self.on_hci_le_advertising_report_event(event)
def on_hci_le_remote_connection_parameter_request_event(self, event):
if event.connection_handle not in self.connections:
@@ -489,8 +482,8 @@ class Host(EventEmitter):
interval_max = event.interval_max,
latency = event.latency,
timeout = event.timeout,
minimum_ce_length = 0,
maximum_ce_length = 0
min_ce_length = 0,
max_ce_length = 0
)
)

View File

@@ -462,10 +462,10 @@ class L2CAP_Information_Response(L2CAP_Control_Frame):
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass([
('interval_min', 2),
('interval_max', 2),
('slave_latency', 2),
('timeout_multiplier', 2)
('interval_min', 2),
('interval_max', 2),
('latency', 2),
('timeout', 2)
])
class L2CAP_Connection_Parameter_Update_Request(L2CAP_Control_Frame):
'''
@@ -857,10 +857,10 @@ class ChannelManager:
identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256
self.identifiers[connection.handle] = identifier
return identifier
def register_fixed_channel(self, cid, handler):
self.fixed_channels[cid] = handler
def deregister_fixed_channel(self, cid):
if cid in self.fixed_channels:
del self.fixed_channels[cid]
@@ -1057,13 +1057,13 @@ class ChannelManager:
)
)
self.host.send_command_sync(HCI_LE_Connection_Update_Command(
connection_handle = connection.handle,
conn_interval_min = request.interval_min,
conn_interval_max = request.interval_max,
conn_latency = request.slave_latency,
supervision_timeout = request.timeout_multiplier,
minimum_ce_length = 0,
maximum_ce_length = 0
connection_handle = connection.handle,
connection_interval_min = request.interval_min,
connection_interval_max = request.interval_max,
max_latency = request.latency,
supervision_timeout = request.timeout,
min_ce_length = 0,
max_ce_length = 0
))
else:
self.send_control_frame(

View File

@@ -36,7 +36,7 @@ logger = logging.getLogger(__name__)
async def open_usb_transport(spec):
'''
Open a USB transport.
The parameter string has this syntax:
The moniker string has this syntax:
either <index> or
<vendor>:<product> or
<vendor>:<product>/<serial-number>] or
@@ -47,15 +47,21 @@ async def open_usb_transport(spec):
/<serial-number> suffix or #<index> suffix max be specified when more than one device with
the same vendor and product identifiers are present.
In addition, if the moniker ends with the symbol "!", the device will be used in "forced" mode:
the first USB interface of the device will be used, regardless of the interface class/subclass.
This may be useful for some devices that use a custom class/subclass but may nonetheless work as-is.
Examples:
0 --> the first BT USB dongle
04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901
04b4:f901#2 --> the third USB device with vendor=04b4 and product=f901
04b4:f901/00E04C239987 --> the BT USB dongle with vendor=04b4 and product=f901 and serial number 00E04C239987
usb:0B05:17CB! --> the BT USB dongle vendor=0B05 and product=17CB, in "forced" mode.
'''
USB_RECIPIENT_DEVICE = 0x00
USB_REQUEST_TYPE_CLASS = 0x01 << 5
USB_DEVICE_CLASS_DEVICE = 0x00
USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
@@ -63,6 +69,12 @@ async def open_usb_transport(spec):
USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT = 0x03
USB_ENDPOINT_IN = 0x80
USB_BT_HCI_CLASS_TUPLE = (
USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
USB_DEVICE_SUBCLASS_RF_CONTROLLER,
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
)
READ_SIZE = 1024
class UsbPacketSink:
@@ -280,6 +292,13 @@ async def open_usb_transport(spec):
context.open()
try:
found = None
if spec.endswith('!'):
spec = spec[:-1]
forced_mode = True
else:
forced_mode = False
if ':' in spec:
vendor_id, product_id = spec.split(':')
serial_number = None
@@ -291,10 +310,14 @@ async def open_usb_transport(spec):
device_index = int(device_index_str)
for device in context.getDeviceIterator(skip_on_error=True):
try:
device_serial_number = device.getSerialNumber()
except usb1.USBError:
device_serial_number = None
if (
device.getVendorID() == int(vendor_id, 16) and
device.getProductID() == int(product_id, 16) and
(serial_number is None or device.getSerialNumber() == serial_number)
(serial_number is None or serial_number == device_serial_number)
):
if device_index == 0:
found = device
@@ -302,13 +325,27 @@ async def open_usb_transport(spec):
device_index -= 1
device.close()
else:
# Look for a compatible device by index
def device_is_bluetooth_hci(device):
# Check if the device class indicates a match
if (device.getDeviceClass(), device.getDeviceSubClass(), device.getDeviceProtocol()) == \
USB_BT_HCI_CLASS_TUPLE:
return True
# If the device class is 'Device', look for a matching interface
if device.getDeviceClass() == USB_DEVICE_CLASS_DEVICE:
for configuration in device:
for interface in configuration:
for setting in interface:
if (setting.getClass(), setting.getSubClass(), setting.getProtocol()) == \
USB_BT_HCI_CLASS_TUPLE:
return True
return False
device_index = int(spec)
for device in context.getDeviceIterator(skip_on_error=True):
if (
device.getDeviceClass() == USB_DEVICE_CLASS_WIRELESS_CONTROLLER and
device.getDeviceSubClass() == USB_DEVICE_SUBCLASS_RF_CONTROLLER and
device.getDeviceProtocol() == USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
):
if device_is_bluetooth_hci(device):
if device_index == 0:
found = device
break
@@ -329,9 +366,8 @@ async def open_usb_transport(spec):
setting = None
for setting in interface:
if (
setting.getClass() != USB_DEVICE_CLASS_WIRELESS_CONTROLLER or
setting.getSubClass() != USB_DEVICE_SUBCLASS_RF_CONTROLLER or
setting.getProtocol() != USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
not forced_mode and
(setting.getClass(), setting.getSubClass(), setting.getProtocol()) != USB_BT_HCI_CLASS_TUPLE
):
continue

View File

@@ -45,6 +45,10 @@ nav:
- HCI Bridge: apps_and_tools/hci_bridge.md
- Golden Gate Bridge: apps_and_tools/gg_bridge.md
- Show: apps_and_tools/show.md
- GATT Dump: apps_and_tools/gatt_dump.md
- Pair: apps_and_tools/pair.md
- Unbond: apps_and_tools/unbond.md
- USB Probe: apps_and_tools/usb_probe.md
- Hardware:
- Overview: hardware/index.md
- Platforms:

View File

@@ -1,6 +1,6 @@
# This requirements file is for python3
mkdocs == 1.2.3
mkdocs-material == 7.1.7
mkdocs-material-extensions == 1.0.1
pymdown-extensions == 8.2
mkdocstrings == 0.15.1
mkdocs == 1.4.0
mkdocs-material == 8.5.6
mkdocs-material-extensions == 1.0.3
pymdown-extensions == 9.6
mkdocstrings-python == 0.7.1

View File

@@ -13,17 +13,29 @@ type of device (there's no way to tell).
## Usage
This command line tool takes no arguments.
This command line tool may be invoked with no arguments, or with `--verbose`
for extra details.
When installed from PyPI, run as
```
$ bumble-usb-probe
```
or, for extra details, with the `--verbose` argument
```
$ bumble-usb-probe --v
```
When running from the source distribution:
```
$ python3 apps/usb-probe.py
```
or
```
$ python3 apps/usb-probe.py --verbose
```
!!! example
```
$ python3 apps/usb_probe.py

View File

@@ -1,48 +1,86 @@
:material-linux: LINUX PLATFORM
===============================
In addition to all the standard functionality available from the project by running the python tools and/or writing your own apps by leveraging the API, it is also possible on Linux hosts to interface the Bumble stack with the native BlueZ stack, and with Bluetooth controllers.
Using Bumble With Physical Bluetooth Controllers
------------------------------------------------
Using Bumble With BlueZ
-----------------------
A Bumble application can interface with a local Bluetooth controller on a Linux host.
The 3 main types of physical Bluetooth controllers are:
A Bumble virtual controller can be attached to the BlueZ stack.
Attaching a controller to BlueZ can be done by either simulating a UART HCI interface, or by using the VHCI driver interface if available.
In both cases, the controller can run locally on the Linux host, or remotely on a different host, with a bridge between the remote controller and the local BlueZ host, which may be useful when the BlueZ stack is running on an embedded system, or a host on which running the Bumble controller is not convenient.
* Bluetooth USB Dongle
* HCI over UART (via a serial port)
* Kernel-managed Bluetooth HCI (HCI Sockets)
### Using VHCI
!!! tip "Conflicts with the kernel and BlueZ"
If your use a USB dongle that is recognized by your kernel as a supported Bluetooth device, it is
likely that the kernel driver will claim that USB device and attach it to the BlueZ stack.
If you want to claim ownership of it to use with Bumble, you will need to set the state of the corresponding HCI interface as `DOWN`.
HCI interfaces are numbered, starting from 0 (i.e `hci0`, `hci1`, ...).
With the [VHCI transport](../transports/vhci.md) you can attach a Bumble virtual controller to the BlueZ stack. Once attached, the controller will appear just like any other controller, and thus can be used with the standard BlueZ tools.
!!! example "Attaching a virtual controller"
With the example app `run_controller.py`:
For example, to bring `hci0` down:
```
PYTHONPATH=. python3 examples/run_controller.py F6:F7:F8:F9:FA:FB examples/device1.json vhci
```
You should see a 'Virtual Bus' controller. For example:
```
$ hciconfig
hci0: Type: Primary Bus: Virtual
BD Address: F6:F7:F8:F9:FA:FB ACL MTU: 27:64 SCO MTU: 0:0
UP RUNNING
RX bytes:0 acl:0 sco:0 events:43 errors:0
TX bytes:274 acl:0 sco:0 commands:43 errors:0
$ sudo hciconfig hci0 down
```
And scanning for devices should show the virtual 'Bumble' device that's running as part of the `run_controller.py` example app:
You can use the `hciconfig` command with no arguments to get a list of HCI interfaces seen by
the kernel.
Also, if `bluetoothd` is running on your system, it will likely re-claim the interface after you
close it, so you may need to bring the interface back `UP` before using it again, or to disable
`bluetoothd` altogether (see the section further below about BlueZ and `bluetoothd`).
### Using a USB Dongle
See the [USB Transport page](../transports/usb.md) for general information on how to use HCI USB controllers.
!!! tip "USB Permissions"
By default, when running as a regular user, you won't have the permission to use
arbitrary USB devices.
You can change the permissions for a specific USB device based on its bus number and
device number (you can use `lsusb` to find the Bus and Device numbers for your Bluetooth
dongle).
Example:
```
pi@raspberrypi:~ $ sudo hcitool -i hci2 lescan
LE Scan ...
F0:F1:F2:F3:F4:F5 Bumble
$ sudo chmod o+w /dev/bus/usb/001/004
```
This will change the permissions for Device 4 on Bus 1.
Note that the USB Bus number and Device number may change depending on where you plug the USB
dongle and what other USB devices and hubs are also plugged in.
If you need to make the permission changes permanent across reboots, you can create a `udev`
rule for your specific Bluetooth dongle. Visit [this Arch Linux Wiki page](https://wiki.archlinux.org/title/udev) for a
good overview of how you may do that.
### Using HCI over UART
See the [Serial Transport page](../transports/serial.md) for general information on how to use HCI over a UART (serial port).
### Using HCI Sockets
HCI sockets provide a way to send/receive HCI packets to/from a Bluetooth controller managed by the kernel.
The HCI device referenced by an `hci-socket` transport (`hciX`, where `X` is an integer, with `hci0` being the first controller device, and so on) must be in the `DOWN` state before it can be opened as a transport.
You can bring a HCI controller `UP` or `DOWN` with `hciconfig`.
See the [HCI Socket Transport page](../transports/hci_socket.md) for details on the `hci-socket` tansport syntax.
The HCI device referenced by an `hci-socket` transport (`hci<X>`, where `<X>` is an integer, with `hci0` being the first controller device, and so on) must be in the `DOWN` state before it can be opened as a transport.
You can bring a HCI controller `UP` or `DOWN` with `hciconfig hci<X> up` and `hciconfig hci<X> up`.
!!! tip "HCI Socket Permissions"
By default, when running as a regular user, you won't have the permission to use
an HCI socket to a Bluetooth controller (you may see an exception like `PermissionError: [Errno 1] Operation not permitted`).
If you want to run without using `sudo`, you need to manage the capabilities by adding the appropriate entries in `/etc/security/capability.conf` to grant a user or group the `cap_net_admin` capability.
See [this manpage](https://manpages.ubuntu.com/manpages/bionic/man5/capability.conf.5.html) for details.
Alternatively, if you are just experimenting temporarily, the `capsh` command may be useful in order
to execute a single command with enhanced permissions, as in this example:
```
$ sudo capsh --caps="cap_net_admin+eip cap_setpcap,cap_setuid,cap_setgid+ep" --keep=1 --user=$USER --addamb=cap_net_admin -- -c "<path/to/executable> <executable-args>"
```
Where `<path/to/executable>` is the path to your `python3` executable or to one of the Bumble bundled command-line applications.
!!! tip "List all available controllers"
The command
```
@@ -72,29 +110,16 @@ You can bring a HCI controller `UP` or `DOWN` with `hciconfig`.
```
$ hciconfig hci0 down
```
(or `hciX` with `X` being the index of the controller device you want to use), but a simpler solution is to just stop the `bluetoothd` daemon, with a command like:
(or `hci<X>` with `<X>` being the index of the controller device you want to use), but a simpler solution is to just stop the `bluetoothd` daemon, with a command like:
```
$ sudo systemctl stop bluetooth.service
```
You can always re-start the daemon with
```
$ sudo systemctl start bluetooth.service
```
### Using a Simulated UART HCI
### Bridge to a Remote Controller
Using Bumble With Bluetooth Controllers
---------------------------------------
A Bumble application can interface with a local Bluetooth controller.
If your Bluetooth controller is a standard HCI USB controller, see the [USB Transport page](../transports/usb.md) for details on how to use HCI USB controllers.
If your Bluetooth controller is a standard HCI UART controller, see the [Serial Transport page](../transports/serial.md).
Alternatively, a Bumble Host object can communicate with one of the platform's controllers via an HCI Socket.
`<details to be filled in>`
Bumble on the Raspberry Pi
--------------------------
### Raspberry Pi 4 :fontawesome-brands-raspberry-pi:
@@ -102,9 +127,10 @@ You can use the Bluetooth controller either via the kernel, or directly to the d
#### Via The Kernel
Use an HCI Socket transport
Use an HCI Socket transport (see section above)
#### Directly
In order to use the Bluetooth controller directly on a Raspberry Pi 4 board, you need to ensure that it isn't being used by the BlueZ stack (which it probably is by default).
```
@@ -136,3 +162,47 @@ should detach the controller from the stack, after which you can use the HCI UAR
python3 run_scanner.py serial:/dev/serial1,3000000
```
Using Bumble With BlueZ
-----------------------
In addition to all the standard functionality available from the project by running the python tools and/or writing your own apps by leveraging the API, it is also possible on Linux hosts to interface the Bumble stack with the native BlueZ stack, and with Bluetooth controllers.
A Bumble virtual controller can be attached to the BlueZ stack.
Attaching a controller to BlueZ can be done by either simulating a UART HCI interface, or by using the VHCI driver interface if available.
In both cases, the controller can run locally on the Linux host, or remotely on a different host, with a bridge between the remote controller and the local BlueZ host, which may be useful when the BlueZ stack is running on an embedded system, or a host on which running the Bumble controller is not convenient.
### Using VHCI
With the [VHCI transport](../transports/vhci.md) you can attach a Bumble virtual controller to the BlueZ stack. Once attached, the controller will appear just like any other controller, and thus can be used with the standard BlueZ tools.
!!! example "Attaching a virtual controller"
With the example app `run_controller.py`:
```
python3 examples/run_controller.py F6:F7:F8:F9:FA:FB examples/device1.json vhci
```
You should see a 'Virtual Bus' controller. For example:
```
$ hciconfig
hci0: Type: Primary Bus: Virtual
BD Address: F6:F7:F8:F9:FA:FB ACL MTU: 27:64 SCO MTU: 0:0
UP RUNNING
RX bytes:0 acl:0 sco:0 events:43 errors:0
TX bytes:274 acl:0 sco:0 commands:43 errors:0
```
And scanning for devices should show the virtual 'Bumble' device that's running as part of the `run_controller.py` example app:
```
pi@raspberrypi:~ $ sudo hcitool -i hci2 lescan
LE Scan ...
F0:F1:F2:F3:F4:F5 Bumble
```
```
### Using a Simulated UART HCI
### Bridge to a Remote Controller

View File

@@ -5,8 +5,9 @@ The Android emulator transport either connects, as a host, to a "Root Canal" vir
("host" mode), or attaches a virtual controller to the Android Bluetooth host stack ("controller" mode).
## Moniker
The moniker syntax for an Android Emulator transport is: `android-emulator:[mode=<host|controller>][mode=<host|controller>]`.
Both the `mode=<host|controller>` and `mode=<host|controller>` parameters are optional (so the moniker `android-emulator` by itself is a valid moniker, which will create a transport in `host` mode, connected to `localhost` on the default gRPC port for the emulator)
The moniker syntax for an Android Emulator transport is: `android-emulator:[mode=<host|controller>][<hostname>:<port>]`, where
the `mode` parameter can specify running as a host or a controller, and `<hostname>:<port>` can specify a host name (or IP address) and TCP port number on which to reach the gRPC server for the emulator.
Both the `mode=<host|controller>` and `<hostname>:<port>` parameters are optional (so the moniker `android-emulator` by itself is a valid moniker, which will create a transport in `host` mode, connected to `localhost` on the default gRPC port for the emulator).
!!! example Example
`android-emulator`

View File

@@ -5,6 +5,7 @@ The USB transport interfaces with a local Bluetooth USB dongle.
## Moniker
The moniker for a USB transport is either:
* `usb:<index>`
* `usb:<vendor>:<product>`
* `usb:<vendor>:<product>/<serial-number>`
@@ -16,6 +17,10 @@ In the `usb:<vendor>:<product>#<index>` form, matching devices are the ones with
`<vendor>` and `<product>` are a vendor ID and product ID in hexadecimal.
In addition, if the moniker ends with the symbol "!", the device will be used in "forced" mode:
the first USB interface of the device will be used, regardless of the interface class/subclass.
This may be useful for some devices that use a custom class/subclass but may nonetheless work as-is.
!!! examples
`usb:04b4:f901`
The USB dongle with `<vendor>` equal to `04b4` and `<product>` equal to `f901`
@@ -29,6 +34,10 @@ In the `usb:<vendor>:<product>#<index>` form, matching devices are the ones with
`usb:04b4:f901/#1`
The second USB dongle with `<vendor>` equal to `04b4` and `<product>` equal to `f901`
`usb:0B05:17CB!`
The BT USB dongle vendor=0B05 and product=17CB, in "forced" mode.
## Alternative
The library includes two different implementations of the USB transport, implemented using different python bindings for `libusb`.
Using the transport prefix `pyusb:` instead of `usb:` selects the implementation based on [PyUSB](https://pypi.org/project/pyusb/), using the synchronous API of `libusb`, whereas the default implementation is based on [libusb1](https://pypi.org/project/libusb1/), using the asynchronous API of `libusb`. In order to use the alternative PyUSB-based implementation, you need to ensure that you have installed that python module, as it isn't installed by default as a dependency of Bumble.

View File

@@ -29,18 +29,31 @@ from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: run_advertiser.py <config-file> <transport-spec>')
print('example: run_advertiser.py device1.json link-relay:ws://localhost:8888/test')
if len(sys.argv) < 3:
print('Usage: run_advertiser.py <config-file> <transport-spec> [type] [address]')
print('example: run_advertiser.py device1.json usb:0')
return
if len(sys.argv) >= 4:
advertising_type = AdvertisingType(int(sys.argv[3]))
else:
advertising_type = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE
if advertising_type.is_directed:
if len(sys.argv) < 5:
print('<address> required for directed advertising')
return
target = Address(sys.argv[4])
else:
target = None
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
print('<<< connected')
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
await device.power_on()
await device.start_advertising()
await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -29,15 +29,15 @@ from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
class ScannerListener(Device.Listener):
def on_advertisement(self, address, ad_data, rssi, connectable):
address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type]
address_color = 'yellow' if connectable else 'red'
def on_advertisement(self, advertisement):
address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type]
address_color = 'yellow' if advertisement.is_connectable else 'red'
if address_type_string.startswith('P'):
type_color = 'green'
else:
type_color = 'cyan'
print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]: RSSI={rssi}, {ad_data}')
print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]: RSSI={advertisement.rssi}, {advertisement.data}')
# -----------------------------------------------------------------------------

View File

@@ -40,24 +40,24 @@ async def main():
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
@device.on('advertisement')
def _(address, ad_data, rssi, connectable):
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type]
address_color = 'yellow' if connectable else 'red'
def _(advertisement):
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[advertisement.address.address_type]
address_color = 'yellow' if advertisement.is_connectable else 'red'
address_qualifier = ''
if address_type_string.startswith('P'):
type_color = 'cyan'
else:
if address.is_static:
if advertisement.address.is_static:
type_color = 'green'
address_qualifier = '(static)'
elif address.is_resolvable:
elif advertisement.address.is_resolvable:
type_color = 'magenta'
address_qualifier = '(resolvable)'
else:
type_color = 'white'
separator = '\n '
print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{rssi}{separator}{ad_data.to_string(separator)}')
print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{advertisement.rssi}{separator}{advertisement.data.to_string(separator)}')
await device.power_on()
await device.start_scanning(filter_duplicates=filter_duplicates)

View File

@@ -48,6 +48,7 @@ install_requires =
[options.entry_points]
console_scripts =
bumble-console = bumble.apps.console:main
bumble-controller-info = bumble.apps.controller_info:main
bumble-gatt-dump = bumble.apps.gatt_dump:main
bumble-hci-bridge = bumble.apps.hci_bridge:main
bumble-pair = bumble.apps.pair:main
@@ -67,6 +68,6 @@ development =
invoke >= 1.4
nox >= 2022
documentation =
mkdocs >= 1.2.3
mkdocs-material >= 8.1.9
mkdocs >= 1.4.0
mkdocs-material >= 8.5.6
mkdocstrings[python] >= 0.19.0

View File

@@ -22,6 +22,7 @@ import struct
import pytest
from bumble.controller import Controller
from bumble.gatt_client import CharacteristicProxy
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
@@ -53,29 +54,29 @@ def basic_check(x):
parsed = ATT_PDU.from_bytes(pdu)
x_str = str(x)
parsed_str = str(parsed)
assert(x_str == parsed_str)
assert x_str == parsed_str
# -----------------------------------------------------------------------------
def test_UUID():
u = UUID.from_16_bits(0x7788)
assert(str(u) == 'UUID-16:7788')
assert str(u) == 'UUID-16:7788'
u = UUID.from_32_bits(0x11223344)
assert(str(u) == 'UUID-32:11223344')
assert str(u) == 'UUID-32:11223344'
u = UUID('61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
assert(str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
v = UUID(str(u))
assert(str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
w = UUID.from_bytes(v.to_bytes())
assert(str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
u1 = UUID.from_16_bits(0x1234)
b1 = u1.to_bytes(force_128 = True)
u2 = UUID.from_bytes(b1)
assert(u1 == u2)
assert u1 == u2
u3 = UUID.from_16_bits(0x180a)
assert(str(u3) == 'UUID-16:180A (Device Information)')
assert str(u3) == 'UUID-16:180A (Device Information)'
# -----------------------------------------------------------------------------
@@ -98,6 +99,133 @@ def test_ATT_Read_By_Group_Type_Request():
basic_check(pdu)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_characteristic_encoding():
class Foo(Characteristic):
def encode_value(self, value):
return bytes([value])
def decode_value(self, value_bytes):
return value_bytes[0]
c = Foo(GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.READ, Characteristic.READABLE, 123)
x = c.read_value(None)
assert x == bytes([123])
c.write_value(None, bytes([122]))
assert c.value == 122
class FooProxy(CharacteristicProxy):
def __init__(self, characteristic):
super().__init__(
characteristic.client,
characteristic.handle,
characteristic.end_group_handle,
characteristic.uuid,
characteristic.properties
)
def encode_value(self, value):
return bytes([value])
def decode_value(self, value_bytes):
return value_bytes[0]
[client, server] = LinkedDevices().devices[:2]
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123])
)
service = Service(
'3A657F47-D34F-46B3-B1EC-698E29B6B829',
[characteristic]
)
server.add_service(service)
await client.power_on()
await server.power_on()
connection = await client.connect(server.random_address)
peer = Peer(connection)
await peer.discover_services()
await peer.discover_characteristics()
c = peer.get_characteristics_by_uuid(characteristic.uuid)
assert len(c) == 1
c = c[0]
cp = FooProxy(c)
v = await cp.read_value()
assert v == 123
await cp.write_value(124)
await async_barrier()
assert characteristic.value == bytes([124])
v = await cp.read_value()
assert v == 124
await cp.write_value(125, with_response=True)
await async_barrier()
assert characteristic.value == bytes([125])
cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2]))
await cd.write_value(100, with_response=True)
await async_barrier()
assert characteristic.value == bytes([50])
last_change = None
def on_change(value):
nonlocal last_change
last_change = value
await c.subscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change == characteristic.value
last_change = None
await server.notify_subscribers(characteristic, value=bytes([125]))
await async_barrier()
assert last_change == bytes([125])
last_change = None
await c.unsubscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change is None
await cp.subscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change == characteristic.value[0]
last_change = None
await server.notify_subscribers(characteristic, value=bytes([126]))
await async_barrier()
assert last_change == 126
last_change = None
await cp.unsubscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change is None
cd = DelegatedCharacteristicAdapter(c, decode=lambda x: x[0])
await cd.subscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change == characteristic.value[0]
last_change = None
await cd.unsubscribe(on_change)
await server.notify_subscribers(characteristic)
await async_barrier()
assert last_change is None
# -----------------------------------------------------------------------------
def test_CharacteristicAdapter():
# Check that the CharacteristicAdapter base class is transparent
@@ -106,21 +234,21 @@ def test_CharacteristicAdapter():
a = CharacteristicAdapter(c)
value = a.read_value(None)
assert(value == v)
assert value == v
v = bytes([3, 4, 5])
a.write_value(None, v)
assert(c.value == v)
assert c.value == v
# Simple delegated adapter
a = DelegatedCharacteristicAdapter(c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x)))
value = a.read_value(None)
assert(value == bytes(reversed(v)))
assert value == bytes(reversed(v))
v = bytes([3, 4, 5])
a.write_value(None, v)
assert(a.value == bytes(reversed(v)))
assert a.value == bytes(reversed(v))
# Packed adapter with single element format
v = 1234
@@ -129,10 +257,10 @@ def test_CharacteristicAdapter():
a = PackedCharacteristicAdapter(c, '>H')
value = a.read_value(None)
assert(value == pv)
assert value == pv
c.value = None
a.write_value(None, pv)
assert(a.value == v)
assert a.value == v
# Packed adapter with multi-element format
v1 = 1234
@@ -142,10 +270,10 @@ def test_CharacteristicAdapter():
a = PackedCharacteristicAdapter(c, '>HH')
value = a.read_value(None)
assert(value == pv)
assert value == pv
c.value = None
a.write_value(None, pv)
assert(a.value == (v1, v2))
assert a.value == (v1, v2)
# Mapped adapter
v1 = 1234
@@ -156,10 +284,10 @@ def test_CharacteristicAdapter():
a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2'))
value = a.read_value(None)
assert(value == pv)
assert value == pv
c.value = None
a.write_value(None, pv)
assert(a.value == mapped)
assert a.value == mapped
# UTF-8 adapter
v = 'Hello π'
@@ -168,10 +296,10 @@ def test_CharacteristicAdapter():
a = UTF8CharacteristicAdapter(c)
value = a.read_value(None)
assert(value == ev)
assert value == ev
c.value = None
a.write_value(None, ev)
assert(a.value == v)
assert a.value == v
# -----------------------------------------------------------------------------
@@ -179,24 +307,25 @@ def test_CharacteristicValue():
b = bytes([1, 2, 3])
c = CharacteristicValue(read=lambda _: b)
x = c.read(None)
assert(x == b)
assert x == b
result = []
c = CharacteristicValue(write=lambda connection, value: result.append((connection, value)))
z = object()
c.write(z, b)
assert(result == [(z, b)])
assert result == [(z, b)]
# -----------------------------------------------------------------------------
class TwoDevices:
class LinkedDevices:
def __init__(self):
self.connections = [None, None]
self.connections = [None, None, None]
self.link = LocalLink()
self.controllers = [
Controller('C1', link = self.link),
Controller('C2', link = self.link)
Controller('C2', link = self.link),
Controller('C3', link = self.link)
]
self.devices = [
Device(
@@ -204,12 +333,16 @@ class TwoDevices:
host = Host(self.controllers[0], AsyncPipeSink(self.controllers[0]))
),
Device(
address = 'F5:F4:F3:F2:F1:F0',
address = 'F1:F2:F3:F4:F5:F6',
host = Host(self.controllers[1], AsyncPipeSink(self.controllers[1]))
),
Device(
address = 'F2:F3:F4:F5:F6:F7',
host = Host(self.controllers[2], AsyncPipeSink(self.controllers[2]))
)
]
self.paired = [None, None]
self.paired = [None, None, None]
# -----------------------------------------------------------------------------
@@ -222,7 +355,7 @@ async def async_barrier():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write():
[client, server] = TwoDevices().devices
[client, server] = LinkedDevices().devices[:2]
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
@@ -265,41 +398,41 @@ async def test_read_write():
await peer.discover_services()
await peer.discover_characteristics()
c = peer.get_characteristics_by_uuid(characteristic1.uuid)
assert(len(c) == 1)
assert len(c) == 1
c1 = c[0]
c = peer.get_characteristics_by_uuid(characteristic2.uuid)
assert(len(c) == 1)
assert len(c) == 1
c2 = c[0]
v1 = await peer.read_value(c1)
assert(v1 == b'')
assert v1 == b''
b = bytes([1, 2, 3])
await peer.write_value(c1, b)
await async_barrier()
assert(characteristic1.value == b)
assert characteristic1.value == b
v1 = await peer.read_value(c1)
assert(v1 == b)
assert(type(characteristic1._last_value) is tuple)
assert(len(characteristic1._last_value) == 2)
assert(str(characteristic1._last_value[0].peer_address) == str(client.random_address))
assert(characteristic1._last_value[1] == b)
assert v1 == b
assert type(characteristic1._last_value is tuple)
assert len(characteristic1._last_value) == 2
assert str(characteristic1._last_value[0].peer_address) == str(client.random_address)
assert characteristic1._last_value[1] == b
bb = bytes([3, 4, 5, 6])
characteristic1.value = bb
v1 = await peer.read_value(c1)
assert(v1 == bb)
assert v1 == bb
await peer.write_value(c2, b)
await async_barrier()
assert(type(characteristic2._last_value) is tuple)
assert(len(characteristic2._last_value) == 2)
assert(str(characteristic2._last_value[0].peer_address) == str(client.random_address))
assert(characteristic2._last_value[1] == b)
assert type(characteristic2._last_value is tuple)
assert len(characteristic2._last_value) == 2
assert str(characteristic2._last_value[0].peer_address) == str(client.random_address)
assert characteristic2._last_value[1] == b
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write2():
[client, server] = TwoDevices().devices
[client, server] = LinkedDevices().devices[:2]
v = bytes([0x11, 0x22, 0x33, 0x44])
characteristic1 = Characteristic(
@@ -324,32 +457,32 @@ async def test_read_write2():
await peer.discover_services()
c = peer.get_services_by_uuid(service1.uuid)
assert(len(c) == 1)
assert len(c) == 1
s = c[0]
await s.discover_characteristics()
c = s.get_characteristics_by_uuid(characteristic1.uuid)
assert(len(c) == 1)
assert len(c) == 1
c1 = c[0]
v1 = await c1.read_value()
assert(v1 == v)
assert v1 == v
a1 = PackedCharacteristicAdapter(c1, '>I')
v1 = await a1.read_value()
assert(v1 == struct.unpack('>I', v)[0])
assert v1 == struct.unpack('>I', v)[0]
b = bytes([0x55, 0x66, 0x77, 0x88])
await a1.write_value(struct.unpack('>I', b)[0])
await async_barrier()
assert(characteristic1.value == b)
assert characteristic1.value == b
v1 = await a1.read_value()
assert(v1 == struct.unpack('>I', b)[0])
assert v1 == struct.unpack('>I', b)[0]
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_subscribe_notify():
[client, server] = TwoDevices().devices
[client, server] = LinkedDevices().devices[:2]
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
@@ -410,13 +543,13 @@ async def test_subscribe_notify():
await peer.discover_services()
await peer.discover_characteristics()
c = peer.get_characteristics_by_uuid(characteristic1.uuid)
assert(len(c) == 1)
assert len(c) == 1
c1 = c[0]
c = peer.get_characteristics_by_uuid(characteristic2.uuid)
assert(len(c) == 1)
assert len(c) == 1
c2 = c[0]
c = peer.get_characteristics_by_uuid(characteristic3.uuid)
assert(len(c) == 1)
assert len(c) == 1
c3 = c[0]
c1._called = False
@@ -429,23 +562,32 @@ async def test_subscribe_notify():
c1.on('update', on_c1_update)
await peer.subscribe(c1)
await async_barrier()
assert(server._last_subscription[1] == characteristic1)
assert(server._last_subscription[2])
assert(not server._last_subscription[3])
assert(characteristic1._last_subscription[1])
assert(not characteristic1._last_subscription[2])
assert server._last_subscription[1] == characteristic1
assert server._last_subscription[2]
assert not server._last_subscription[3]
assert characteristic1._last_subscription[1]
assert not characteristic1._last_subscription[2]
await server.indicate_subscribers(characteristic1)
await async_barrier()
assert(not c1._called)
assert not c1._called
await server.notify_subscribers(characteristic1)
await async_barrier()
assert(c1._called)
assert(c1._last_update == characteristic1.value)
assert c1._called
assert c1._last_update == characteristic1.value
c1._called = False
c1._last_update = None
c1_value = characteristic1.value
await server.notify_subscribers(characteristic1, bytes([0, 1, 2]))
await async_barrier()
assert c1._called
assert c1._last_update == bytes([0, 1, 2])
assert characteristic1.value == c1_value
c1._called = False
await peer.unsubscribe(c1)
await server.notify_subscribers(characteristic1)
assert(not c1._called)
assert not c1._called
c2._called = False
c2._last_update = None
@@ -458,17 +600,17 @@ async def test_subscribe_notify():
await async_barrier()
await server.notify_subscriber(characteristic2._last_subscription[0], characteristic2)
await async_barrier()
assert(not c2._called)
assert not c2._called
await server.indicate_subscriber(characteristic2._last_subscription[0], characteristic2)
await async_barrier()
assert(c2._called)
assert(c2._last_update == characteristic2.value)
assert c2._called
assert c2._last_update == characteristic2.value
c2._called = False
await peer.unsubscribe(c2, on_c2_update)
await server.indicate_subscriber(characteristic2._last_subscription[0], characteristic2)
await async_barrier()
assert(not c2._called)
assert not c2._called
def on_c3_update(value):
c3._called = True
@@ -483,17 +625,17 @@ async def test_subscribe_notify():
await async_barrier()
await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3)
await async_barrier()
assert(c3._called)
assert(c3._last_update == characteristic3.value)
assert(c3._called_2)
assert(c3._last_update_2 == characteristic3.value)
assert c3._called
assert c3._last_update == characteristic3.value
assert c3._called_2
assert c3._last_update_2 == characteristic3.value
characteristic3.value = bytes([1, 2, 3])
await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3)
await async_barrier()
assert(c3._called)
assert(c3._last_update == characteristic3.value)
assert(c3._called_2)
assert(c3._last_update_2 == characteristic3.value)
assert c3._called
assert c3._last_update == characteristic3.value
assert c3._called_2
assert c3._last_update_2 == characteristic3.value
c3._called = False
c3._called_2 = False
@@ -501,8 +643,44 @@ async def test_subscribe_notify():
await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3)
await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3)
await async_barrier()
assert(not c3._called)
assert(not c3._called_2)
assert not c3._called
assert not c3._called_2
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_mtu_exchange():
[d1, d2, d3] = LinkedDevices().devices[:3]
d3.gatt_server.max_mtu = 100
d3_connections = []
@d3.on('connection')
def on_d3_connection(connection):
d3_connections.append(connection)
await d1.power_on()
await d2.power_on()
await d3.power_on()
d1_connection = await d1.connect(d3.random_address)
assert len(d3_connections) == 1
assert d3_connections[0] is not None
d2_connection = await d2.connect(d3.random_address)
assert len(d3_connections) == 2
assert d3_connections[1] is not None
d1_peer = Peer(d1_connection)
d2_peer = Peer(d2_connection)
d1_client_mtu = await d1_peer.request_mtu(220)
assert d1_client_mtu == 100
assert d1_connection.att_mtu == 100
d2_client_mtu = await d2_peer.request_mtu(50)
assert d2_client_mtu == 50
assert d2_connection.att_mtu == 50
# -----------------------------------------------------------------------------
@@ -510,6 +688,9 @@ async def async_main():
await test_read_write()
await test_read_write2()
await test_subscribe_notify()
await test_characteristic_encoding()
await test_mtu_exchange()
# -----------------------------------------------------------------------------
if __name__ == '__main__':

View File

@@ -27,8 +27,8 @@ def basic_check(x):
parsed_str = str(parsed)
print(x_str)
parsed_bytes = parsed.to_bytes()
assert(x_str == parsed_str)
assert(packet == parsed_bytes)
assert x_str == parsed_str
assert packet == parsed_bytes
# -----------------------------------------------------------------------------
@@ -49,10 +49,10 @@ def test_HCI_LE_Connection_Complete_Event():
role = 1,
peer_address_type = 1,
peer_address = address,
conn_interval = 3,
conn_latency = 4,
connection_interval = 3,
peripheral_latency = 4,
supervision_timeout = 5,
master_clock_accuracy = 6
central_clock_accuracy = 6
)
basic_check(event)
@@ -60,8 +60,8 @@ def test_HCI_LE_Connection_Complete_Event():
# -----------------------------------------------------------------------------
def test_HCI_LE_Advertising_Report_Event():
address = Address('00:11:22:33:44:55')
report = HCI_Object(
HCI_LE_Advertising_Report_Event.REPORT_FIELDS,
report = HCI_LE_Advertising_Report_Event.Report(
HCI_LE_Advertising_Report_Event.Report.FIELDS,
event_type = HCI_LE_Advertising_Report_Event.ADV_IND,
address_type = Address.PUBLIC_DEVICE_ADDRESS,
address = address,
@@ -87,8 +87,8 @@ def test_HCI_LE_Connection_Update_Complete_Event():
event = HCI_LE_Connection_Update_Complete_Event(
status = HCI_SUCCESS,
connection_handle = 0x007,
conn_interval = 10,
conn_latency = 3,
connection_interval = 10,
peripheral_latency = 3,
supervision_timeout = 5
)
basic_check(event)
@@ -133,7 +133,7 @@ def test_HCI_Command_Complete_Event():
)
basic_check(event)
event = HCI_Packet.from_bytes(event.to_bytes())
assert(event.return_parameters == 7)
assert event.return_parameters == 7
# With a simple status as an integer status
event = HCI_Command_Complete_Event(
@@ -142,7 +142,7 @@ def test_HCI_Command_Complete_Event():
return_parameters = 9
)
basic_check(event)
assert(event.return_parameters == 9)
assert event.return_parameters == 9
# -----------------------------------------------------------------------------
@@ -283,12 +283,32 @@ def test_HCI_LE_Create_Connection_Command():
peer_address_type = 1,
peer_address = Address('00:11:22:33:44:55'),
own_address_type = 2,
conn_interval_min = 7,
conn_interval_max = 8,
conn_latency = 9,
connection_interval_min = 7,
connection_interval_max = 8,
max_latency = 9,
supervision_timeout = 10,
minimum_ce_length = 11,
maximum_ce_length = 12
min_ce_length = 11,
max_ce_length = 12
)
basic_check(command)
# -----------------------------------------------------------------------------
def test_HCI_LE_Extended_Create_Connection_Command():
command = HCI_LE_Extended_Create_Connection_Command(
initiator_filter_policy = 0,
own_address_type = 0,
peer_address_type = 1,
peer_address = Address('00:11:22:33:44:55'),
initiating_phys = 3,
scan_intervals = (10, 11),
scan_windows = (12, 13),
connection_interval_mins = (14, 15),
connection_interval_maxs = (16, 17),
max_latencies = (18, 19),
supervision_timeouts = (20, 21),
min_ce_lengths = (100, 101),
max_ce_lengths = (102, 103)
)
basic_check(command)
@@ -314,13 +334,13 @@ def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command():
# -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Update_Command():
command = HCI_LE_Connection_Update_Command(
connection_handle = 0x0002,
conn_interval_min = 10,
conn_interval_max = 20,
conn_latency = 7,
supervision_timeout = 3,
minimum_ce_length = 100,
maximum_ce_length = 200
connection_handle = 0x0002,
connection_interval_min = 10,
connection_interval_max = 20,
max_latency = 7,
supervision_timeout = 3,
min_ce_length = 100,
max_ce_length = 200
)
basic_check(command)
@@ -348,7 +368,7 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
command = HCI_LE_Set_Extended_Scan_Parameters_Command(
own_address_type=Address.RANDOM_DEVICE_ADDRESS,
scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY,
scanning_phys=(1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_1M_PHY | 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_CODED_PHY | 1 << 4),
scanning_phys=(1 << HCI_LE_1M_PHY_BIT | 1 << HCI_LE_CODED_PHY_BIT | 1 << 4),
scan_types=[
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
@@ -363,20 +383,20 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
# -----------------------------------------------------------------------------
def test_address():
a = Address('C4:F2:17:1A:1D:BB')
assert(not a.is_public)
assert(a.is_random)
assert(a.address_type == Address.RANDOM_DEVICE_ADDRESS)
assert(not a.is_resolvable)
assert(not a.is_resolved)
assert(a.is_static)
assert not a.is_public
assert a.is_random
assert a.address_type == Address.RANDOM_DEVICE_ADDRESS
assert not a.is_resolvable
assert not a.is_resolved
assert a.is_static
# -----------------------------------------------------------------------------
def test_custom():
data = bytes([0x77, 0x02, 0x01, 0x03])
packet = HCI_CustomPacket(data)
assert(packet.hci_packet_type == 0x77)
assert(packet.payload == data)
assert packet.hci_packet_type == 0x77
assert packet.payload == data
# -----------------------------------------------------------------------------
@@ -408,6 +428,7 @@ def run_test_commands():
test_HCI_LE_Set_Scan_Parameters_Command()
test_HCI_LE_Set_Scan_Enable_Command()
test_HCI_LE_Create_Connection_Command()
test_HCI_LE_Extended_Create_Connection_Command()
test_HCI_LE_Add_Device_To_Filter_Accept_List_Command()
test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command()
test_HCI_LE_Connection_Update_Command()

View File

@@ -62,6 +62,57 @@ def test_import():
assert utils
# -----------------------------------------------------------------------------
def test_app_imports():
from bumble.apps.console import main
assert main
from bumble.apps.controller_info import main
assert main
from bumble.apps.controllers import main
assert main
from bumble.apps.gatt_dump import main
assert main
from bumble.apps.gg_bridge import main
assert main
from bumble.apps.hci_bridge import main
assert main
from bumble.apps.pair import main
assert main
from bumble.apps.scan import main
assert main
from bumble.apps.show import main
assert main
from bumble.apps.unbond import main
assert main
from bumble.apps.usb_probe import main
assert main
# -----------------------------------------------------------------------------
def test_profiles_imports():
from bumble.profiles import (
battery_service,
device_information_service,
heart_rate_service
)
assert battery_service
assert device_information_service
assert heart_rate_service
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_import()
test_app_imports()
test_profiles_imports()

View File

@@ -21,9 +21,9 @@ from bumble.transport import PacketParser
# -----------------------------------------------------------------------------
class ScannerListener(Device.Listener):
def on_advertisement(self, address, ad_data, rssi, connectable):
address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type]
print(f'>>> {address} [{address_type_string}]: RSSI={rssi}, {ad_data}')
def on_advertisement(self, advertisement):
address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type]
print(f'>>> {advertisement.address} [{address_type_string}]: RSSI={advertisement.rssi}, {advertisement.ad_data}')
class HciSource: