mirror of
https://github.com/google/bumble.git
synced 2026-04-16 00:25:31 +00:00
Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eb8556ccf6 | ||
|
|
4d96b821bc | ||
|
|
78b36d2049 | ||
|
|
3e0cad1456 | ||
|
|
b4de38cdc3 | ||
|
|
68d9fbc159 | ||
|
|
a916b7a21a | ||
|
|
6ff52df8bd | ||
|
|
7fa2eb7658 | ||
|
|
86618e52ef | ||
|
|
fbb46dd736 | ||
|
|
d1e119f176 | ||
|
|
2fc7a0bf04 | ||
|
|
d6c4644b23 | ||
|
|
073757d5dd | ||
|
|
20dedbd923 | ||
|
|
df1962e8da | ||
|
|
0edd6b731f | ||
|
|
d2227f017f | ||
|
|
80569bc9f3 |
275
apps/console.py
275
apps/console.py
@@ -28,11 +28,16 @@ import click
|
||||
from collections import OrderedDict
|
||||
import colors
|
||||
|
||||
from bumble.core import UUID, AdvertisingData
|
||||
from bumble.device import Device, Connection, Peer
|
||||
from bumble.core import UUID, AdvertisingData, TimeoutError, BT_LE_TRANSPORT
|
||||
from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
|
||||
from bumble.utils import AsyncRunner
|
||||
from bumble.transport import open_transport_or_link
|
||||
from bumble.gatt import Characteristic
|
||||
from bumble.hci import (
|
||||
HCI_LE_1M_PHY,
|
||||
HCI_LE_2M_PHY,
|
||||
HCI_LE_CODED_PHY,
|
||||
)
|
||||
|
||||
from prompt_toolkit import Application
|
||||
from prompt_toolkit.history import FileHistory
|
||||
@@ -43,6 +48,7 @@ from prompt_toolkit.styles import Style
|
||||
from prompt_toolkit.filters import Condition
|
||||
from prompt_toolkit.widgets import TextArea, Frame
|
||||
from prompt_toolkit.widgets.toolbars import FormattedTextToolbar
|
||||
from prompt_toolkit.data_structures import Point
|
||||
from prompt_toolkit.layout import (
|
||||
Layout,
|
||||
HSplit,
|
||||
@@ -51,17 +57,20 @@ from prompt_toolkit.layout import (
|
||||
Float,
|
||||
FormattedTextControl,
|
||||
FloatContainer,
|
||||
ConditionalContainer
|
||||
ConditionalContainer,
|
||||
Dimension
|
||||
)
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Constants
|
||||
# -----------------------------------------------------------------------------
|
||||
BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
|
||||
DEFAULT_PROMPT_HEIGHT = 20
|
||||
DEFAULT_RSSI_BAR_WIDTH = 20
|
||||
DISPLAY_MIN_RSSI = -100
|
||||
DISPLAY_MAX_RSSI = -30
|
||||
BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
|
||||
DEFAULT_RSSI_BAR_WIDTH = 20
|
||||
DEFAULT_CONNECTION_TIMEOUT = 30.0
|
||||
DISPLAY_MIN_RSSI = -100
|
||||
DISPLAY_MAX_RSSI = -30
|
||||
RSSI_MONITOR_INTERVAL = 5.0 # Seconds
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Globals
|
||||
@@ -69,16 +78,57 @@ DISPLAY_MAX_RSSI = -30
|
||||
App = None
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Utils
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
def le_phy_name(phy_id):
|
||||
return {
|
||||
HCI_LE_1M_PHY: '1M',
|
||||
HCI_LE_2M_PHY: '2M',
|
||||
HCI_LE_CODED_PHY: 'CODED'
|
||||
}.get(phy_id, HCI_Constant.le_phy_name(phy_id))
|
||||
|
||||
|
||||
def rssi_bar(rssi):
|
||||
blocks = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉']
|
||||
bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
|
||||
bar_width = min(max(bar_width, 0), 1)
|
||||
bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
|
||||
bar_blocks = ('█' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
|
||||
return f'{rssi:4} {bar_blocks}'
|
||||
|
||||
|
||||
def parse_phys(phys):
|
||||
if phys.lower() == '*':
|
||||
return None
|
||||
else:
|
||||
phy_list = []
|
||||
elements = phys.lower().split(',')
|
||||
for element in elements:
|
||||
if element == '1m':
|
||||
phy_list.append(HCI_LE_1M_PHY)
|
||||
elif element == '2m':
|
||||
phy_list.append(HCI_LE_2M_PHY)
|
||||
elif element == 'coded':
|
||||
phy_list.append(HCI_LE_CODED_PHY)
|
||||
else:
|
||||
raise ValueError('invalid PHY name')
|
||||
return phy_list
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Console App
|
||||
# -----------------------------------------------------------------------------
|
||||
class ConsoleApp:
|
||||
def __init__(self):
|
||||
self.known_addresses = set()
|
||||
self.known_addresses = set()
|
||||
self.known_attributes = []
|
||||
self.device = None
|
||||
self.connected_peer = None
|
||||
self.top_tab = 'scan'
|
||||
self.device = None
|
||||
self.connected_peer = None
|
||||
self.top_tab = 'scan'
|
||||
self.monitor_rssi = False
|
||||
self.connection_rssi = None
|
||||
|
||||
style = Style.from_dict({
|
||||
'output-field': 'bg:#000044 #ffffff',
|
||||
@@ -106,6 +156,10 @@ class ConsoleApp:
|
||||
'on': None,
|
||||
'off': None
|
||||
},
|
||||
'rssi': {
|
||||
'on': None,
|
||||
'off': None
|
||||
},
|
||||
'show': {
|
||||
'scan': None,
|
||||
'services': None,
|
||||
@@ -120,10 +174,17 @@ class ConsoleApp:
|
||||
'services': None,
|
||||
'attributes': None
|
||||
},
|
||||
'request-mtu': None,
|
||||
'read': LiveCompleter(self.known_attributes),
|
||||
'write': LiveCompleter(self.known_attributes),
|
||||
'subscribe': LiveCompleter(self.known_attributes),
|
||||
'unsubscribe': LiveCompleter(self.known_attributes),
|
||||
'set-phy': {
|
||||
'1m': None,
|
||||
'2m': None,
|
||||
'coded': None
|
||||
},
|
||||
'set-default-phy': None,
|
||||
'quit': None,
|
||||
'exit': None
|
||||
})
|
||||
@@ -139,14 +200,16 @@ class ConsoleApp:
|
||||
|
||||
self.input_field.accept_handler = self.accept_input
|
||||
|
||||
self.output_height = 7
|
||||
self.output_height = Dimension(min=7, max=7, weight=1)
|
||||
self.output_lines = []
|
||||
self.output = FormattedTextControl()
|
||||
self.output = FormattedTextControl(get_cursor_position=lambda: Point(0, max(0, len(self.output_lines) - 1)))
|
||||
self.output_max_lines = 20
|
||||
self.scan_results_text = FormattedTextControl()
|
||||
self.services_text = FormattedTextControl()
|
||||
self.attributes_text = FormattedTextControl()
|
||||
self.log_text = FormattedTextControl()
|
||||
self.log_height = 20
|
||||
self.log_text = FormattedTextControl(get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1)))
|
||||
self.log_height = Dimension(min=7, weight=4)
|
||||
self.log_max_lines = 100
|
||||
self.log_lines = []
|
||||
|
||||
container = HSplit([
|
||||
@@ -163,11 +226,10 @@ class ConsoleApp:
|
||||
filter=Condition(lambda: self.top_tab == 'attributes')
|
||||
),
|
||||
ConditionalContainer(
|
||||
Frame(Window(self.log_text), title='Log'),
|
||||
Frame(Window(self.log_text, height=self.log_height), title='Log'),
|
||||
filter=Condition(lambda: self.top_tab == 'log')
|
||||
),
|
||||
Frame(Window(self.output), height=self.output_height),
|
||||
# HorizontalLine(),
|
||||
Frame(Window(self.output, height=self.output_height)),
|
||||
FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'),
|
||||
self.input_field
|
||||
])
|
||||
@@ -199,6 +261,8 @@ class ConsoleApp:
|
||||
)
|
||||
|
||||
async def run_async(self, device_config, transport):
|
||||
rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop())
|
||||
|
||||
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
|
||||
if device_config:
|
||||
self.device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
|
||||
@@ -210,6 +274,8 @@ class ConsoleApp:
|
||||
# Run the UI
|
||||
await self.ui.run_async()
|
||||
|
||||
rssi_monitoring_task.cancel()
|
||||
|
||||
def add_known_address(self, address):
|
||||
self.known_addresses.add(address)
|
||||
|
||||
@@ -224,22 +290,33 @@ class ConsoleApp:
|
||||
|
||||
connection_state = 'NONE'
|
||||
encryption_state = ''
|
||||
att_mtu = ''
|
||||
rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi)
|
||||
|
||||
if self.device:
|
||||
if self.device.is_connecting:
|
||||
connection_state = 'CONNECTING'
|
||||
elif self.connected_peer:
|
||||
connection = self.connected_peer.connection
|
||||
connection_parameters = f'{connection.parameters.connection_interval}/{connection.parameters.connection_latency}/{connection.parameters.supervision_timeout}'
|
||||
connection_state = f'{connection.peer_address} {connection_parameters} {connection.data_length}'
|
||||
connection_parameters = f'{connection.parameters.connection_interval}/{connection.parameters.peripheral_latency}/{connection.parameters.supervision_timeout}'
|
||||
if connection.transport == BT_LE_TRANSPORT:
|
||||
phy_state = f' RX={le_phy_name(connection.phy.rx_phy)}/TX={le_phy_name(connection.phy.tx_phy)}'
|
||||
else:
|
||||
phy_state = ''
|
||||
connection_state = f'{connection.peer_address} {connection_parameters} {connection.data_length}{phy_state}'
|
||||
encryption_state = 'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED'
|
||||
att_mtu = f'ATT_MTU: {connection.att_mtu}'
|
||||
|
||||
return [
|
||||
('ansigreen', f' SCAN: {scanning} '),
|
||||
('', ' '),
|
||||
('ansiblue', f' CONNECTION: {connection_state} '),
|
||||
('', ' '),
|
||||
('ansimagenta', f' {encryption_state} ')
|
||||
('ansimagenta', f' {encryption_state} '),
|
||||
('', ' '),
|
||||
('ansicyan', f' {att_mtu} '),
|
||||
('', ' '),
|
||||
('ansiyellow', f' {rssi} ')
|
||||
]
|
||||
|
||||
def show_error(self, title, details = None):
|
||||
@@ -286,7 +363,7 @@ class ConsoleApp:
|
||||
def append_to_output(self, line, invalidate=True):
|
||||
if type(line) is str:
|
||||
line = [('', line)]
|
||||
self.output_lines = self.output_lines[-(self.output_height - 3):]
|
||||
self.output_lines = self.output_lines[-self.output_max_lines:]
|
||||
self.output_lines.append(line)
|
||||
formatted_text = []
|
||||
for line in self.output_lines:
|
||||
@@ -298,7 +375,7 @@ class ConsoleApp:
|
||||
|
||||
def append_to_log(self, lines, invalidate=True):
|
||||
self.log_lines.extend(lines.split('\n'))
|
||||
self.log_lines = self.log_lines[-(self.log_height - 3):]
|
||||
self.log_lines = self.log_lines[-self.log_max_lines:]
|
||||
self.log_text.text = ANSI('\n'.join(self.log_lines))
|
||||
if invalidate:
|
||||
self.ui.invalidate()
|
||||
@@ -351,6 +428,12 @@ class ConsoleApp:
|
||||
if characteristic.handle == attribute_handle:
|
||||
return characteristic
|
||||
|
||||
async def rssi_monitor_loop(self):
|
||||
while True:
|
||||
if self.monitor_rssi and self.connected_peer:
|
||||
self.connection_rssi = await self.connected_peer.connection.get_rssi()
|
||||
await asyncio.sleep(RSSI_MONITOR_INTERVAL)
|
||||
|
||||
async def command(self, command):
|
||||
try:
|
||||
(keyword, *params) = command.strip().split(' ')
|
||||
@@ -379,39 +462,73 @@ class ConsoleApp:
|
||||
else:
|
||||
self.show_error('unsupported arguments for scan command')
|
||||
|
||||
async def do_rssi(self, params):
|
||||
if len(params) == 0:
|
||||
# Toggle monitoring
|
||||
self.monitor_rssi = not self.monitor_rssi
|
||||
elif params[0] == 'on':
|
||||
self.monitor_rssi = True
|
||||
elif params[0] == 'off':
|
||||
self.monitor_rssi = False
|
||||
else:
|
||||
self.show_error('unsupported arguments for rssi command')
|
||||
|
||||
async def do_connect(self, params):
|
||||
if len(params) != 1:
|
||||
self.show_error('invalid syntax', 'expected connect <address>')
|
||||
if len(params) != 1 and len(params) != 2:
|
||||
self.show_error('invalid syntax', 'expected connect <address> [phys]')
|
||||
return
|
||||
|
||||
if len(params) == 1:
|
||||
phys = None
|
||||
else:
|
||||
phys = parse_phys(params[1])
|
||||
if phys is None:
|
||||
connection_parameters_preferences = None
|
||||
else:
|
||||
connection_parameters_preferences = {
|
||||
phy: ConnectionParametersPreferences()
|
||||
for phy in phys
|
||||
}
|
||||
|
||||
self.append_to_output('connecting...')
|
||||
await self.device.connect(params[0])
|
||||
self.top_tab = 'services'
|
||||
|
||||
try:
|
||||
await self.device.connect(
|
||||
params[0],
|
||||
connection_parameters_preferences=connection_parameters_preferences,
|
||||
timeout=DEFAULT_CONNECTION_TIMEOUT
|
||||
)
|
||||
self.top_tab = 'services'
|
||||
except TimeoutError:
|
||||
self.show_error('connection timed out')
|
||||
|
||||
async def do_disconnect(self, params):
|
||||
if not self.connected_peer:
|
||||
self.show_error('not connected')
|
||||
return
|
||||
if self.device.connecting:
|
||||
await self.device.cancel_connection()
|
||||
else:
|
||||
if not self.connected_peer:
|
||||
self.show_error('not connected')
|
||||
return
|
||||
|
||||
await self.connected_peer.connection.disconnect()
|
||||
await self.connected_peer.connection.disconnect()
|
||||
|
||||
async def do_update_parameters(self, params):
|
||||
if len(params) != 1 or len(params[0].split('/')) != 3:
|
||||
self.show_error('invalid syntax', 'expected update-parameters <interval-min>-<interval-max>/<latency>/<supervision>')
|
||||
self.show_error('invalid syntax', 'expected update-parameters <interval-min>-<interval-max>/<max-latency>/<supervision>')
|
||||
return
|
||||
|
||||
if not self.connected_peer:
|
||||
self.show_error('not connected')
|
||||
return
|
||||
|
||||
connection_intervals, connection_latency, supervision_timeout = params[0].split('/')
|
||||
connection_intervals, max_latency, supervision_timeout = params[0].split('/')
|
||||
connection_interval_min, connection_interval_max = [int(x) for x in connection_intervals.split('-')]
|
||||
connection_latency = int(connection_latency)
|
||||
max_latency = int(max_latency)
|
||||
supervision_timeout = int(supervision_timeout)
|
||||
await self.connected_peer.connection.update_parameters(
|
||||
connection_interval_min,
|
||||
connection_interval_max,
|
||||
connection_latency,
|
||||
max_latency,
|
||||
supervision_timeout
|
||||
)
|
||||
|
||||
@@ -442,6 +559,25 @@ class ConsoleApp:
|
||||
self.top_tab = params[0]
|
||||
self.ui.invalidate()
|
||||
|
||||
async def do_get_phy(self, params):
|
||||
if not self.connected_peer:
|
||||
self.show_error('not connected')
|
||||
return
|
||||
|
||||
phy = await self.connected_peer.connection.get_phy()
|
||||
self.append_to_output(f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, TX={HCI_Constant.le_phy_name(phy[1])}')
|
||||
|
||||
async def do_request_mtu(self, params):
|
||||
if len(params) != 1:
|
||||
self.show_error('invalid syntax', 'expected request-mtu <mtu>')
|
||||
return
|
||||
|
||||
if not self.connected_peer:
|
||||
self.show_error('not connected')
|
||||
return
|
||||
|
||||
await self.connected_peer.request_mtu(int(params[0]))
|
||||
|
||||
async def do_discover(self, params):
|
||||
if not params:
|
||||
self.show_error('invalid syntax', 'expected discover services|attributes')
|
||||
@@ -454,14 +590,14 @@ class ConsoleApp:
|
||||
await self.discover_attributes()
|
||||
|
||||
async def do_read(self, params):
|
||||
if not self.connected_peer:
|
||||
self.show_error('not connected')
|
||||
return
|
||||
|
||||
if len(params) != 1:
|
||||
self.show_error('invalid syntax', 'expected read <attribute>')
|
||||
return
|
||||
|
||||
if not self.connected_peer:
|
||||
self.show_error('not connected')
|
||||
return
|
||||
|
||||
characteristic = self.find_characteristic(params[0])
|
||||
if characteristic is None:
|
||||
self.show_error('no such characteristic')
|
||||
@@ -530,6 +666,42 @@ class ConsoleApp:
|
||||
|
||||
await characteristic.unsubscribe()
|
||||
|
||||
async def do_set_phy(self, params):
|
||||
if len(params) != 1:
|
||||
self.show_error('invalid syntax', 'expected set-phy <tx_rx_phys>|<tx_phys>/<rx_phys>')
|
||||
return
|
||||
|
||||
if not self.connected_peer:
|
||||
self.show_error('not connected')
|
||||
return
|
||||
|
||||
if '/' in params[0]:
|
||||
tx_phys, rx_phys = params[0].split('/')
|
||||
else:
|
||||
tx_phys = params[0]
|
||||
rx_phys = tx_phys
|
||||
|
||||
await self.connected_peer.connection.set_phy(
|
||||
tx_phys=parse_phys(tx_phys),
|
||||
rx_phys=parse_phys(rx_phys)
|
||||
)
|
||||
|
||||
async def do_set_default_phy(self, params):
|
||||
if len(params) != 1:
|
||||
self.show_error('invalid syntax', 'expected set-default-phy <tx_rx_phys>|<tx_phys>/<rx_phys>')
|
||||
return
|
||||
|
||||
if '/' in params[0]:
|
||||
tx_phys, rx_phys = params[0].split('/')
|
||||
else:
|
||||
tx_phys = params[0]
|
||||
rx_phys = tx_phys
|
||||
|
||||
await self.device.set_default_phy(
|
||||
tx_phys=parse_phys(tx_phys),
|
||||
rx_phys=parse_phys(rx_phys)
|
||||
)
|
||||
|
||||
async def do_exit(self, params):
|
||||
self.ui.exit()
|
||||
|
||||
@@ -548,12 +720,14 @@ class DeviceListener(Device.Listener, Connection.Listener):
|
||||
@AsyncRunner.run_in_task()
|
||||
async def on_connection(self, connection):
|
||||
self.app.connected_peer = Peer(connection)
|
||||
self.app.connection_rssi = None
|
||||
self.app.append_to_output(f'connected to {self.app.connected_peer}')
|
||||
connection.listener = self
|
||||
|
||||
def on_disconnection(self, reason):
|
||||
self.app.append_to_output(f'disconnected from {self.app.connected_peer}, reason: {HCI_Constant.error_name(reason)}')
|
||||
self.app.connected_peer = None
|
||||
self.app.connection_rssi = None
|
||||
|
||||
def on_connection_parameters_update(self):
|
||||
self.app.append_to_output(f'connection parameters update: {self.app.connected_peer.connection.parameters}')
|
||||
@@ -570,16 +744,16 @@ class DeviceListener(Device.Listener, Connection.Listener):
|
||||
def on_connection_data_length_change(self):
|
||||
self.app.append_to_output(f'connection data length change: {self.app.connected_peer.connection.data_length}')
|
||||
|
||||
def on_advertisement(self, address, ad_data, rssi, connectable):
|
||||
entry_key = f'{address}/{address.address_type}'
|
||||
def on_advertisement(self, advertisement):
|
||||
entry_key = f'{advertisement.address}/{advertisement.address.address_type}'
|
||||
entry = self.scan_results.get(entry_key)
|
||||
if entry:
|
||||
entry.ad_data = ad_data
|
||||
entry.rssi = rssi
|
||||
entry.connectable = connectable
|
||||
entry.ad_data = advertisement.data
|
||||
entry.rssi = advertisement.rssi
|
||||
entry.connectable = advertisement.is_connectable
|
||||
else:
|
||||
self.app.add_known_address(str(address))
|
||||
self.scan_results[entry_key] = ScanResult(address, address.address_type, ad_data, rssi, connectable)
|
||||
self.app.add_known_address(str(advertisement.address))
|
||||
self.scan_results[entry_key] = ScanResult(advertisement.address, advertisement.address.address_type, advertisement.data, advertisement.rssi, advertisement.is_connectable)
|
||||
|
||||
self.app.show_scan_results(self.scan_results)
|
||||
|
||||
@@ -616,12 +790,7 @@ class ScanResult:
|
||||
name = ''
|
||||
|
||||
# RSSI bar
|
||||
blocks = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉']
|
||||
bar_width = (self.rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
|
||||
bar_width = min(max(bar_width, 0), 1)
|
||||
bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
|
||||
bar_blocks = ('█' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
|
||||
bar_string = f'{self.rssi} {bar_blocks}'
|
||||
bar_string = rssi_bar(self.rssi)
|
||||
bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string))
|
||||
return f'{address_color(str(self.address))} [{type_color(address_type_string)}] {bar_string} {bar_padding} {name}'
|
||||
|
||||
|
||||
@@ -25,15 +25,21 @@ from bumble.company_ids import COMPANY_IDENTIFIERS
|
||||
from bumble.core import name_or_number
|
||||
from bumble.hci import (
|
||||
map_null_terminated_utf8_string,
|
||||
HCI_LE_SUPPORTED_FEATURES_NAMES,
|
||||
HCI_SUCCESS,
|
||||
HCI_LE_SUPPORTED_FEATURES_NAMES,
|
||||
HCI_VERSION_NAMES,
|
||||
LMP_VERSION_NAMES,
|
||||
HCI_Command,
|
||||
HCI_Read_BD_ADDR_Command,
|
||||
HCI_READ_BD_ADDR_COMMAND,
|
||||
HCI_Read_BD_ADDR_Command,
|
||||
HCI_READ_LOCAL_NAME_COMMAND,
|
||||
HCI_Read_Local_Name_Command,
|
||||
HCI_READ_LOCAL_NAME_COMMAND
|
||||
HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
|
||||
HCI_LE_Read_Maximum_Data_Length_Command,
|
||||
HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND,
|
||||
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command,
|
||||
HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND,
|
||||
HCI_LE_Read_Maximum_Advertising_Data_Length_Command
|
||||
)
|
||||
from bumble.host import Host
|
||||
from bumble.transport import open_transport_or_link
|
||||
@@ -57,6 +63,39 @@ async def get_classic_info(host):
|
||||
# -----------------------------------------------------------------------------
|
||||
async def get_le_info(host):
|
||||
print()
|
||||
|
||||
if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
|
||||
response = await host.send_command(HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command())
|
||||
if response.return_parameters.status == HCI_SUCCESS:
|
||||
print(
|
||||
color('LE Number Of Supported Advertising Sets:', 'yellow'),
|
||||
response.return_parameters.num_supported_advertising_sets,
|
||||
'\n'
|
||||
)
|
||||
|
||||
if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
|
||||
response = await host.send_command(HCI_LE_Read_Maximum_Advertising_Data_Length_Command())
|
||||
if response.return_parameters.status == HCI_SUCCESS:
|
||||
print(
|
||||
color('LE Maximum Advertising Data Length:', 'yellow'),
|
||||
response.return_parameters.max_advertising_data_length,
|
||||
'\n'
|
||||
)
|
||||
|
||||
if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
|
||||
response = await host.send_command(HCI_LE_Read_Maximum_Data_Length_Command())
|
||||
if response.return_parameters.status == HCI_SUCCESS:
|
||||
print(
|
||||
color('Maximum Data Length:', 'yellow'),
|
||||
(
|
||||
f'tx:{response.return_parameters.supported_max_tx_octets}/'
|
||||
f'{response.return_parameters.supported_max_tx_time}, '
|
||||
f'rx:{response.return_parameters.supported_max_rx_octets}/'
|
||||
f'{response.return_parameters.supported_max_rx_time}'
|
||||
),
|
||||
'\n'
|
||||
)
|
||||
|
||||
print(color('LE Features:', 'yellow'))
|
||||
for feature in host.supported_le_features:
|
||||
print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature))
|
||||
|
||||
62
apps/scan.py
62
apps/scan.py
@@ -25,8 +25,8 @@ from bumble.device import Device
|
||||
from bumble.transport import open_transport_or_link
|
||||
from bumble.keys import JsonKeyStore
|
||||
from bumble.smp import AddressResolver
|
||||
from bumble.hci import HCI_LE_Advertising_Report_Event
|
||||
from bumble.core import AdvertisingData
|
||||
from bumble.device import Advertisement
|
||||
from bumble.hci import HCI_Constant, HCI_LE_1M_PHY, HCI_LE_CODED_PHY
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -48,16 +48,19 @@ class AdvertisementPrinter:
|
||||
self.min_rssi = min_rssi
|
||||
self.resolver = resolver
|
||||
|
||||
def print_advertisement(self, address, address_color, ad_data, rssi):
|
||||
if self.min_rssi is not None and rssi < self.min_rssi:
|
||||
def print_advertisement(self, advertisement):
|
||||
address = advertisement.address
|
||||
address_color = 'yellow' if advertisement.is_connectable else 'red'
|
||||
|
||||
if self.min_rssi is not None and advertisement.rssi < self.min_rssi:
|
||||
return
|
||||
|
||||
address_qualifier = ''
|
||||
resolution_qualifier = ''
|
||||
if self.resolver and address.is_resolvable:
|
||||
resolved = self.resolver.resolve(address)
|
||||
if self.resolver and advertisement.address.is_resolvable:
|
||||
resolved = self.resolver.resolve(advertisement.address)
|
||||
if resolved is not None:
|
||||
resolution_qualifier = f'(resolved from {address})'
|
||||
resolution_qualifier = f'(resolved from {advertisement.address})'
|
||||
address = resolved
|
||||
|
||||
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type]
|
||||
@@ -74,18 +77,30 @@ class AdvertisementPrinter:
|
||||
type_color = 'blue'
|
||||
address_qualifier = '(non-resolvable)'
|
||||
|
||||
rssi_bar = make_rssi_bar(rssi)
|
||||
separator = '\n '
|
||||
print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}RSSI:{rssi:4} {rssi_bar}{separator}{ad_data.to_string(separator)}\n')
|
||||
rssi_bar = make_rssi_bar(advertisement.rssi)
|
||||
if not advertisement.is_legacy:
|
||||
phy_info = (
|
||||
f'PHY: {HCI_Constant.le_phy_name(advertisement.primary_phy)}/'
|
||||
f'{HCI_Constant.le_phy_name(advertisement.secondary_phy)} '
|
||||
f'{separator}'
|
||||
)
|
||||
else:
|
||||
phy_info = ''
|
||||
|
||||
def on_advertisement(self, address, ad_data, rssi, connectable):
|
||||
address_color = 'yellow' if connectable else 'red'
|
||||
self.print_advertisement(address, address_color, ad_data, rssi)
|
||||
print(
|
||||
f'>>> {color(address, address_color)} '
|
||||
f'[{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}'
|
||||
f'{phy_info}'
|
||||
f'RSSI:{advertisement.rssi:4} {rssi_bar}{separator}'
|
||||
f'{advertisement.data.to_string(separator)}\n')
|
||||
|
||||
def on_advertising_report(self, address, ad_data, rssi, event_type):
|
||||
print(f'{color("EVENT", "green")}: {HCI_LE_Advertising_Report_Event.event_type_name(event_type)}')
|
||||
ad_data = AdvertisingData.from_bytes(ad_data)
|
||||
self.print_advertisement(address, 'yellow', ad_data, rssi)
|
||||
def on_advertisement(self, advertisement):
|
||||
self.print_advertisement(advertisement)
|
||||
|
||||
def on_advertising_report(self, report):
|
||||
print(f'{color("EVENT", "green")}: {report.event_type_string()}')
|
||||
self.print_advertisement(Advertisement.from_advertising_report(report))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -94,6 +109,7 @@ async def scan(
|
||||
passive,
|
||||
scan_interval,
|
||||
scan_window,
|
||||
phy,
|
||||
filter_duplicates,
|
||||
raw,
|
||||
keystore_file,
|
||||
@@ -126,11 +142,18 @@ async def scan(
|
||||
device.on('advertisement', printer.on_advertisement)
|
||||
|
||||
await device.power_on()
|
||||
|
||||
if phy is None:
|
||||
scanning_phys = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY]
|
||||
else:
|
||||
scanning_phys = [{'1m': HCI_LE_1M_PHY, 'coded': HCI_LE_CODED_PHY}[phy]]
|
||||
|
||||
await device.start_scanning(
|
||||
active=(not passive),
|
||||
scan_interval=scan_interval,
|
||||
scan_window=scan_window,
|
||||
filter_duplicates=filter_duplicates
|
||||
filter_duplicates=filter_duplicates,
|
||||
scanning_phys=scanning_phys
|
||||
)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
@@ -142,14 +165,15 @@ async def scan(
|
||||
@click.option('--passive', is_flag=True, default=False, help='Perform passive scanning')
|
||||
@click.option('--scan-interval', type=int, default=60, help='Scan interval')
|
||||
@click.option('--scan-window', type=int, default=60, help='Scan window')
|
||||
@click.option('--phy', type=click.Choice(['1m', 'coded']), help='Only scan on the specified PHY')
|
||||
@click.option('--filter-duplicates', type=bool, default=True, help='Filter duplicates at the controller level')
|
||||
@click.option('--raw', is_flag=True, default=False, help='Listen for raw advertising reports instead of processed ones')
|
||||
@click.option('--keystore-file', help='Keystore file to use when resolving addresses')
|
||||
@click.option('--device-config', help='Device config file for the scanning device')
|
||||
@click.argument('transport')
|
||||
def main(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport):
|
||||
def main(min_rssi, passive, scan_interval, scan_window, phy, filter_duplicates, raw, keystore_file, device_config, transport):
|
||||
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
|
||||
asyncio.run(scan(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport))
|
||||
asyncio.run(scan(min_rssi, passive, scan_interval, scan_window, phy, filter_duplicates, raw, keystore_file, device_config, transport))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -90,7 +90,7 @@ class SnoopPacketReader:
|
||||
@click.command()
|
||||
@click.option('--format', type=click.Choice(['h4', 'snoop']), default='h4', help='Format of the input file')
|
||||
@click.argument('filename')
|
||||
def show(format, filename):
|
||||
def main(format, filename):
|
||||
input = open(filename, 'rb')
|
||||
if format == 'h4':
|
||||
packet_reader = PacketReader(input)
|
||||
@@ -117,4 +117,4 @@ def show(format, filename):
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
show()
|
||||
main()
|
||||
|
||||
@@ -28,6 +28,8 @@
|
||||
# -----------------------------------------------------------------------------
|
||||
import os
|
||||
import logging
|
||||
import sys
|
||||
import click
|
||||
import usb1
|
||||
from colors import color
|
||||
|
||||
@@ -35,6 +37,7 @@ from colors import color
|
||||
# -----------------------------------------------------------------------------
|
||||
# Constants
|
||||
# -----------------------------------------------------------------------------
|
||||
USB_DEVICE_CLASS_DEVICE = 0x00
|
||||
USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
|
||||
USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
|
||||
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
|
||||
@@ -75,9 +78,81 @@ USB_DEVICE_CLASSES = {
|
||||
0xFF: 'Vendor Specific'
|
||||
}
|
||||
|
||||
USB_ENDPOINT_IN = 0x80
|
||||
USB_ENDPOINT_TYPES = ['CONTROL', 'ISOCHRONOUS', 'BULK', 'INTERRUPT']
|
||||
|
||||
USB_BT_HCI_CLASS_TUPLE = (
|
||||
USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
|
||||
USB_DEVICE_SUBCLASS_RF_CONTROLLER,
|
||||
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def main():
|
||||
def show_device_details(device):
|
||||
for configuration in device:
|
||||
print(f' Configuration {configuration.getConfigurationValue()}')
|
||||
for interface in configuration:
|
||||
for setting in interface:
|
||||
alternateSetting = setting.getAlternateSetting()
|
||||
suffix = f'/{alternateSetting}' if interface.getNumSettings() > 1 else ''
|
||||
(class_string, subclass_string) = get_class_info(
|
||||
setting.getClass(),
|
||||
setting.getSubClass(),
|
||||
setting.getProtocol()
|
||||
)
|
||||
details = f'({class_string}, {subclass_string})'
|
||||
print(f' Interface: {setting.getNumber()}{suffix} {details}')
|
||||
for endpoint in setting:
|
||||
endpoint_type = USB_ENDPOINT_TYPES[endpoint.getAttributes() & 3]
|
||||
endpoint_direction = 'OUT' if (endpoint.getAddress() & USB_ENDPOINT_IN == 0) else 'IN'
|
||||
print(f' Endpoint 0x{endpoint.getAddress():02X}: {endpoint_type} {endpoint_direction}')
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def get_class_info(cls, subclass, protocol):
|
||||
class_info = USB_DEVICE_CLASSES.get(cls)
|
||||
protocol_string = ''
|
||||
if class_info is None:
|
||||
class_string = f'0x{cls:02X}'
|
||||
else:
|
||||
if type(class_info) is tuple:
|
||||
class_string = class_info[0]
|
||||
subclass_info = class_info[1].get(subclass)
|
||||
if subclass_info:
|
||||
protocol_string = subclass_info.get(protocol)
|
||||
if protocol_string is not None:
|
||||
protocol_string = f' [{protocol_string}]'
|
||||
|
||||
else:
|
||||
class_string = class_info
|
||||
|
||||
subclass_string = f'{subclass}/{protocol}{protocol_string}'
|
||||
|
||||
return (class_string, subclass_string)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def is_bluetooth_hci(device):
|
||||
# Check if the device class indicates a match
|
||||
if (device.getDeviceClass(), device.getDeviceSubClass(), device.getDeviceProtocol()) == USB_BT_HCI_CLASS_TUPLE:
|
||||
return True
|
||||
|
||||
# If the device class is 'Device', look for a matching interface
|
||||
if device.getDeviceClass() == USB_DEVICE_CLASS_DEVICE:
|
||||
for configuration in device:
|
||||
for interface in configuration:
|
||||
for setting in interface:
|
||||
if (setting.getClass(), setting.getSubClass(), setting.getProtocol()) == USB_BT_HCI_CLASS_TUPLE:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@click.command()
|
||||
@click.option('--verbose', is_flag=True, default=False, help='Print more details')
|
||||
def main(verbose):
|
||||
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
|
||||
|
||||
with usb1.USBContext() as context:
|
||||
@@ -91,23 +166,28 @@ def main():
|
||||
|
||||
device_id = (device.getVendorID(), device.getProductID())
|
||||
|
||||
device_is_bluetooth_hci = (
|
||||
device_class == USB_DEVICE_CLASS_WIRELESS_CONTROLLER and
|
||||
device_subclass == USB_DEVICE_SUBCLASS_RF_CONTROLLER and
|
||||
device_protocol == USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
|
||||
(device_class_string, device_subclass_string) = get_class_info(
|
||||
device_class,
|
||||
device_subclass,
|
||||
device_protocol
|
||||
)
|
||||
|
||||
device_class_details = ''
|
||||
device_class_info = USB_DEVICE_CLASSES.get(device_class)
|
||||
if device_class_info is not None:
|
||||
if type(device_class_info) is tuple:
|
||||
device_class = device_class_info[0]
|
||||
device_subclass_info = device_class_info[1].get(device_subclass)
|
||||
if device_subclass_info:
|
||||
device_class_details = f' [{device_subclass_info.get(device_protocol)}]'
|
||||
else:
|
||||
device_class = device_class_info
|
||||
try:
|
||||
device_serial_number = device.getSerialNumber()
|
||||
except usb1.USBError:
|
||||
device_serial_number = None
|
||||
|
||||
try:
|
||||
device_manufacturer = device.getManufacturer()
|
||||
except usb1.USBError:
|
||||
device_manufacturer = None
|
||||
|
||||
try:
|
||||
device_product = device.getProduct()
|
||||
except usb1.USBError:
|
||||
device_product = None
|
||||
|
||||
device_is_bluetooth_hci = is_bluetooth_hci(device)
|
||||
if device_is_bluetooth_hci:
|
||||
bluetooth_device_count += 1
|
||||
fg_color = 'black'
|
||||
@@ -123,33 +203,35 @@ def main():
|
||||
if device_is_bluetooth_hci:
|
||||
bumble_transport_names.append(f'usb:{bluetooth_device_count - 1}')
|
||||
|
||||
serial_number_collision = False
|
||||
if device_id in devices:
|
||||
for device_serial in devices[device_id]:
|
||||
if device_serial == device.getSerialNumber():
|
||||
serial_number_collision = True
|
||||
|
||||
if device_id not in devices:
|
||||
bumble_transport_names.append(basic_transport_name)
|
||||
else:
|
||||
bumble_transport_names.append(f'{basic_transport_name}#{len(devices[device_id])}')
|
||||
|
||||
if device.getSerialNumber() and not serial_number_collision:
|
||||
bumble_transport_names.append(f'{basic_transport_name}/{device.getSerialNumber()}')
|
||||
if device_serial_number is not None:
|
||||
if device_id not in devices or device_serial_number not in devices[device_id]:
|
||||
bumble_transport_names.append(f'{basic_transport_name}/{device_serial_number}')
|
||||
|
||||
# Print the results
|
||||
print(color(f'ID {device.getVendorID():04X}:{device.getProductID():04X}', fg=fg_color, bg=bg_color))
|
||||
if bumble_transport_names:
|
||||
print(color(' Bumble Transport Names:', 'blue'), ' or '.join(color(x, 'cyan' if device_is_bluetooth_hci else 'red') for x in bumble_transport_names))
|
||||
print(color(' Bus/Device: ', 'green'), f'{device.getBusNumber():03}/{device.getDeviceAddress():03}')
|
||||
if device.getSerialNumber():
|
||||
print(color(' Serial: ', 'green'), device.getSerialNumber())
|
||||
print(color(' Class: ', 'green'), device_class)
|
||||
print(color(' Subclass/Protocol: ', 'green'), f'{device_subclass}/{device_protocol}{device_class_details}')
|
||||
print(color(' Manufacturer: ', 'green'), device.getManufacturer())
|
||||
print(color(' Product: ', 'green'), device.getProduct())
|
||||
print(color(' Class: ', 'green'), device_class_string)
|
||||
print(color(' Subclass/Protocol: ', 'green'), device_subclass_string)
|
||||
if device_serial_number is not None:
|
||||
print(color(' Serial: ', 'green'), device_serial_number)
|
||||
if device_manufacturer is not None:
|
||||
print(color(' Manufacturer: ', 'green'), device_manufacturer)
|
||||
if device_product is not None:
|
||||
print(color(' Product: ', 'green'), device_product)
|
||||
|
||||
if verbose:
|
||||
show_device_details(device)
|
||||
|
||||
print()
|
||||
|
||||
devices.setdefault(device_id, []).append(device.getSerialNumber())
|
||||
devices.setdefault(device_id, []).append(device_serial_number)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -700,16 +700,26 @@ class Attribute(EventEmitter):
|
||||
else:
|
||||
self.value = value
|
||||
|
||||
def encode_value(self, value):
|
||||
return value
|
||||
|
||||
def decode_value(self, value_bytes):
|
||||
return value_bytes
|
||||
|
||||
def read_value(self, connection):
|
||||
if read := getattr(self.value, 'read', None):
|
||||
try:
|
||||
return read(connection)
|
||||
value = read(connection)
|
||||
except ATT_Error as error:
|
||||
raise ATT_Error(error_code=error.error_code, att_handle=self.handle)
|
||||
else:
|
||||
return self.value
|
||||
value = self.value
|
||||
|
||||
return self.encode_value(value)
|
||||
|
||||
def write_value(self, connection, value_bytes):
|
||||
value = self.decode_value(value_bytes)
|
||||
|
||||
def write_value(self, connection, value):
|
||||
if write := getattr(self.value, 'write', None):
|
||||
try:
|
||||
write(connection, value)
|
||||
@@ -721,7 +731,11 @@ class Attribute(EventEmitter):
|
||||
self.emit('write', connection, value)
|
||||
|
||||
def __repr__(self):
|
||||
if len(self.value) > 0:
|
||||
if type(self.value) is bytes:
|
||||
value_str = self.value.hex()
|
||||
else:
|
||||
value_str = str(self.value)
|
||||
if value_str:
|
||||
value_string = f', value={self.value.hex()}'
|
||||
else:
|
||||
value_string = ''
|
||||
|
||||
@@ -76,7 +76,7 @@ class Controller:
|
||||
self.supported_commands = bytes.fromhex('2000800000c000000000e40000002822000000000000040000f7ffff7f00000030f0f9ff01008004000000000000000000000000000000000000000000000000')
|
||||
self.le_features = bytes.fromhex('ff49010000000000')
|
||||
self.le_states = bytes.fromhex('ffff3fffff030000')
|
||||
self.avertising_channel_tx_power = 0
|
||||
self.advertising_channel_tx_power = 0
|
||||
self.filter_accept_list_size = 8
|
||||
self.resolving_list_size = 8
|
||||
self.supported_max_tx_octets = 27
|
||||
@@ -259,15 +259,15 @@ class Controller:
|
||||
|
||||
# Then say that the connection has completed
|
||||
self.send_hci_packet(HCI_LE_Connection_Complete_Event(
|
||||
status = HCI_SUCCESS,
|
||||
connection_handle = connection.handle,
|
||||
role = connection.role,
|
||||
peer_address_type = peer_address_type,
|
||||
peer_address = peer_address,
|
||||
conn_interval = 10, # FIXME
|
||||
conn_latency = 0, # FIXME
|
||||
supervision_timeout = 10, # FIXME
|
||||
master_clock_accuracy = 7 # FIXME
|
||||
status = HCI_SUCCESS,
|
||||
connection_handle = connection.handle,
|
||||
role = connection.role,
|
||||
peer_address_type = peer_address_type,
|
||||
peer_address = peer_address,
|
||||
connection_interval = 10, # FIXME
|
||||
peripheral_latency = 0, # FIXME
|
||||
supervision_timeout = 10, # FIXME
|
||||
central_clock_accuracy = 7 # FIXME
|
||||
))
|
||||
|
||||
def on_link_central_disconnected(self, peer_address, reason):
|
||||
@@ -313,15 +313,15 @@ class Controller:
|
||||
|
||||
# Say that the connection has completed
|
||||
self.send_hci_packet(HCI_LE_Connection_Complete_Event(
|
||||
status = status,
|
||||
connection_handle = connection.handle if connection else 0,
|
||||
role = BT_CENTRAL_ROLE,
|
||||
peer_address_type = le_create_connection_command.peer_address_type,
|
||||
peer_address = le_create_connection_command.peer_address,
|
||||
conn_interval = le_create_connection_command.conn_interval_min,
|
||||
conn_latency = le_create_connection_command.conn_latency,
|
||||
supervision_timeout = le_create_connection_command.supervision_timeout,
|
||||
master_clock_accuracy = 0
|
||||
status = status,
|
||||
connection_handle = connection.handle if connection else 0,
|
||||
role = BT_CENTRAL_ROLE,
|
||||
peer_address_type = le_create_connection_command.peer_address_type,
|
||||
peer_address = le_create_connection_command.peer_address,
|
||||
connection_interval = le_create_connection_command.connection_interval_min,
|
||||
peripheral_latency = le_create_connection_command.max_latency,
|
||||
supervision_timeout = le_create_connection_command.supervision_timeout,
|
||||
central_clock_accuracy = 0
|
||||
))
|
||||
|
||||
def on_link_peripheral_disconnection_complete(self, disconnection_command, status):
|
||||
@@ -583,13 +583,15 @@ class Controller:
|
||||
'''
|
||||
See Bluetooth spec Vol 2, Part E - 7.4.1 Read Local Version Information Command
|
||||
'''
|
||||
return struct.pack('<BBHBHH',
|
||||
HCI_SUCCESS,
|
||||
self.hci_version,
|
||||
self.hci_revision,
|
||||
self.lmp_version,
|
||||
self.manufacturer_name,
|
||||
self.lmp_subversion)
|
||||
return struct.pack(
|
||||
'<BBHBHH',
|
||||
HCI_SUCCESS,
|
||||
self.hci_version,
|
||||
self.hci_revision,
|
||||
self.lmp_version,
|
||||
self.manufacturer_name,
|
||||
self.lmp_subversion
|
||||
)
|
||||
|
||||
def on_hci_read_local_supported_commands_command(self, command):
|
||||
'''
|
||||
@@ -650,7 +652,7 @@ class Controller:
|
||||
'''
|
||||
See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Channel Tx Power Command
|
||||
'''
|
||||
return bytes([HCI_SUCCESS, self.avertising_channel_tx_power])
|
||||
return bytes([HCI_SUCCESS, self.advertising_channel_tx_power])
|
||||
|
||||
def on_hci_le_set_advertising_data_command(self, command):
|
||||
'''
|
||||
@@ -857,9 +859,9 @@ class Controller:
|
||||
See Bluetooth spec Vol 2, Part E - 7.8.44 LE Set Address Resolution Enable Command
|
||||
'''
|
||||
ret = HCI_SUCCESS
|
||||
if command.address_resolution == 1:
|
||||
if command.address_resolution_enable == 1:
|
||||
self.le_address_resolution = True
|
||||
elif command.address_resolution == 0:
|
||||
elif command.address_resolution_enable == 0:
|
||||
self.le_address_resolution = False
|
||||
else:
|
||||
ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR
|
||||
@@ -876,12 +878,26 @@ class Controller:
|
||||
'''
|
||||
See Bluetooth spec Vol 2, Part E - 7.8.46 LE Read Maximum Data Length Command
|
||||
'''
|
||||
return struct.pack('<BHHHH',
|
||||
HCI_SUCCESS,
|
||||
self.supported_max_tx_octets,
|
||||
self.supported_max_tx_time,
|
||||
self.supported_max_rx_octets,
|
||||
self.supported_max_rx_time)
|
||||
return struct.pack(
|
||||
'<BHHHH',
|
||||
HCI_SUCCESS,
|
||||
self.supported_max_tx_octets,
|
||||
self.supported_max_tx_time,
|
||||
self.supported_max_rx_octets,
|
||||
self.supported_max_rx_time
|
||||
)
|
||||
|
||||
def on_hci_le_read_phy_command(self, command):
|
||||
'''
|
||||
See Bluetooth spec Vol 2, Part E - 7.8.47 LE Read PHY command
|
||||
'''
|
||||
return struct.pack(
|
||||
'<BHBB',
|
||||
HCI_SUCCESS,
|
||||
command.connection_handle,
|
||||
HCI_LE_1M_PHY,
|
||||
HCI_LE_1M_PHY
|
||||
)
|
||||
|
||||
def on_hci_le_set_default_phy_command(self, command):
|
||||
'''
|
||||
@@ -893,3 +909,4 @@ class Controller:
|
||||
'rx_phys': command.rx_phys
|
||||
}
|
||||
return bytes([HCI_SUCCESS])
|
||||
|
||||
|
||||
@@ -831,13 +831,17 @@ class AdvertisingData:
|
||||
# Connection Parameters
|
||||
# -----------------------------------------------------------------------------
|
||||
class ConnectionParameters:
|
||||
def __init__(self, connection_interval, connection_latency, supervision_timeout):
|
||||
def __init__(self, connection_interval, peripheral_latency, supervision_timeout):
|
||||
self.connection_interval = connection_interval
|
||||
self.connection_latency = connection_latency
|
||||
self.peripheral_latency = peripheral_latency
|
||||
self.supervision_timeout = supervision_timeout
|
||||
|
||||
def __str__(self):
|
||||
return f'ConnectionParameters(connection_interval={self.connection_interval}, connection_latency={self.connection_latency}, supervision_timeout={self.supervision_timeout}'
|
||||
return (
|
||||
f'ConnectionParameters(connection_interval={self.connection_interval}, '
|
||||
f'peripheral_latency={self.peripheral_latency}, '
|
||||
f'supervision_timeout={self.supervision_timeout}'
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
863
bumble/device.py
863
bumble/device.py
File diff suppressed because it is too large
Load Diff
@@ -303,6 +303,7 @@ class CharacteristicAdapter:
|
||||
'''
|
||||
def __init__(self, characteristic):
|
||||
self.wrapped_characteristic = characteristic
|
||||
self.subscribers = {} # Map from subscriber to proxy subscriber
|
||||
|
||||
if (
|
||||
asyncio.iscoroutinefunction(characteristic.read_value) and
|
||||
@@ -317,11 +318,21 @@ class CharacteristicAdapter:
|
||||
if hasattr(self.wrapped_characteristic, 'subscribe'):
|
||||
self.subscribe = self.wrapped_subscribe
|
||||
|
||||
if hasattr(self.wrapped_characteristic, 'unsubscribe'):
|
||||
self.unsubscribe = self.wrapped_unsubscribe
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.wrapped_characteristic, name)
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
if name in {'wrapped_characteristic', 'read_value', 'write_value', 'subscribe'}:
|
||||
if name in {
|
||||
'wrapped_characteristic',
|
||||
'subscribers',
|
||||
'read_value',
|
||||
'write_value',
|
||||
'subscribe',
|
||||
'unsubscribe'
|
||||
}:
|
||||
super().__setattr__(name, value)
|
||||
else:
|
||||
setattr(self.wrapped_characteristic, name, value)
|
||||
@@ -335,8 +346,11 @@ class CharacteristicAdapter:
|
||||
async def read_decoded_value(self):
|
||||
return self.decode_value(await self.wrapped_characteristic.read_value())
|
||||
|
||||
async def write_decoded_value(self, value):
|
||||
return await self.wrapped_characteristic.write_value(self.encode_value(value))
|
||||
async def write_decoded_value(self, value, with_response=False):
|
||||
return await self.wrapped_characteristic.write_value(
|
||||
self.encode_value(value),
|
||||
with_response
|
||||
)
|
||||
|
||||
def encode_value(self, value):
|
||||
return value
|
||||
@@ -345,9 +359,26 @@ class CharacteristicAdapter:
|
||||
return value
|
||||
|
||||
def wrapped_subscribe(self, subscriber=None):
|
||||
return self.wrapped_characteristic.subscribe(
|
||||
None if subscriber is None else lambda value: subscriber(self.decode_value(value))
|
||||
)
|
||||
if subscriber is not None:
|
||||
if subscriber in self.subscribers:
|
||||
# We already have a proxy subscriber
|
||||
subscriber = self.subscribers[subscriber]
|
||||
else:
|
||||
# Create and register a proxy that will decode the value
|
||||
original_subscriber = subscriber
|
||||
|
||||
def on_change(value):
|
||||
original_subscriber(self.decode_value(value))
|
||||
self.subscribers[subscriber] = on_change
|
||||
subscriber = on_change
|
||||
|
||||
return self.wrapped_characteristic.subscribe(subscriber)
|
||||
|
||||
def wrapped_unsubscribe(self, subscriber=None):
|
||||
if subscriber in self.subscribers:
|
||||
subscriber = self.subscribers.pop(subscriber)
|
||||
|
||||
return self.wrapped_characteristic.unsubscribe(subscriber)
|
||||
|
||||
def __str__(self):
|
||||
wrapped = str(self.wrapped_characteristic)
|
||||
|
||||
@@ -58,10 +58,16 @@ class AttributeProxy(EventEmitter):
|
||||
self.type = attribute_type
|
||||
|
||||
async def read_value(self, no_long_read=False):
|
||||
return await self.client.read_value(self.handle, no_long_read)
|
||||
return self.decode_value(await self.client.read_value(self.handle, no_long_read))
|
||||
|
||||
async def write_value(self, value, with_response=False):
|
||||
return await self.client.write_value(self.handle, value, with_response)
|
||||
return await self.client.write_value(self.handle, self.encode_value(value), with_response)
|
||||
|
||||
def encode_value(self, value):
|
||||
return value
|
||||
|
||||
def decode_value(self, value_bytes):
|
||||
return value_bytes
|
||||
|
||||
def __str__(self):
|
||||
return f'Attribute(handle=0x{self.handle:04X}, type={self.uuid})'
|
||||
@@ -98,6 +104,7 @@ class CharacteristicProxy(AttributeProxy):
|
||||
self.properties = properties
|
||||
self.descriptors = []
|
||||
self.descriptors_discovered = False
|
||||
self.subscribers = {} # Map from subscriber to proxy subscriber
|
||||
|
||||
def get_descriptor(self, descriptor_type):
|
||||
for descriptor in self.descriptors:
|
||||
@@ -108,9 +115,25 @@ class CharacteristicProxy(AttributeProxy):
|
||||
return await self.client.discover_descriptors(self)
|
||||
|
||||
async def subscribe(self, subscriber=None):
|
||||
if subscriber is not None:
|
||||
if subscriber in self.subscribers:
|
||||
# We already have a proxy subscriber
|
||||
subscriber = self.subscribers[subscriber]
|
||||
else:
|
||||
# Create and register a proxy that will decode the value
|
||||
original_subscriber = subscriber
|
||||
|
||||
def on_change(value):
|
||||
original_subscriber(self.decode_value(value))
|
||||
self.subscribers[subscriber] = on_change
|
||||
subscriber = on_change
|
||||
|
||||
return await self.client.subscribe(self, subscriber)
|
||||
|
||||
async def unsubscribe(self, subscriber=None):
|
||||
if subscriber in self.subscribers:
|
||||
subscriber = self.subscribers.pop(subscriber)
|
||||
|
||||
return await self.client.unsubscribe(self, subscriber)
|
||||
|
||||
def __str__(self):
|
||||
@@ -140,7 +163,6 @@ class ProfileServiceProxy:
|
||||
class Client:
|
||||
def __init__(self, connection):
|
||||
self.connection = connection
|
||||
self.mtu = ATT_DEFAULT_MTU
|
||||
self.mtu_exchange_done = False
|
||||
self.request_semaphore = asyncio.Semaphore(1)
|
||||
self.pending_request = None
|
||||
@@ -162,8 +184,8 @@ class Client:
|
||||
# Wait until we can send (only one pending command at a time for the connection)
|
||||
response = None
|
||||
async with self.request_semaphore:
|
||||
assert(self.pending_request is None)
|
||||
assert(self.pending_response is None)
|
||||
assert self.pending_request is None
|
||||
assert self.pending_response is None
|
||||
|
||||
# Create a future value to hold the eventual response
|
||||
self.pending_response = asyncio.get_running_loop().create_future()
|
||||
@@ -194,7 +216,7 @@ class Client:
|
||||
|
||||
# We can only send one request per connection
|
||||
if self.mtu_exchange_done:
|
||||
return
|
||||
return self.connection.att_mtu
|
||||
|
||||
# Send the request
|
||||
self.mtu_exchange_done = True
|
||||
@@ -207,8 +229,10 @@ class Client:
|
||||
response
|
||||
)
|
||||
|
||||
self.mtu = max(ATT_DEFAULT_MTU, response.server_rx_mtu)
|
||||
return self.mtu
|
||||
# Compute the final MTU
|
||||
self.connection.att_mtu = min(mtu, response.server_rx_mtu)
|
||||
|
||||
return self.connection.att_mtu
|
||||
|
||||
def get_services_by_uuid(self, uuid):
|
||||
return [service for service in self.services if service.uuid == uuid]
|
||||
@@ -570,12 +594,18 @@ class Client:
|
||||
subscribers = subscriber_set.get(characteristic.handle, [])
|
||||
if subscriber in subscribers:
|
||||
subscribers.remove(subscriber)
|
||||
|
||||
# Cleanup if we removed the last one
|
||||
if not subscribers:
|
||||
subscriber_set.remove(characteristic.handle)
|
||||
else:
|
||||
# Remove all subscribers for this attribute from the sets!
|
||||
self.notification_subscribers.pop(characteristic.handle, None)
|
||||
self.indication_subscribers.pop(characteristic.handle, None)
|
||||
|
||||
await self.write_value(cccd, b'\x00\x00', with_response=True)
|
||||
if not self.notification_subscribers and not self.indication_subscribers:
|
||||
# No more subscribers left
|
||||
await self.write_value(cccd, b'\x00\x00', with_response=True)
|
||||
|
||||
async def read_value(self, attribute, no_long_read=False):
|
||||
'''
|
||||
@@ -600,7 +630,7 @@ class Client:
|
||||
# If the value is the max size for the MTU, try to read more unless the caller
|
||||
# specifically asked not to do that
|
||||
attribute_value = response.attribute_value
|
||||
if not no_long_read and len(attribute_value) == self.mtu - 1:
|
||||
if not no_long_read and len(attribute_value) == self.connection.att_mtu - 1:
|
||||
logger.debug('using READ BLOB to get the rest of the value')
|
||||
offset = len(attribute_value)
|
||||
while True:
|
||||
@@ -622,7 +652,7 @@ class Client:
|
||||
part = response.part_attribute_value
|
||||
attribute_value += part
|
||||
|
||||
if len(part) < self.mtu - 1:
|
||||
if len(part) < self.connection.att_mtu - 1:
|
||||
break
|
||||
|
||||
offset += len(part)
|
||||
|
||||
@@ -40,6 +40,12 @@ from .gatt import *
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Constants
|
||||
# -----------------------------------------------------------------------------
|
||||
GATT_SERVER_DEFAULT_MAX_MTU = 517
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# GATT Server
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -49,9 +55,8 @@ class Server(EventEmitter):
|
||||
self.device = device
|
||||
self.attributes = [] # Attributes, ordered by increasing handle values
|
||||
self.attributes_by_handle = {} # Map for fast attribute access by handle
|
||||
self.max_mtu = 23 # FIXME: 517 # The max MTU we're willing to negotiate
|
||||
self.max_mtu = GATT_SERVER_DEFAULT_MAX_MTU # The max MTU we're willing to negotiate
|
||||
self.subscribers = {} # Map of subscriber states by connection handle and attribute handle
|
||||
self.mtus = {} # Map of ATT MTU values by connection handle
|
||||
self.indication_semaphores = defaultdict(lambda: asyncio.Semaphore(1))
|
||||
self.pending_confirmations = defaultdict(lambda: None)
|
||||
|
||||
@@ -169,7 +174,7 @@ class Server(EventEmitter):
|
||||
logger.debug(f'GATT Response from server: [0x{connection.handle:04X}] {response}')
|
||||
self.send_gatt_pdu(connection.handle, response.to_bytes())
|
||||
|
||||
async def notify_subscriber(self, connection, attribute, force=False):
|
||||
async def notify_subscriber(self, connection, attribute, value=None, force=False):
|
||||
# Check if there's a subscriber
|
||||
if not force:
|
||||
subscribers = self.subscribers.get(connection.handle)
|
||||
@@ -184,13 +189,12 @@ class Server(EventEmitter):
|
||||
logger.debug(f'not notifying, cccd={cccd.hex()}')
|
||||
return
|
||||
|
||||
# Get the value
|
||||
value = attribute.read_value(connection)
|
||||
# Get or encode the value
|
||||
value = attribute.read_value(connection) if value is None else attribute.encode_value(value)
|
||||
|
||||
# Truncate if needed
|
||||
mtu = self.get_mtu(connection)
|
||||
if len(value) > mtu - 3:
|
||||
value = value[:mtu - 3]
|
||||
if len(value) > connection.att_mtu - 3:
|
||||
value = value[:connection.att_mtu - 3]
|
||||
|
||||
# Notify
|
||||
notification = ATT_Handle_Value_Notification(
|
||||
@@ -198,27 +202,9 @@ class Server(EventEmitter):
|
||||
attribute_value = value
|
||||
)
|
||||
logger.debug(f'GATT Notify from server: [0x{connection.handle:04X}] {notification}')
|
||||
self.send_gatt_pdu(connection.handle, notification.to_bytes())
|
||||
self.send_gatt_pdu(connection.handle, bytes(notification))
|
||||
|
||||
async def notify_subscribers(self, attribute, force=False):
|
||||
# Get all the connections for which there's at least one subscription
|
||||
connections = [
|
||||
connection for connection in [
|
||||
self.device.lookup_connection(connection_handle)
|
||||
for (connection_handle, subscribers) in self.subscribers.items()
|
||||
if force or subscribers.get(attribute.handle)
|
||||
]
|
||||
if connection is not None
|
||||
]
|
||||
|
||||
# Notify for each connection
|
||||
if connections:
|
||||
await asyncio.wait([
|
||||
self.notify_subscriber(connection, attribute, force)
|
||||
for connection in connections
|
||||
])
|
||||
|
||||
async def indicate_subscriber(self, connection, attribute, force=False):
|
||||
async def indicate_subscriber(self, connection, attribute, value=None, force=False):
|
||||
# Check if there's a subscriber
|
||||
if not force:
|
||||
subscribers = self.subscribers.get(connection.handle)
|
||||
@@ -233,13 +219,12 @@ class Server(EventEmitter):
|
||||
logger.debug(f'not indicating, cccd={cccd.hex()}')
|
||||
return
|
||||
|
||||
# Get the value
|
||||
value = attribute.read_value(connection)
|
||||
# Get or encode the value
|
||||
value = attribute.read_value(connection) if value is None else attribute.encode_value(value)
|
||||
|
||||
# Truncate if needed
|
||||
mtu = self.get_mtu(connection)
|
||||
if len(value) > mtu - 3:
|
||||
value = value[:mtu - 3]
|
||||
if len(value) > connection.att_mtu - 3:
|
||||
value = value[:connection.att_mtu - 3]
|
||||
|
||||
# Indicate
|
||||
indication = ATT_Handle_Value_Indication(
|
||||
@@ -264,27 +249,32 @@ class Server(EventEmitter):
|
||||
finally:
|
||||
self.pending_confirmations[connection.handle] = None
|
||||
|
||||
async def indicate_subscribers(self, attribute):
|
||||
async def notify_or_indicate_subscribers(self, indicate, attribute, value=None, force=False):
|
||||
# Get all the connections for which there's at least one subscription
|
||||
connections = [
|
||||
connection for connection in [
|
||||
self.device.lookup_connection(connection_handle)
|
||||
for (connection_handle, subscribers) in self.subscribers.items()
|
||||
if subscribers.get(attribute.handle)
|
||||
if force or subscribers.get(attribute.handle)
|
||||
]
|
||||
if connection is not None
|
||||
]
|
||||
|
||||
# Indicate for each connection
|
||||
# Indicate or notify for each connection
|
||||
if connections:
|
||||
coroutine = self.indicate_subscriber if indicate else self.notify_subscriber
|
||||
await asyncio.wait([
|
||||
self.indicate_subscriber(connection, attribute)
|
||||
asyncio.create_task(coroutine(connection, attribute, value, force))
|
||||
for connection in connections
|
||||
])
|
||||
|
||||
async def notify_subscribers(self, attribute, value=None, force=False):
|
||||
return await self.notify_or_indicate_subscribers(False, attribute, value, force)
|
||||
|
||||
async def indicate_subscribers(self, attribute, value=None, force=False):
|
||||
return await self.notify_or_indicate_subscribers(True, attribute, value, force)
|
||||
|
||||
def on_disconnection(self, connection):
|
||||
if connection.handle in self.mtus:
|
||||
del self.mtus[connection.handle]
|
||||
if connection.handle in self.subscribers:
|
||||
del self.subscribers[connection.handle]
|
||||
if connection.handle in self.indication_semaphores:
|
||||
@@ -325,9 +315,6 @@ class Server(EventEmitter):
|
||||
# Just ignore
|
||||
logger.warning(f'{color("--- Ignoring GATT Request from [0x{connection.handle:04X}]:", "red")} {att_pdu}')
|
||||
|
||||
def get_mtu(self, connection):
|
||||
return self.mtus.get(connection.handle, ATT_DEFAULT_MTU)
|
||||
|
||||
#######################################################
|
||||
# ATT handlers
|
||||
#######################################################
|
||||
@@ -347,12 +334,16 @@ class Server(EventEmitter):
|
||||
'''
|
||||
See Bluetooth spec Vol 3, Part F - 3.4.2.1 Exchange MTU Request
|
||||
'''
|
||||
mtu = max(ATT_DEFAULT_MTU, min(self.max_mtu, request.client_rx_mtu))
|
||||
self.mtus[connection.handle] = mtu
|
||||
self.send_response(connection, ATT_Exchange_MTU_Response(server_rx_mtu = mtu))
|
||||
self.send_response(connection, ATT_Exchange_MTU_Response(server_rx_mtu = self.max_mtu))
|
||||
|
||||
# Notify the device
|
||||
self.device.on_connection_att_mtu_update(connection.handle, mtu)
|
||||
# Compute the final MTU
|
||||
if request.client_rx_mtu >= ATT_DEFAULT_MTU:
|
||||
mtu = min(self.max_mtu, request.client_rx_mtu)
|
||||
|
||||
# Notify the device
|
||||
self.device.on_connection_att_mtu_update(connection.handle, mtu)
|
||||
else:
|
||||
logger.warning('invalid client_rx_mtu received, MTU not changed')
|
||||
|
||||
def on_att_find_information_request(self, connection, request):
|
||||
'''
|
||||
@@ -369,7 +360,7 @@ class Server(EventEmitter):
|
||||
return
|
||||
|
||||
# Build list of returned attributes
|
||||
pdu_space_available = self.get_mtu(connection) - 2
|
||||
pdu_space_available = connection.att_mtu - 2
|
||||
attributes = []
|
||||
uuid_size = 0
|
||||
for attribute in (
|
||||
@@ -420,7 +411,7 @@ class Server(EventEmitter):
|
||||
'''
|
||||
|
||||
# Build list of returned attributes
|
||||
pdu_space_available = self.get_mtu(connection) - 2
|
||||
pdu_space_available = connection.att_mtu - 2
|
||||
attributes = []
|
||||
for attribute in (
|
||||
attribute for attribute in self.attributes if
|
||||
@@ -468,8 +459,7 @@ class Server(EventEmitter):
|
||||
See Bluetooth spec Vol 3, Part F - 3.4.4.1 Read By Type Request
|
||||
'''
|
||||
|
||||
mtu = self.get_mtu(connection)
|
||||
pdu_space_available = mtu - 2
|
||||
pdu_space_available = connection.att_mtu - 2
|
||||
attributes = []
|
||||
for attribute in (
|
||||
attribute for attribute in self.attributes if
|
||||
@@ -482,7 +472,7 @@ class Server(EventEmitter):
|
||||
|
||||
# Check the attribute value size
|
||||
attribute_value = attribute.read_value(connection)
|
||||
max_attribute_size = min(mtu - 4, 253)
|
||||
max_attribute_size = min(connection.att_mtu - 4, 253)
|
||||
if len(attribute_value) > max_attribute_size:
|
||||
# We need to truncate
|
||||
attribute_value = attribute_value[:max_attribute_size]
|
||||
@@ -522,7 +512,7 @@ class Server(EventEmitter):
|
||||
if attribute := self.get_attribute(request.attribute_handle):
|
||||
# TODO: check permissions
|
||||
value = attribute.read_value(connection)
|
||||
value_size = min(self.get_mtu(connection) - 1, len(value))
|
||||
value_size = min(connection.att_mtu - 1, len(value))
|
||||
response = ATT_Read_Response(
|
||||
attribute_value = value[:value_size]
|
||||
)
|
||||
@@ -541,7 +531,6 @@ class Server(EventEmitter):
|
||||
|
||||
if attribute := self.get_attribute(request.attribute_handle):
|
||||
# TODO: check permissions
|
||||
mtu = self.get_mtu(connection)
|
||||
value = attribute.read_value(connection)
|
||||
if request.value_offset > len(value):
|
||||
response = ATT_Error_Response(
|
||||
@@ -549,14 +538,14 @@ class Server(EventEmitter):
|
||||
attribute_handle_in_error = request.attribute_handle,
|
||||
error_code = ATT_INVALID_OFFSET_ERROR
|
||||
)
|
||||
elif len(value) <= mtu - 1:
|
||||
elif len(value) <= connection.att_mtu - 1:
|
||||
response = ATT_Error_Response(
|
||||
request_opcode_in_error = request.op_code,
|
||||
attribute_handle_in_error = request.attribute_handle,
|
||||
error_code = ATT_ATTRIBUTE_NOT_LONG_ERROR
|
||||
)
|
||||
else:
|
||||
part_size = min(mtu - 1, len(value) - request.value_offset)
|
||||
part_size = min(connection.att_mtu - 1, len(value) - request.value_offset)
|
||||
response = ATT_Read_Blob_Response(
|
||||
part_attribute_value = value[request.value_offset:request.value_offset + part_size]
|
||||
)
|
||||
@@ -585,8 +574,7 @@ class Server(EventEmitter):
|
||||
self.send_response(connection, response)
|
||||
return
|
||||
|
||||
mtu = self.get_mtu(connection)
|
||||
pdu_space_available = mtu - 2
|
||||
pdu_space_available = connection.att_mtu - 2
|
||||
attributes = []
|
||||
for attribute in (
|
||||
attribute for attribute in self.attributes if
|
||||
@@ -597,7 +585,7 @@ class Server(EventEmitter):
|
||||
):
|
||||
# Check the attribute value size
|
||||
attribute_value = attribute.read_value(connection)
|
||||
max_attribute_size = min(mtu - 6, 251)
|
||||
max_attribute_size = min(connection.att_mtu - 6, 251)
|
||||
if len(attribute_value) > max_attribute_size:
|
||||
# We need to truncate
|
||||
attribute_value = attribute_value[:max_attribute_size]
|
||||
|
||||
870
bumble/hci.py
870
bumble/hci.py
File diff suppressed because it is too large
Load Diff
@@ -76,7 +76,7 @@ class Host(EventEmitter):
|
||||
self.hc_total_num_acl_data_packets = HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS
|
||||
self.acl_packet_queue = collections.deque()
|
||||
self.acl_packets_in_flight = 0
|
||||
self.local_version = HCI_VERSION_BLUETOOTH_CORE_4_0
|
||||
self.local_version = None
|
||||
self.local_supported_commands = bytes(64)
|
||||
self.local_le_features = 0
|
||||
self.command_semaphore = asyncio.Semaphore(1)
|
||||
@@ -91,32 +91,23 @@ class Host(EventEmitter):
|
||||
self.set_packet_sink(controller_sink)
|
||||
|
||||
async def reset(self):
|
||||
await self.send_command(HCI_Reset_Command())
|
||||
await self.send_command(HCI_Reset_Command(), check_result=True)
|
||||
self.ready = True
|
||||
|
||||
response = await self.send_command(HCI_Read_Local_Supported_Commands_Command())
|
||||
if response.return_parameters.status == HCI_SUCCESS:
|
||||
self.local_supported_commands = response.return_parameters.supported_commands
|
||||
else:
|
||||
logger.warn(f'HCI_Read_Local_Supported_Commands_Command failed: {response.return_parameters.status}')
|
||||
response = await self.send_command(HCI_Read_Local_Supported_Commands_Command(), check_result=True)
|
||||
self.local_supported_commands = response.return_parameters.supported_commands
|
||||
|
||||
if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
|
||||
response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command())
|
||||
if response.return_parameters.status == HCI_SUCCESS:
|
||||
self.local_le_features = struct.unpack('<Q', response.return_parameters.le_features)[0]
|
||||
else:
|
||||
logger.warn(f'HCI_LE_Read_Supported_Features_Command failed: {response.return_parameters.status}')
|
||||
response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command(), check_result=True)
|
||||
self.local_le_features = struct.unpack('<Q', response.return_parameters.le_features)[0]
|
||||
|
||||
if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
|
||||
response = await self.send_command(HCI_Read_Local_Version_Information_Command())
|
||||
if response.return_parameters.status == HCI_SUCCESS:
|
||||
self.local_version = response.return_parameters
|
||||
else:
|
||||
logger.warn(f'HCI_Read_Local_Version_Information_Command failed: {response.return_parameters.status}')
|
||||
response = await self.send_command(HCI_Read_Local_Version_Information_Command(), check_result=True)
|
||||
self.local_version = response.return_parameters
|
||||
|
||||
await self.send_command(HCI_Set_Event_Mask_Command(event_mask = bytes.fromhex('FFFFFFFFFFFFFF3F')))
|
||||
|
||||
if self.local_version.hci_version <= HCI_VERSION_BLUETOOTH_CORE_4_0:
|
||||
if self.local_version is not None and self.local_version.hci_version <= HCI_VERSION_BLUETOOTH_CORE_4_0:
|
||||
# Some older controllers don't like event masks with bits they don't understand
|
||||
le_event_mask = bytes.fromhex('1F00000000000000')
|
||||
else:
|
||||
@@ -124,20 +115,14 @@ class Host(EventEmitter):
|
||||
await self.send_command(HCI_LE_Set_Event_Mask_Command(le_event_mask = le_event_mask))
|
||||
|
||||
if self.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
|
||||
response = await self.send_command(HCI_Read_Buffer_Size_Command())
|
||||
if response.return_parameters.status == HCI_SUCCESS:
|
||||
self.hc_acl_data_packet_length = response.return_parameters.hc_acl_data_packet_length
|
||||
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_acl_data_packets
|
||||
else:
|
||||
logger.warn(f'HCI_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
|
||||
response = await self.send_command(HCI_Read_Buffer_Size_Command(), check_result=True)
|
||||
self.hc_acl_data_packet_length = response.return_parameters.hc_acl_data_packet_length
|
||||
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_acl_data_packets
|
||||
|
||||
if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
|
||||
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command())
|
||||
if response.return_parameters.status == HCI_SUCCESS:
|
||||
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
|
||||
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
|
||||
else:
|
||||
logger.warn(f'HCI_LE_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
|
||||
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command(), check_result=True)
|
||||
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
|
||||
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
|
||||
|
||||
if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0:
|
||||
# LE and Classic share the same values
|
||||
@@ -171,7 +156,7 @@ class Host(EventEmitter):
|
||||
def send_hci_packet(self, packet):
|
||||
self.hci_sink.on_packet(packet.to_bytes())
|
||||
|
||||
async def send_command(self, command):
|
||||
async def send_command(self, command, check_result=False):
|
||||
logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}')
|
||||
|
||||
# Wait until we can send (only one pending command at a time)
|
||||
@@ -186,11 +171,22 @@ class Host(EventEmitter):
|
||||
try:
|
||||
self.send_hci_packet(command)
|
||||
response = await self.pending_response
|
||||
# TODO: check error values
|
||||
|
||||
# Check the return parameters if required
|
||||
if check_result:
|
||||
if type(response.return_parameters) is int:
|
||||
status = response.return_parameters
|
||||
else:
|
||||
status = response.return_parameters.status
|
||||
|
||||
if status != HCI_SUCCESS:
|
||||
logger.warning(f'{command.name} failed ({HCI_Constant.error_name(status)})')
|
||||
raise HCI_Error(status)
|
||||
|
||||
return response
|
||||
except Exception as error:
|
||||
logger.warning(f'{color("!!! Exception while sending HCI packet:", "red")} {error}')
|
||||
# raise error
|
||||
raise error
|
||||
finally:
|
||||
self.pending_command = None
|
||||
self.pending_response = None
|
||||
@@ -370,8 +366,8 @@ class Host(EventEmitter):
|
||||
|
||||
# Notify the client
|
||||
connection_parameters = ConnectionParameters(
|
||||
event.conn_interval,
|
||||
event.conn_latency,
|
||||
event.connection_interval,
|
||||
event.peripheral_latency,
|
||||
event.supervision_timeout
|
||||
)
|
||||
self.emit(
|
||||
@@ -387,7 +383,7 @@ class Host(EventEmitter):
|
||||
logger.debug(f'### CONNECTION FAILED: {event.status}')
|
||||
|
||||
# Notify the listeners
|
||||
self.emit('connection_failure', event.status)
|
||||
self.emit('connection_failure', event.connection_handle, event.status)
|
||||
|
||||
def on_hci_le_enhanced_connection_complete_event(self, event):
|
||||
# Just use the same implementation as for the non-enhanced event for now
|
||||
@@ -435,7 +431,7 @@ class Host(EventEmitter):
|
||||
logger.debug(f'### DISCONNECTION FAILED: {event.status}')
|
||||
|
||||
# Notify the listeners
|
||||
self.emit('disconnection_failure', event.status)
|
||||
self.emit('disconnection_failure', event.connection_handle, event.status)
|
||||
|
||||
def on_hci_le_connection_update_complete_event(self, event):
|
||||
if (connection := self.connections.get(event.connection_handle)) is None:
|
||||
@@ -445,8 +441,8 @@ class Host(EventEmitter):
|
||||
# Notify the client
|
||||
if event.status == HCI_SUCCESS:
|
||||
connection_parameters = ConnectionParameters(
|
||||
event.conn_interval,
|
||||
event.conn_latency,
|
||||
event.connection_interval,
|
||||
event.peripheral_latency,
|
||||
event.supervision_timeout
|
||||
)
|
||||
self.emit('connection_parameters_update', connection.handle, connection_parameters)
|
||||
@@ -467,13 +463,10 @@ class Host(EventEmitter):
|
||||
|
||||
def on_hci_le_advertising_report_event(self, event):
|
||||
for report in event.reports:
|
||||
self.emit(
|
||||
'advertising_report',
|
||||
report.address,
|
||||
report.data,
|
||||
report.rssi,
|
||||
report.event_type
|
||||
)
|
||||
self.emit('advertising_report', report)
|
||||
|
||||
def on_hci_le_extended_advertising_report_event(self, event):
|
||||
self.on_hci_le_advertising_report_event(event)
|
||||
|
||||
def on_hci_le_remote_connection_parameter_request_event(self, event):
|
||||
if event.connection_handle not in self.connections:
|
||||
@@ -489,8 +482,8 @@ class Host(EventEmitter):
|
||||
interval_max = event.interval_max,
|
||||
latency = event.latency,
|
||||
timeout = event.timeout,
|
||||
minimum_ce_length = 0,
|
||||
maximum_ce_length = 0
|
||||
min_ce_length = 0,
|
||||
max_ce_length = 0
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -462,10 +462,10 @@ class L2CAP_Information_Response(L2CAP_Control_Frame):
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@L2CAP_Control_Frame.subclass([
|
||||
('interval_min', 2),
|
||||
('interval_max', 2),
|
||||
('slave_latency', 2),
|
||||
('timeout_multiplier', 2)
|
||||
('interval_min', 2),
|
||||
('interval_max', 2),
|
||||
('latency', 2),
|
||||
('timeout', 2)
|
||||
])
|
||||
class L2CAP_Connection_Parameter_Update_Request(L2CAP_Control_Frame):
|
||||
'''
|
||||
@@ -857,10 +857,10 @@ class ChannelManager:
|
||||
identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256
|
||||
self.identifiers[connection.handle] = identifier
|
||||
return identifier
|
||||
|
||||
|
||||
def register_fixed_channel(self, cid, handler):
|
||||
self.fixed_channels[cid] = handler
|
||||
|
||||
|
||||
def deregister_fixed_channel(self, cid):
|
||||
if cid in self.fixed_channels:
|
||||
del self.fixed_channels[cid]
|
||||
@@ -1057,13 +1057,13 @@ class ChannelManager:
|
||||
)
|
||||
)
|
||||
self.host.send_command_sync(HCI_LE_Connection_Update_Command(
|
||||
connection_handle = connection.handle,
|
||||
conn_interval_min = request.interval_min,
|
||||
conn_interval_max = request.interval_max,
|
||||
conn_latency = request.slave_latency,
|
||||
supervision_timeout = request.timeout_multiplier,
|
||||
minimum_ce_length = 0,
|
||||
maximum_ce_length = 0
|
||||
connection_handle = connection.handle,
|
||||
connection_interval_min = request.interval_min,
|
||||
connection_interval_max = request.interval_max,
|
||||
max_latency = request.latency,
|
||||
supervision_timeout = request.timeout,
|
||||
min_ce_length = 0,
|
||||
max_ce_length = 0
|
||||
))
|
||||
else:
|
||||
self.send_control_frame(
|
||||
|
||||
@@ -36,7 +36,7 @@ logger = logging.getLogger(__name__)
|
||||
async def open_usb_transport(spec):
|
||||
'''
|
||||
Open a USB transport.
|
||||
The parameter string has this syntax:
|
||||
The moniker string has this syntax:
|
||||
either <index> or
|
||||
<vendor>:<product> or
|
||||
<vendor>:<product>/<serial-number>] or
|
||||
@@ -47,15 +47,21 @@ async def open_usb_transport(spec):
|
||||
/<serial-number> suffix or #<index> suffix max be specified when more than one device with
|
||||
the same vendor and product identifiers are present.
|
||||
|
||||
In addition, if the moniker ends with the symbol "!", the device will be used in "forced" mode:
|
||||
the first USB interface of the device will be used, regardless of the interface class/subclass.
|
||||
This may be useful for some devices that use a custom class/subclass but may nonetheless work as-is.
|
||||
|
||||
Examples:
|
||||
0 --> the first BT USB dongle
|
||||
04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901
|
||||
04b4:f901#2 --> the third USB device with vendor=04b4 and product=f901
|
||||
04b4:f901/00E04C239987 --> the BT USB dongle with vendor=04b4 and product=f901 and serial number 00E04C239987
|
||||
usb:0B05:17CB! --> the BT USB dongle vendor=0B05 and product=17CB, in "forced" mode.
|
||||
'''
|
||||
|
||||
USB_RECIPIENT_DEVICE = 0x00
|
||||
USB_REQUEST_TYPE_CLASS = 0x01 << 5
|
||||
USB_DEVICE_CLASS_DEVICE = 0x00
|
||||
USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
|
||||
USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
|
||||
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
|
||||
@@ -63,6 +69,12 @@ async def open_usb_transport(spec):
|
||||
USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT = 0x03
|
||||
USB_ENDPOINT_IN = 0x80
|
||||
|
||||
USB_BT_HCI_CLASS_TUPLE = (
|
||||
USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
|
||||
USB_DEVICE_SUBCLASS_RF_CONTROLLER,
|
||||
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
|
||||
)
|
||||
|
||||
READ_SIZE = 1024
|
||||
|
||||
class UsbPacketSink:
|
||||
@@ -280,6 +292,13 @@ async def open_usb_transport(spec):
|
||||
context.open()
|
||||
try:
|
||||
found = None
|
||||
|
||||
if spec.endswith('!'):
|
||||
spec = spec[:-1]
|
||||
forced_mode = True
|
||||
else:
|
||||
forced_mode = False
|
||||
|
||||
if ':' in spec:
|
||||
vendor_id, product_id = spec.split(':')
|
||||
serial_number = None
|
||||
@@ -291,10 +310,14 @@ async def open_usb_transport(spec):
|
||||
device_index = int(device_index_str)
|
||||
|
||||
for device in context.getDeviceIterator(skip_on_error=True):
|
||||
try:
|
||||
device_serial_number = device.getSerialNumber()
|
||||
except usb1.USBError:
|
||||
device_serial_number = None
|
||||
if (
|
||||
device.getVendorID() == int(vendor_id, 16) and
|
||||
device.getProductID() == int(product_id, 16) and
|
||||
(serial_number is None or device.getSerialNumber() == serial_number)
|
||||
(serial_number is None or serial_number == device_serial_number)
|
||||
):
|
||||
if device_index == 0:
|
||||
found = device
|
||||
@@ -302,13 +325,27 @@ async def open_usb_transport(spec):
|
||||
device_index -= 1
|
||||
device.close()
|
||||
else:
|
||||
# Look for a compatible device by index
|
||||
def device_is_bluetooth_hci(device):
|
||||
# Check if the device class indicates a match
|
||||
if (device.getDeviceClass(), device.getDeviceSubClass(), device.getDeviceProtocol()) == \
|
||||
USB_BT_HCI_CLASS_TUPLE:
|
||||
return True
|
||||
|
||||
# If the device class is 'Device', look for a matching interface
|
||||
if device.getDeviceClass() == USB_DEVICE_CLASS_DEVICE:
|
||||
for configuration in device:
|
||||
for interface in configuration:
|
||||
for setting in interface:
|
||||
if (setting.getClass(), setting.getSubClass(), setting.getProtocol()) == \
|
||||
USB_BT_HCI_CLASS_TUPLE:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
device_index = int(spec)
|
||||
for device in context.getDeviceIterator(skip_on_error=True):
|
||||
if (
|
||||
device.getDeviceClass() == USB_DEVICE_CLASS_WIRELESS_CONTROLLER and
|
||||
device.getDeviceSubClass() == USB_DEVICE_SUBCLASS_RF_CONTROLLER and
|
||||
device.getDeviceProtocol() == USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
|
||||
):
|
||||
if device_is_bluetooth_hci(device):
|
||||
if device_index == 0:
|
||||
found = device
|
||||
break
|
||||
@@ -329,9 +366,8 @@ async def open_usb_transport(spec):
|
||||
setting = None
|
||||
for setting in interface:
|
||||
if (
|
||||
setting.getClass() != USB_DEVICE_CLASS_WIRELESS_CONTROLLER or
|
||||
setting.getSubClass() != USB_DEVICE_SUBCLASS_RF_CONTROLLER or
|
||||
setting.getProtocol() != USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
|
||||
not forced_mode and
|
||||
(setting.getClass(), setting.getSubClass(), setting.getProtocol()) != USB_BT_HCI_CLASS_TUPLE
|
||||
):
|
||||
continue
|
||||
|
||||
|
||||
@@ -45,6 +45,10 @@ nav:
|
||||
- HCI Bridge: apps_and_tools/hci_bridge.md
|
||||
- Golden Gate Bridge: apps_and_tools/gg_bridge.md
|
||||
- Show: apps_and_tools/show.md
|
||||
- GATT Dump: apps_and_tools/gatt_dump.md
|
||||
- Pair: apps_and_tools/pair.md
|
||||
- Unbond: apps_and_tools/unbond.md
|
||||
- USB Probe: apps_and_tools/usb_probe.md
|
||||
- Hardware:
|
||||
- Overview: hardware/index.md
|
||||
- Platforms:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# This requirements file is for python3
|
||||
mkdocs == 1.2.3
|
||||
mkdocs-material == 7.1.7
|
||||
mkdocs-material-extensions == 1.0.1
|
||||
pymdown-extensions == 8.2
|
||||
mkdocstrings == 0.15.1
|
||||
mkdocs == 1.4.0
|
||||
mkdocs-material == 8.5.6
|
||||
mkdocs-material-extensions == 1.0.3
|
||||
pymdown-extensions == 9.6
|
||||
mkdocstrings-python == 0.7.1
|
||||
@@ -13,17 +13,29 @@ type of device (there's no way to tell).
|
||||
|
||||
## Usage
|
||||
|
||||
This command line tool takes no arguments.
|
||||
This command line tool may be invoked with no arguments, or with `--verbose`
|
||||
for extra details.
|
||||
When installed from PyPI, run as
|
||||
```
|
||||
$ bumble-usb-probe
|
||||
```
|
||||
|
||||
or, for extra details, with the `--verbose` argument
|
||||
```
|
||||
$ bumble-usb-probe --v
|
||||
```
|
||||
|
||||
When running from the source distribution:
|
||||
```
|
||||
$ python3 apps/usb-probe.py
|
||||
```
|
||||
|
||||
or
|
||||
|
||||
```
|
||||
$ python3 apps/usb-probe.py --verbose
|
||||
```
|
||||
|
||||
!!! example
|
||||
```
|
||||
$ python3 apps/usb_probe.py
|
||||
|
||||
@@ -1,48 +1,86 @@
|
||||
:material-linux: LINUX PLATFORM
|
||||
===============================
|
||||
|
||||
In addition to all the standard functionality available from the project by running the python tools and/or writing your own apps by leveraging the API, it is also possible on Linux hosts to interface the Bumble stack with the native BlueZ stack, and with Bluetooth controllers.
|
||||
Using Bumble With Physical Bluetooth Controllers
|
||||
------------------------------------------------
|
||||
|
||||
Using Bumble With BlueZ
|
||||
-----------------------
|
||||
A Bumble application can interface with a local Bluetooth controller on a Linux host.
|
||||
The 3 main types of physical Bluetooth controllers are:
|
||||
|
||||
A Bumble virtual controller can be attached to the BlueZ stack.
|
||||
Attaching a controller to BlueZ can be done by either simulating a UART HCI interface, or by using the VHCI driver interface if available.
|
||||
In both cases, the controller can run locally on the Linux host, or remotely on a different host, with a bridge between the remote controller and the local BlueZ host, which may be useful when the BlueZ stack is running on an embedded system, or a host on which running the Bumble controller is not convenient.
|
||||
* Bluetooth USB Dongle
|
||||
* HCI over UART (via a serial port)
|
||||
* Kernel-managed Bluetooth HCI (HCI Sockets)
|
||||
|
||||
### Using VHCI
|
||||
!!! tip "Conflicts with the kernel and BlueZ"
|
||||
If your use a USB dongle that is recognized by your kernel as a supported Bluetooth device, it is
|
||||
likely that the kernel driver will claim that USB device and attach it to the BlueZ stack.
|
||||
If you want to claim ownership of it to use with Bumble, you will need to set the state of the corresponding HCI interface as `DOWN`.
|
||||
HCI interfaces are numbered, starting from 0 (i.e `hci0`, `hci1`, ...).
|
||||
|
||||
With the [VHCI transport](../transports/vhci.md) you can attach a Bumble virtual controller to the BlueZ stack. Once attached, the controller will appear just like any other controller, and thus can be used with the standard BlueZ tools.
|
||||
|
||||
!!! example "Attaching a virtual controller"
|
||||
With the example app `run_controller.py`:
|
||||
For example, to bring `hci0` down:
|
||||
```
|
||||
PYTHONPATH=. python3 examples/run_controller.py F6:F7:F8:F9:FA:FB examples/device1.json vhci
|
||||
```
|
||||
|
||||
You should see a 'Virtual Bus' controller. For example:
|
||||
```
|
||||
$ hciconfig
|
||||
hci0: Type: Primary Bus: Virtual
|
||||
BD Address: F6:F7:F8:F9:FA:FB ACL MTU: 27:64 SCO MTU: 0:0
|
||||
UP RUNNING
|
||||
RX bytes:0 acl:0 sco:0 events:43 errors:0
|
||||
TX bytes:274 acl:0 sco:0 commands:43 errors:0
|
||||
$ sudo hciconfig hci0 down
|
||||
```
|
||||
|
||||
And scanning for devices should show the virtual 'Bumble' device that's running as part of the `run_controller.py` example app:
|
||||
You can use the `hciconfig` command with no arguments to get a list of HCI interfaces seen by
|
||||
the kernel.
|
||||
|
||||
Also, if `bluetoothd` is running on your system, it will likely re-claim the interface after you
|
||||
close it, so you may need to bring the interface back `UP` before using it again, or to disable
|
||||
`bluetoothd` altogether (see the section further below about BlueZ and `bluetoothd`).
|
||||
|
||||
### Using a USB Dongle
|
||||
|
||||
See the [USB Transport page](../transports/usb.md) for general information on how to use HCI USB controllers.
|
||||
|
||||
!!! tip "USB Permissions"
|
||||
By default, when running as a regular user, you won't have the permission to use
|
||||
arbitrary USB devices.
|
||||
You can change the permissions for a specific USB device based on its bus number and
|
||||
device number (you can use `lsusb` to find the Bus and Device numbers for your Bluetooth
|
||||
dongle).
|
||||
|
||||
Example:
|
||||
```
|
||||
pi@raspberrypi:~ $ sudo hcitool -i hci2 lescan
|
||||
LE Scan ...
|
||||
F0:F1:F2:F3:F4:F5 Bumble
|
||||
$ sudo chmod o+w /dev/bus/usb/001/004
|
||||
```
|
||||
This will change the permissions for Device 4 on Bus 1.
|
||||
|
||||
Note that the USB Bus number and Device number may change depending on where you plug the USB
|
||||
dongle and what other USB devices and hubs are also plugged in.
|
||||
|
||||
If you need to make the permission changes permanent across reboots, you can create a `udev`
|
||||
rule for your specific Bluetooth dongle. Visit [this Arch Linux Wiki page](https://wiki.archlinux.org/title/udev) for a
|
||||
good overview of how you may do that.
|
||||
|
||||
### Using HCI over UART
|
||||
|
||||
See the [Serial Transport page](../transports/serial.md) for general information on how to use HCI over a UART (serial port).
|
||||
|
||||
### Using HCI Sockets
|
||||
|
||||
HCI sockets provide a way to send/receive HCI packets to/from a Bluetooth controller managed by the kernel.
|
||||
The HCI device referenced by an `hci-socket` transport (`hciX`, where `X` is an integer, with `hci0` being the first controller device, and so on) must be in the `DOWN` state before it can be opened as a transport.
|
||||
You can bring a HCI controller `UP` or `DOWN` with `hciconfig`.
|
||||
See the [HCI Socket Transport page](../transports/hci_socket.md) for details on the `hci-socket` tansport syntax.
|
||||
|
||||
The HCI device referenced by an `hci-socket` transport (`hci<X>`, where `<X>` is an integer, with `hci0` being the first controller device, and so on) must be in the `DOWN` state before it can be opened as a transport.
|
||||
You can bring a HCI controller `UP` or `DOWN` with `hciconfig hci<X> up` and `hciconfig hci<X> up`.
|
||||
|
||||
!!! tip "HCI Socket Permissions"
|
||||
By default, when running as a regular user, you won't have the permission to use
|
||||
an HCI socket to a Bluetooth controller (you may see an exception like `PermissionError: [Errno 1] Operation not permitted`).
|
||||
|
||||
If you want to run without using `sudo`, you need to manage the capabilities by adding the appropriate entries in `/etc/security/capability.conf` to grant a user or group the `cap_net_admin` capability.
|
||||
See [this manpage](https://manpages.ubuntu.com/manpages/bionic/man5/capability.conf.5.html) for details.
|
||||
|
||||
Alternatively, if you are just experimenting temporarily, the `capsh` command may be useful in order
|
||||
to execute a single command with enhanced permissions, as in this example:
|
||||
|
||||
|
||||
```
|
||||
$ sudo capsh --caps="cap_net_admin+eip cap_setpcap,cap_setuid,cap_setgid+ep" --keep=1 --user=$USER --addamb=cap_net_admin -- -c "<path/to/executable> <executable-args>"
|
||||
```
|
||||
Where `<path/to/executable>` is the path to your `python3` executable or to one of the Bumble bundled command-line applications.
|
||||
|
||||
!!! tip "List all available controllers"
|
||||
The command
|
||||
```
|
||||
@@ -72,29 +110,16 @@ You can bring a HCI controller `UP` or `DOWN` with `hciconfig`.
|
||||
```
|
||||
$ hciconfig hci0 down
|
||||
```
|
||||
(or `hciX` with `X` being the index of the controller device you want to use), but a simpler solution is to just stop the `bluetoothd` daemon, with a command like:
|
||||
(or `hci<X>` with `<X>` being the index of the controller device you want to use), but a simpler solution is to just stop the `bluetoothd` daemon, with a command like:
|
||||
```
|
||||
$ sudo systemctl stop bluetooth.service
|
||||
```
|
||||
You can always re-start the daemon with
|
||||
```
|
||||
$ sudo systemctl start bluetooth.service
|
||||
```
|
||||
|
||||
### Using a Simulated UART HCI
|
||||
|
||||
### Bridge to a Remote Controller
|
||||
|
||||
|
||||
Using Bumble With Bluetooth Controllers
|
||||
---------------------------------------
|
||||
|
||||
A Bumble application can interface with a local Bluetooth controller.
|
||||
If your Bluetooth controller is a standard HCI USB controller, see the [USB Transport page](../transports/usb.md) for details on how to use HCI USB controllers.
|
||||
If your Bluetooth controller is a standard HCI UART controller, see the [Serial Transport page](../transports/serial.md).
|
||||
Alternatively, a Bumble Host object can communicate with one of the platform's controllers via an HCI Socket.
|
||||
|
||||
`<details to be filled in>`
|
||||
Bumble on the Raspberry Pi
|
||||
--------------------------
|
||||
|
||||
### Raspberry Pi 4 :fontawesome-brands-raspberry-pi:
|
||||
|
||||
@@ -102,9 +127,10 @@ You can use the Bluetooth controller either via the kernel, or directly to the d
|
||||
|
||||
#### Via The Kernel
|
||||
|
||||
Use an HCI Socket transport
|
||||
Use an HCI Socket transport (see section above)
|
||||
|
||||
#### Directly
|
||||
|
||||
In order to use the Bluetooth controller directly on a Raspberry Pi 4 board, you need to ensure that it isn't being used by the BlueZ stack (which it probably is by default).
|
||||
|
||||
```
|
||||
@@ -136,3 +162,47 @@ should detach the controller from the stack, after which you can use the HCI UAR
|
||||
python3 run_scanner.py serial:/dev/serial1,3000000
|
||||
```
|
||||
|
||||
|
||||
Using Bumble With BlueZ
|
||||
-----------------------
|
||||
|
||||
In addition to all the standard functionality available from the project by running the python tools and/or writing your own apps by leveraging the API, it is also possible on Linux hosts to interface the Bumble stack with the native BlueZ stack, and with Bluetooth controllers.
|
||||
|
||||
A Bumble virtual controller can be attached to the BlueZ stack.
|
||||
Attaching a controller to BlueZ can be done by either simulating a UART HCI interface, or by using the VHCI driver interface if available.
|
||||
In both cases, the controller can run locally on the Linux host, or remotely on a different host, with a bridge between the remote controller and the local BlueZ host, which may be useful when the BlueZ stack is running on an embedded system, or a host on which running the Bumble controller is not convenient.
|
||||
|
||||
### Using VHCI
|
||||
|
||||
With the [VHCI transport](../transports/vhci.md) you can attach a Bumble virtual controller to the BlueZ stack. Once attached, the controller will appear just like any other controller, and thus can be used with the standard BlueZ tools.
|
||||
|
||||
!!! example "Attaching a virtual controller"
|
||||
With the example app `run_controller.py`:
|
||||
```
|
||||
python3 examples/run_controller.py F6:F7:F8:F9:FA:FB examples/device1.json vhci
|
||||
```
|
||||
|
||||
You should see a 'Virtual Bus' controller. For example:
|
||||
```
|
||||
$ hciconfig
|
||||
hci0: Type: Primary Bus: Virtual
|
||||
BD Address: F6:F7:F8:F9:FA:FB ACL MTU: 27:64 SCO MTU: 0:0
|
||||
UP RUNNING
|
||||
RX bytes:0 acl:0 sco:0 events:43 errors:0
|
||||
TX bytes:274 acl:0 sco:0 commands:43 errors:0
|
||||
```
|
||||
|
||||
And scanning for devices should show the virtual 'Bumble' device that's running as part of the `run_controller.py` example app:
|
||||
```
|
||||
pi@raspberrypi:~ $ sudo hcitool -i hci2 lescan
|
||||
LE Scan ...
|
||||
F0:F1:F2:F3:F4:F5 Bumble
|
||||
```
|
||||
|
||||
```
|
||||
|
||||
### Using a Simulated UART HCI
|
||||
|
||||
### Bridge to a Remote Controller
|
||||
|
||||
|
||||
|
||||
@@ -5,8 +5,9 @@ The Android emulator transport either connects, as a host, to a "Root Canal" vir
|
||||
("host" mode), or attaches a virtual controller to the Android Bluetooth host stack ("controller" mode).
|
||||
|
||||
## Moniker
|
||||
The moniker syntax for an Android Emulator transport is: `android-emulator:[mode=<host|controller>][mode=<host|controller>]`.
|
||||
Both the `mode=<host|controller>` and `mode=<host|controller>` parameters are optional (so the moniker `android-emulator` by itself is a valid moniker, which will create a transport in `host` mode, connected to `localhost` on the default gRPC port for the emulator)
|
||||
The moniker syntax for an Android Emulator transport is: `android-emulator:[mode=<host|controller>][<hostname>:<port>]`, where
|
||||
the `mode` parameter can specify running as a host or a controller, and `<hostname>:<port>` can specify a host name (or IP address) and TCP port number on which to reach the gRPC server for the emulator.
|
||||
Both the `mode=<host|controller>` and `<hostname>:<port>` parameters are optional (so the moniker `android-emulator` by itself is a valid moniker, which will create a transport in `host` mode, connected to `localhost` on the default gRPC port for the emulator).
|
||||
|
||||
!!! example Example
|
||||
`android-emulator`
|
||||
|
||||
@@ -5,6 +5,7 @@ The USB transport interfaces with a local Bluetooth USB dongle.
|
||||
|
||||
## Moniker
|
||||
The moniker for a USB transport is either:
|
||||
|
||||
* `usb:<index>`
|
||||
* `usb:<vendor>:<product>`
|
||||
* `usb:<vendor>:<product>/<serial-number>`
|
||||
@@ -16,6 +17,10 @@ In the `usb:<vendor>:<product>#<index>` form, matching devices are the ones with
|
||||
|
||||
`<vendor>` and `<product>` are a vendor ID and product ID in hexadecimal.
|
||||
|
||||
In addition, if the moniker ends with the symbol "!", the device will be used in "forced" mode:
|
||||
the first USB interface of the device will be used, regardless of the interface class/subclass.
|
||||
This may be useful for some devices that use a custom class/subclass but may nonetheless work as-is.
|
||||
|
||||
!!! examples
|
||||
`usb:04b4:f901`
|
||||
The USB dongle with `<vendor>` equal to `04b4` and `<product>` equal to `f901`
|
||||
@@ -29,6 +34,10 @@ In the `usb:<vendor>:<product>#<index>` form, matching devices are the ones with
|
||||
`usb:04b4:f901/#1`
|
||||
The second USB dongle with `<vendor>` equal to `04b4` and `<product>` equal to `f901`
|
||||
|
||||
`usb:0B05:17CB!`
|
||||
The BT USB dongle vendor=0B05 and product=17CB, in "forced" mode.
|
||||
|
||||
|
||||
## Alternative
|
||||
The library includes two different implementations of the USB transport, implemented using different python bindings for `libusb`.
|
||||
Using the transport prefix `pyusb:` instead of `usb:` selects the implementation based on [PyUSB](https://pypi.org/project/pyusb/), using the synchronous API of `libusb`, whereas the default implementation is based on [libusb1](https://pypi.org/project/libusb1/), using the asynchronous API of `libusb`. In order to use the alternative PyUSB-based implementation, you need to ensure that you have installed that python module, as it isn't installed by default as a dependency of Bumble.
|
||||
|
||||
@@ -29,18 +29,31 @@ from bumble.transport import open_transport_or_link
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: run_advertiser.py <config-file> <transport-spec>')
|
||||
print('example: run_advertiser.py device1.json link-relay:ws://localhost:8888/test')
|
||||
if len(sys.argv) < 3:
|
||||
print('Usage: run_advertiser.py <config-file> <transport-spec> [type] [address]')
|
||||
print('example: run_advertiser.py device1.json usb:0')
|
||||
return
|
||||
|
||||
if len(sys.argv) >= 4:
|
||||
advertising_type = AdvertisingType(int(sys.argv[3]))
|
||||
else:
|
||||
advertising_type = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE
|
||||
|
||||
if advertising_type.is_directed:
|
||||
if len(sys.argv) < 5:
|
||||
print('<address> required for directed advertising')
|
||||
return
|
||||
target = Address(sys.argv[4])
|
||||
else:
|
||||
target = None
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
print('<<< connected')
|
||||
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
await device.power_on()
|
||||
await device.start_advertising()
|
||||
await device.start_advertising(advertising_type=advertising_type, target=target)
|
||||
await hci_source.wait_for_termination()
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -29,15 +29,15 @@ from bumble.transport import open_transport_or_link
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class ScannerListener(Device.Listener):
|
||||
def on_advertisement(self, address, ad_data, rssi, connectable):
|
||||
address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type]
|
||||
address_color = 'yellow' if connectable else 'red'
|
||||
def on_advertisement(self, advertisement):
|
||||
address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type]
|
||||
address_color = 'yellow' if advertisement.is_connectable else 'red'
|
||||
if address_type_string.startswith('P'):
|
||||
type_color = 'green'
|
||||
else:
|
||||
type_color = 'cyan'
|
||||
|
||||
print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]: RSSI={rssi}, {ad_data}')
|
||||
print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]: RSSI={advertisement.rssi}, {advertisement.data}')
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -40,24 +40,24 @@ async def main():
|
||||
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
|
||||
|
||||
@device.on('advertisement')
|
||||
def _(address, ad_data, rssi, connectable):
|
||||
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type]
|
||||
address_color = 'yellow' if connectable else 'red'
|
||||
def _(advertisement):
|
||||
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[advertisement.address.address_type]
|
||||
address_color = 'yellow' if advertisement.is_connectable else 'red'
|
||||
address_qualifier = ''
|
||||
if address_type_string.startswith('P'):
|
||||
type_color = 'cyan'
|
||||
else:
|
||||
if address.is_static:
|
||||
if advertisement.address.is_static:
|
||||
type_color = 'green'
|
||||
address_qualifier = '(static)'
|
||||
elif address.is_resolvable:
|
||||
elif advertisement.address.is_resolvable:
|
||||
type_color = 'magenta'
|
||||
address_qualifier = '(resolvable)'
|
||||
else:
|
||||
type_color = 'white'
|
||||
|
||||
separator = '\n '
|
||||
print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{rssi}{separator}{ad_data.to_string(separator)}')
|
||||
print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{advertisement.rssi}{separator}{advertisement.data.to_string(separator)}')
|
||||
|
||||
await device.power_on()
|
||||
await device.start_scanning(filter_duplicates=filter_duplicates)
|
||||
|
||||
@@ -48,6 +48,7 @@ install_requires =
|
||||
[options.entry_points]
|
||||
console_scripts =
|
||||
bumble-console = bumble.apps.console:main
|
||||
bumble-controller-info = bumble.apps.controller_info:main
|
||||
bumble-gatt-dump = bumble.apps.gatt_dump:main
|
||||
bumble-hci-bridge = bumble.apps.hci_bridge:main
|
||||
bumble-pair = bumble.apps.pair:main
|
||||
@@ -67,6 +68,6 @@ development =
|
||||
invoke >= 1.4
|
||||
nox >= 2022
|
||||
documentation =
|
||||
mkdocs >= 1.2.3
|
||||
mkdocs-material >= 8.1.9
|
||||
mkdocs >= 1.4.0
|
||||
mkdocs-material >= 8.5.6
|
||||
mkdocstrings[python] >= 0.19.0
|
||||
|
||||
@@ -22,6 +22,7 @@ import struct
|
||||
import pytest
|
||||
|
||||
from bumble.controller import Controller
|
||||
from bumble.gatt_client import CharacteristicProxy
|
||||
from bumble.link import LocalLink
|
||||
from bumble.device import Device, Peer
|
||||
from bumble.host import Host
|
||||
@@ -53,29 +54,29 @@ def basic_check(x):
|
||||
parsed = ATT_PDU.from_bytes(pdu)
|
||||
x_str = str(x)
|
||||
parsed_str = str(parsed)
|
||||
assert(x_str == parsed_str)
|
||||
assert x_str == parsed_str
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_UUID():
|
||||
u = UUID.from_16_bits(0x7788)
|
||||
assert(str(u) == 'UUID-16:7788')
|
||||
assert str(u) == 'UUID-16:7788'
|
||||
u = UUID.from_32_bits(0x11223344)
|
||||
assert(str(u) == 'UUID-32:11223344')
|
||||
assert str(u) == 'UUID-32:11223344'
|
||||
u = UUID('61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
|
||||
assert(str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
|
||||
assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
|
||||
v = UUID(str(u))
|
||||
assert(str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
|
||||
assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
|
||||
w = UUID.from_bytes(v.to_bytes())
|
||||
assert(str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6')
|
||||
assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
|
||||
|
||||
u1 = UUID.from_16_bits(0x1234)
|
||||
b1 = u1.to_bytes(force_128 = True)
|
||||
u2 = UUID.from_bytes(b1)
|
||||
assert(u1 == u2)
|
||||
assert u1 == u2
|
||||
|
||||
u3 = UUID.from_16_bits(0x180a)
|
||||
assert(str(u3) == 'UUID-16:180A (Device Information)')
|
||||
assert str(u3) == 'UUID-16:180A (Device Information)'
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -98,6 +99,133 @@ def test_ATT_Read_By_Group_Type_Request():
|
||||
basic_check(pdu)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_characteristic_encoding():
|
||||
class Foo(Characteristic):
|
||||
def encode_value(self, value):
|
||||
return bytes([value])
|
||||
|
||||
def decode_value(self, value_bytes):
|
||||
return value_bytes[0]
|
||||
|
||||
c = Foo(GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.READ, Characteristic.READABLE, 123)
|
||||
x = c.read_value(None)
|
||||
assert x == bytes([123])
|
||||
c.write_value(None, bytes([122]))
|
||||
assert c.value == 122
|
||||
|
||||
class FooProxy(CharacteristicProxy):
|
||||
def __init__(self, characteristic):
|
||||
super().__init__(
|
||||
characteristic.client,
|
||||
characteristic.handle,
|
||||
characteristic.end_group_handle,
|
||||
characteristic.uuid,
|
||||
characteristic.properties
|
||||
)
|
||||
|
||||
def encode_value(self, value):
|
||||
return bytes([value])
|
||||
|
||||
def decode_value(self, value_bytes):
|
||||
return value_bytes[0]
|
||||
|
||||
[client, server] = LinkedDevices().devices[:2]
|
||||
|
||||
characteristic = Characteristic(
|
||||
'FDB159DB-036C-49E3-B3DB-6325AC750806',
|
||||
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
bytes([123])
|
||||
)
|
||||
|
||||
service = Service(
|
||||
'3A657F47-D34F-46B3-B1EC-698E29B6B829',
|
||||
[characteristic]
|
||||
)
|
||||
server.add_service(service)
|
||||
|
||||
await client.power_on()
|
||||
await server.power_on()
|
||||
connection = await client.connect(server.random_address)
|
||||
peer = Peer(connection)
|
||||
|
||||
await peer.discover_services()
|
||||
await peer.discover_characteristics()
|
||||
c = peer.get_characteristics_by_uuid(characteristic.uuid)
|
||||
assert len(c) == 1
|
||||
c = c[0]
|
||||
cp = FooProxy(c)
|
||||
|
||||
v = await cp.read_value()
|
||||
assert v == 123
|
||||
await cp.write_value(124)
|
||||
await async_barrier()
|
||||
assert characteristic.value == bytes([124])
|
||||
|
||||
v = await cp.read_value()
|
||||
assert v == 124
|
||||
await cp.write_value(125, with_response=True)
|
||||
await async_barrier()
|
||||
assert characteristic.value == bytes([125])
|
||||
|
||||
cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2]))
|
||||
await cd.write_value(100, with_response=True)
|
||||
await async_barrier()
|
||||
assert characteristic.value == bytes([50])
|
||||
|
||||
last_change = None
|
||||
|
||||
def on_change(value):
|
||||
nonlocal last_change
|
||||
last_change = value
|
||||
|
||||
await c.subscribe(on_change)
|
||||
await server.notify_subscribers(characteristic)
|
||||
await async_barrier()
|
||||
assert last_change == characteristic.value
|
||||
last_change = None
|
||||
|
||||
await server.notify_subscribers(characteristic, value=bytes([125]))
|
||||
await async_barrier()
|
||||
assert last_change == bytes([125])
|
||||
last_change = None
|
||||
|
||||
await c.unsubscribe(on_change)
|
||||
await server.notify_subscribers(characteristic)
|
||||
await async_barrier()
|
||||
assert last_change is None
|
||||
|
||||
await cp.subscribe(on_change)
|
||||
await server.notify_subscribers(characteristic)
|
||||
await async_barrier()
|
||||
assert last_change == characteristic.value[0]
|
||||
last_change = None
|
||||
|
||||
await server.notify_subscribers(characteristic, value=bytes([126]))
|
||||
await async_barrier()
|
||||
assert last_change == 126
|
||||
last_change = None
|
||||
|
||||
await cp.unsubscribe(on_change)
|
||||
await server.notify_subscribers(characteristic)
|
||||
await async_barrier()
|
||||
assert last_change is None
|
||||
|
||||
cd = DelegatedCharacteristicAdapter(c, decode=lambda x: x[0])
|
||||
await cd.subscribe(on_change)
|
||||
await server.notify_subscribers(characteristic)
|
||||
await async_barrier()
|
||||
assert last_change == characteristic.value[0]
|
||||
last_change = None
|
||||
|
||||
await cd.unsubscribe(on_change)
|
||||
await server.notify_subscribers(characteristic)
|
||||
await async_barrier()
|
||||
assert last_change is None
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_CharacteristicAdapter():
|
||||
# Check that the CharacteristicAdapter base class is transparent
|
||||
@@ -106,21 +234,21 @@ def test_CharacteristicAdapter():
|
||||
a = CharacteristicAdapter(c)
|
||||
|
||||
value = a.read_value(None)
|
||||
assert(value == v)
|
||||
assert value == v
|
||||
|
||||
v = bytes([3, 4, 5])
|
||||
a.write_value(None, v)
|
||||
assert(c.value == v)
|
||||
assert c.value == v
|
||||
|
||||
# Simple delegated adapter
|
||||
a = DelegatedCharacteristicAdapter(c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x)))
|
||||
|
||||
value = a.read_value(None)
|
||||
assert(value == bytes(reversed(v)))
|
||||
assert value == bytes(reversed(v))
|
||||
|
||||
v = bytes([3, 4, 5])
|
||||
a.write_value(None, v)
|
||||
assert(a.value == bytes(reversed(v)))
|
||||
assert a.value == bytes(reversed(v))
|
||||
|
||||
# Packed adapter with single element format
|
||||
v = 1234
|
||||
@@ -129,10 +257,10 @@ def test_CharacteristicAdapter():
|
||||
a = PackedCharacteristicAdapter(c, '>H')
|
||||
|
||||
value = a.read_value(None)
|
||||
assert(value == pv)
|
||||
assert value == pv
|
||||
c.value = None
|
||||
a.write_value(None, pv)
|
||||
assert(a.value == v)
|
||||
assert a.value == v
|
||||
|
||||
# Packed adapter with multi-element format
|
||||
v1 = 1234
|
||||
@@ -142,10 +270,10 @@ def test_CharacteristicAdapter():
|
||||
a = PackedCharacteristicAdapter(c, '>HH')
|
||||
|
||||
value = a.read_value(None)
|
||||
assert(value == pv)
|
||||
assert value == pv
|
||||
c.value = None
|
||||
a.write_value(None, pv)
|
||||
assert(a.value == (v1, v2))
|
||||
assert a.value == (v1, v2)
|
||||
|
||||
# Mapped adapter
|
||||
v1 = 1234
|
||||
@@ -156,10 +284,10 @@ def test_CharacteristicAdapter():
|
||||
a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2'))
|
||||
|
||||
value = a.read_value(None)
|
||||
assert(value == pv)
|
||||
assert value == pv
|
||||
c.value = None
|
||||
a.write_value(None, pv)
|
||||
assert(a.value == mapped)
|
||||
assert a.value == mapped
|
||||
|
||||
# UTF-8 adapter
|
||||
v = 'Hello π'
|
||||
@@ -168,10 +296,10 @@ def test_CharacteristicAdapter():
|
||||
a = UTF8CharacteristicAdapter(c)
|
||||
|
||||
value = a.read_value(None)
|
||||
assert(value == ev)
|
||||
assert value == ev
|
||||
c.value = None
|
||||
a.write_value(None, ev)
|
||||
assert(a.value == v)
|
||||
assert a.value == v
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -179,24 +307,25 @@ def test_CharacteristicValue():
|
||||
b = bytes([1, 2, 3])
|
||||
c = CharacteristicValue(read=lambda _: b)
|
||||
x = c.read(None)
|
||||
assert(x == b)
|
||||
assert x == b
|
||||
|
||||
result = []
|
||||
c = CharacteristicValue(write=lambda connection, value: result.append((connection, value)))
|
||||
z = object()
|
||||
c.write(z, b)
|
||||
assert(result == [(z, b)])
|
||||
assert result == [(z, b)]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class TwoDevices:
|
||||
class LinkedDevices:
|
||||
def __init__(self):
|
||||
self.connections = [None, None]
|
||||
self.connections = [None, None, None]
|
||||
|
||||
self.link = LocalLink()
|
||||
self.controllers = [
|
||||
Controller('C1', link = self.link),
|
||||
Controller('C2', link = self.link)
|
||||
Controller('C2', link = self.link),
|
||||
Controller('C3', link = self.link)
|
||||
]
|
||||
self.devices = [
|
||||
Device(
|
||||
@@ -204,12 +333,16 @@ class TwoDevices:
|
||||
host = Host(self.controllers[0], AsyncPipeSink(self.controllers[0]))
|
||||
),
|
||||
Device(
|
||||
address = 'F5:F4:F3:F2:F1:F0',
|
||||
address = 'F1:F2:F3:F4:F5:F6',
|
||||
host = Host(self.controllers[1], AsyncPipeSink(self.controllers[1]))
|
||||
),
|
||||
Device(
|
||||
address = 'F2:F3:F4:F5:F6:F7',
|
||||
host = Host(self.controllers[2], AsyncPipeSink(self.controllers[2]))
|
||||
)
|
||||
]
|
||||
|
||||
self.paired = [None, None]
|
||||
self.paired = [None, None, None]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -222,7 +355,7 @@ async def async_barrier():
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_read_write():
|
||||
[client, server] = TwoDevices().devices
|
||||
[client, server] = LinkedDevices().devices[:2]
|
||||
|
||||
characteristic1 = Characteristic(
|
||||
'FDB159DB-036C-49E3-B3DB-6325AC750806',
|
||||
@@ -265,41 +398,41 @@ async def test_read_write():
|
||||
await peer.discover_services()
|
||||
await peer.discover_characteristics()
|
||||
c = peer.get_characteristics_by_uuid(characteristic1.uuid)
|
||||
assert(len(c) == 1)
|
||||
assert len(c) == 1
|
||||
c1 = c[0]
|
||||
c = peer.get_characteristics_by_uuid(characteristic2.uuid)
|
||||
assert(len(c) == 1)
|
||||
assert len(c) == 1
|
||||
c2 = c[0]
|
||||
|
||||
v1 = await peer.read_value(c1)
|
||||
assert(v1 == b'')
|
||||
assert v1 == b''
|
||||
b = bytes([1, 2, 3])
|
||||
await peer.write_value(c1, b)
|
||||
await async_barrier()
|
||||
assert(characteristic1.value == b)
|
||||
assert characteristic1.value == b
|
||||
v1 = await peer.read_value(c1)
|
||||
assert(v1 == b)
|
||||
assert(type(characteristic1._last_value) is tuple)
|
||||
assert(len(characteristic1._last_value) == 2)
|
||||
assert(str(characteristic1._last_value[0].peer_address) == str(client.random_address))
|
||||
assert(characteristic1._last_value[1] == b)
|
||||
assert v1 == b
|
||||
assert type(characteristic1._last_value is tuple)
|
||||
assert len(characteristic1._last_value) == 2
|
||||
assert str(characteristic1._last_value[0].peer_address) == str(client.random_address)
|
||||
assert characteristic1._last_value[1] == b
|
||||
bb = bytes([3, 4, 5, 6])
|
||||
characteristic1.value = bb
|
||||
v1 = await peer.read_value(c1)
|
||||
assert(v1 == bb)
|
||||
assert v1 == bb
|
||||
|
||||
await peer.write_value(c2, b)
|
||||
await async_barrier()
|
||||
assert(type(characteristic2._last_value) is tuple)
|
||||
assert(len(characteristic2._last_value) == 2)
|
||||
assert(str(characteristic2._last_value[0].peer_address) == str(client.random_address))
|
||||
assert(characteristic2._last_value[1] == b)
|
||||
assert type(characteristic2._last_value is tuple)
|
||||
assert len(characteristic2._last_value) == 2
|
||||
assert str(characteristic2._last_value[0].peer_address) == str(client.random_address)
|
||||
assert characteristic2._last_value[1] == b
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_read_write2():
|
||||
[client, server] = TwoDevices().devices
|
||||
[client, server] = LinkedDevices().devices[:2]
|
||||
|
||||
v = bytes([0x11, 0x22, 0x33, 0x44])
|
||||
characteristic1 = Characteristic(
|
||||
@@ -324,32 +457,32 @@ async def test_read_write2():
|
||||
|
||||
await peer.discover_services()
|
||||
c = peer.get_services_by_uuid(service1.uuid)
|
||||
assert(len(c) == 1)
|
||||
assert len(c) == 1
|
||||
s = c[0]
|
||||
await s.discover_characteristics()
|
||||
c = s.get_characteristics_by_uuid(characteristic1.uuid)
|
||||
assert(len(c) == 1)
|
||||
assert len(c) == 1
|
||||
c1 = c[0]
|
||||
|
||||
v1 = await c1.read_value()
|
||||
assert(v1 == v)
|
||||
assert v1 == v
|
||||
|
||||
a1 = PackedCharacteristicAdapter(c1, '>I')
|
||||
v1 = await a1.read_value()
|
||||
assert(v1 == struct.unpack('>I', v)[0])
|
||||
assert v1 == struct.unpack('>I', v)[0]
|
||||
|
||||
b = bytes([0x55, 0x66, 0x77, 0x88])
|
||||
await a1.write_value(struct.unpack('>I', b)[0])
|
||||
await async_barrier()
|
||||
assert(characteristic1.value == b)
|
||||
assert characteristic1.value == b
|
||||
v1 = await a1.read_value()
|
||||
assert(v1 == struct.unpack('>I', b)[0])
|
||||
assert v1 == struct.unpack('>I', b)[0]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_subscribe_notify():
|
||||
[client, server] = TwoDevices().devices
|
||||
[client, server] = LinkedDevices().devices[:2]
|
||||
|
||||
characteristic1 = Characteristic(
|
||||
'FDB159DB-036C-49E3-B3DB-6325AC750806',
|
||||
@@ -410,13 +543,13 @@ async def test_subscribe_notify():
|
||||
await peer.discover_services()
|
||||
await peer.discover_characteristics()
|
||||
c = peer.get_characteristics_by_uuid(characteristic1.uuid)
|
||||
assert(len(c) == 1)
|
||||
assert len(c) == 1
|
||||
c1 = c[0]
|
||||
c = peer.get_characteristics_by_uuid(characteristic2.uuid)
|
||||
assert(len(c) == 1)
|
||||
assert len(c) == 1
|
||||
c2 = c[0]
|
||||
c = peer.get_characteristics_by_uuid(characteristic3.uuid)
|
||||
assert(len(c) == 1)
|
||||
assert len(c) == 1
|
||||
c3 = c[0]
|
||||
|
||||
c1._called = False
|
||||
@@ -429,23 +562,32 @@ async def test_subscribe_notify():
|
||||
c1.on('update', on_c1_update)
|
||||
await peer.subscribe(c1)
|
||||
await async_barrier()
|
||||
assert(server._last_subscription[1] == characteristic1)
|
||||
assert(server._last_subscription[2])
|
||||
assert(not server._last_subscription[3])
|
||||
assert(characteristic1._last_subscription[1])
|
||||
assert(not characteristic1._last_subscription[2])
|
||||
assert server._last_subscription[1] == characteristic1
|
||||
assert server._last_subscription[2]
|
||||
assert not server._last_subscription[3]
|
||||
assert characteristic1._last_subscription[1]
|
||||
assert not characteristic1._last_subscription[2]
|
||||
await server.indicate_subscribers(characteristic1)
|
||||
await async_barrier()
|
||||
assert(not c1._called)
|
||||
assert not c1._called
|
||||
await server.notify_subscribers(characteristic1)
|
||||
await async_barrier()
|
||||
assert(c1._called)
|
||||
assert(c1._last_update == characteristic1.value)
|
||||
assert c1._called
|
||||
assert c1._last_update == characteristic1.value
|
||||
|
||||
c1._called = False
|
||||
c1._last_update = None
|
||||
c1_value = characteristic1.value
|
||||
await server.notify_subscribers(characteristic1, bytes([0, 1, 2]))
|
||||
await async_barrier()
|
||||
assert c1._called
|
||||
assert c1._last_update == bytes([0, 1, 2])
|
||||
assert characteristic1.value == c1_value
|
||||
|
||||
c1._called = False
|
||||
await peer.unsubscribe(c1)
|
||||
await server.notify_subscribers(characteristic1)
|
||||
assert(not c1._called)
|
||||
assert not c1._called
|
||||
|
||||
c2._called = False
|
||||
c2._last_update = None
|
||||
@@ -458,17 +600,17 @@ async def test_subscribe_notify():
|
||||
await async_barrier()
|
||||
await server.notify_subscriber(characteristic2._last_subscription[0], characteristic2)
|
||||
await async_barrier()
|
||||
assert(not c2._called)
|
||||
assert not c2._called
|
||||
await server.indicate_subscriber(characteristic2._last_subscription[0], characteristic2)
|
||||
await async_barrier()
|
||||
assert(c2._called)
|
||||
assert(c2._last_update == characteristic2.value)
|
||||
assert c2._called
|
||||
assert c2._last_update == characteristic2.value
|
||||
|
||||
c2._called = False
|
||||
await peer.unsubscribe(c2, on_c2_update)
|
||||
await server.indicate_subscriber(characteristic2._last_subscription[0], characteristic2)
|
||||
await async_barrier()
|
||||
assert(not c2._called)
|
||||
assert not c2._called
|
||||
|
||||
def on_c3_update(value):
|
||||
c3._called = True
|
||||
@@ -483,17 +625,17 @@ async def test_subscribe_notify():
|
||||
await async_barrier()
|
||||
await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3)
|
||||
await async_barrier()
|
||||
assert(c3._called)
|
||||
assert(c3._last_update == characteristic3.value)
|
||||
assert(c3._called_2)
|
||||
assert(c3._last_update_2 == characteristic3.value)
|
||||
assert c3._called
|
||||
assert c3._last_update == characteristic3.value
|
||||
assert c3._called_2
|
||||
assert c3._last_update_2 == characteristic3.value
|
||||
characteristic3.value = bytes([1, 2, 3])
|
||||
await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3)
|
||||
await async_barrier()
|
||||
assert(c3._called)
|
||||
assert(c3._last_update == characteristic3.value)
|
||||
assert(c3._called_2)
|
||||
assert(c3._last_update_2 == characteristic3.value)
|
||||
assert c3._called
|
||||
assert c3._last_update == characteristic3.value
|
||||
assert c3._called_2
|
||||
assert c3._last_update_2 == characteristic3.value
|
||||
|
||||
c3._called = False
|
||||
c3._called_2 = False
|
||||
@@ -501,8 +643,44 @@ async def test_subscribe_notify():
|
||||
await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3)
|
||||
await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3)
|
||||
await async_barrier()
|
||||
assert(not c3._called)
|
||||
assert(not c3._called_2)
|
||||
assert not c3._called
|
||||
assert not c3._called_2
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_mtu_exchange():
|
||||
[d1, d2, d3] = LinkedDevices().devices[:3]
|
||||
|
||||
d3.gatt_server.max_mtu = 100
|
||||
|
||||
d3_connections = []
|
||||
@d3.on('connection')
|
||||
def on_d3_connection(connection):
|
||||
d3_connections.append(connection)
|
||||
|
||||
await d1.power_on()
|
||||
await d2.power_on()
|
||||
await d3.power_on()
|
||||
|
||||
d1_connection = await d1.connect(d3.random_address)
|
||||
assert len(d3_connections) == 1
|
||||
assert d3_connections[0] is not None
|
||||
|
||||
d2_connection = await d2.connect(d3.random_address)
|
||||
assert len(d3_connections) == 2
|
||||
assert d3_connections[1] is not None
|
||||
|
||||
d1_peer = Peer(d1_connection)
|
||||
d2_peer = Peer(d2_connection)
|
||||
|
||||
d1_client_mtu = await d1_peer.request_mtu(220)
|
||||
assert d1_client_mtu == 100
|
||||
assert d1_connection.att_mtu == 100
|
||||
|
||||
d2_client_mtu = await d2_peer.request_mtu(50)
|
||||
assert d2_client_mtu == 50
|
||||
assert d2_connection.att_mtu == 50
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -510,6 +688,9 @@ async def async_main():
|
||||
await test_read_write()
|
||||
await test_read_write2()
|
||||
await test_subscribe_notify()
|
||||
await test_characteristic_encoding()
|
||||
await test_mtu_exchange()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -27,8 +27,8 @@ def basic_check(x):
|
||||
parsed_str = str(parsed)
|
||||
print(x_str)
|
||||
parsed_bytes = parsed.to_bytes()
|
||||
assert(x_str == parsed_str)
|
||||
assert(packet == parsed_bytes)
|
||||
assert x_str == parsed_str
|
||||
assert packet == parsed_bytes
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -49,10 +49,10 @@ def test_HCI_LE_Connection_Complete_Event():
|
||||
role = 1,
|
||||
peer_address_type = 1,
|
||||
peer_address = address,
|
||||
conn_interval = 3,
|
||||
conn_latency = 4,
|
||||
connection_interval = 3,
|
||||
peripheral_latency = 4,
|
||||
supervision_timeout = 5,
|
||||
master_clock_accuracy = 6
|
||||
central_clock_accuracy = 6
|
||||
)
|
||||
basic_check(event)
|
||||
|
||||
@@ -60,8 +60,8 @@ def test_HCI_LE_Connection_Complete_Event():
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_HCI_LE_Advertising_Report_Event():
|
||||
address = Address('00:11:22:33:44:55')
|
||||
report = HCI_Object(
|
||||
HCI_LE_Advertising_Report_Event.REPORT_FIELDS,
|
||||
report = HCI_LE_Advertising_Report_Event.Report(
|
||||
HCI_LE_Advertising_Report_Event.Report.FIELDS,
|
||||
event_type = HCI_LE_Advertising_Report_Event.ADV_IND,
|
||||
address_type = Address.PUBLIC_DEVICE_ADDRESS,
|
||||
address = address,
|
||||
@@ -87,8 +87,8 @@ def test_HCI_LE_Connection_Update_Complete_Event():
|
||||
event = HCI_LE_Connection_Update_Complete_Event(
|
||||
status = HCI_SUCCESS,
|
||||
connection_handle = 0x007,
|
||||
conn_interval = 10,
|
||||
conn_latency = 3,
|
||||
connection_interval = 10,
|
||||
peripheral_latency = 3,
|
||||
supervision_timeout = 5
|
||||
)
|
||||
basic_check(event)
|
||||
@@ -133,7 +133,7 @@ def test_HCI_Command_Complete_Event():
|
||||
)
|
||||
basic_check(event)
|
||||
event = HCI_Packet.from_bytes(event.to_bytes())
|
||||
assert(event.return_parameters == 7)
|
||||
assert event.return_parameters == 7
|
||||
|
||||
# With a simple status as an integer status
|
||||
event = HCI_Command_Complete_Event(
|
||||
@@ -142,7 +142,7 @@ def test_HCI_Command_Complete_Event():
|
||||
return_parameters = 9
|
||||
)
|
||||
basic_check(event)
|
||||
assert(event.return_parameters == 9)
|
||||
assert event.return_parameters == 9
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -283,12 +283,32 @@ def test_HCI_LE_Create_Connection_Command():
|
||||
peer_address_type = 1,
|
||||
peer_address = Address('00:11:22:33:44:55'),
|
||||
own_address_type = 2,
|
||||
conn_interval_min = 7,
|
||||
conn_interval_max = 8,
|
||||
conn_latency = 9,
|
||||
connection_interval_min = 7,
|
||||
connection_interval_max = 8,
|
||||
max_latency = 9,
|
||||
supervision_timeout = 10,
|
||||
minimum_ce_length = 11,
|
||||
maximum_ce_length = 12
|
||||
min_ce_length = 11,
|
||||
max_ce_length = 12
|
||||
)
|
||||
basic_check(command)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_HCI_LE_Extended_Create_Connection_Command():
|
||||
command = HCI_LE_Extended_Create_Connection_Command(
|
||||
initiator_filter_policy = 0,
|
||||
own_address_type = 0,
|
||||
peer_address_type = 1,
|
||||
peer_address = Address('00:11:22:33:44:55'),
|
||||
initiating_phys = 3,
|
||||
scan_intervals = (10, 11),
|
||||
scan_windows = (12, 13),
|
||||
connection_interval_mins = (14, 15),
|
||||
connection_interval_maxs = (16, 17),
|
||||
max_latencies = (18, 19),
|
||||
supervision_timeouts = (20, 21),
|
||||
min_ce_lengths = (100, 101),
|
||||
max_ce_lengths = (102, 103)
|
||||
)
|
||||
basic_check(command)
|
||||
|
||||
@@ -314,13 +334,13 @@ def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command():
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_HCI_LE_Connection_Update_Command():
|
||||
command = HCI_LE_Connection_Update_Command(
|
||||
connection_handle = 0x0002,
|
||||
conn_interval_min = 10,
|
||||
conn_interval_max = 20,
|
||||
conn_latency = 7,
|
||||
supervision_timeout = 3,
|
||||
minimum_ce_length = 100,
|
||||
maximum_ce_length = 200
|
||||
connection_handle = 0x0002,
|
||||
connection_interval_min = 10,
|
||||
connection_interval_max = 20,
|
||||
max_latency = 7,
|
||||
supervision_timeout = 3,
|
||||
min_ce_length = 100,
|
||||
max_ce_length = 200
|
||||
)
|
||||
basic_check(command)
|
||||
|
||||
@@ -348,7 +368,7 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
|
||||
command = HCI_LE_Set_Extended_Scan_Parameters_Command(
|
||||
own_address_type=Address.RANDOM_DEVICE_ADDRESS,
|
||||
scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY,
|
||||
scanning_phys=(1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_1M_PHY | 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_CODED_PHY | 1 << 4),
|
||||
scanning_phys=(1 << HCI_LE_1M_PHY_BIT | 1 << HCI_LE_CODED_PHY_BIT | 1 << 4),
|
||||
scan_types=[
|
||||
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
|
||||
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
|
||||
@@ -363,20 +383,20 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_address():
|
||||
a = Address('C4:F2:17:1A:1D:BB')
|
||||
assert(not a.is_public)
|
||||
assert(a.is_random)
|
||||
assert(a.address_type == Address.RANDOM_DEVICE_ADDRESS)
|
||||
assert(not a.is_resolvable)
|
||||
assert(not a.is_resolved)
|
||||
assert(a.is_static)
|
||||
assert not a.is_public
|
||||
assert a.is_random
|
||||
assert a.address_type == Address.RANDOM_DEVICE_ADDRESS
|
||||
assert not a.is_resolvable
|
||||
assert not a.is_resolved
|
||||
assert a.is_static
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_custom():
|
||||
data = bytes([0x77, 0x02, 0x01, 0x03])
|
||||
packet = HCI_CustomPacket(data)
|
||||
assert(packet.hci_packet_type == 0x77)
|
||||
assert(packet.payload == data)
|
||||
assert packet.hci_packet_type == 0x77
|
||||
assert packet.payload == data
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -408,6 +428,7 @@ def run_test_commands():
|
||||
test_HCI_LE_Set_Scan_Parameters_Command()
|
||||
test_HCI_LE_Set_Scan_Enable_Command()
|
||||
test_HCI_LE_Create_Connection_Command()
|
||||
test_HCI_LE_Extended_Create_Connection_Command()
|
||||
test_HCI_LE_Add_Device_To_Filter_Accept_List_Command()
|
||||
test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command()
|
||||
test_HCI_LE_Connection_Update_Command()
|
||||
|
||||
@@ -62,6 +62,57 @@ def test_import():
|
||||
assert utils
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_app_imports():
|
||||
from bumble.apps.console import main
|
||||
assert main
|
||||
|
||||
from bumble.apps.controller_info import main
|
||||
assert main
|
||||
|
||||
from bumble.apps.controllers import main
|
||||
assert main
|
||||
|
||||
from bumble.apps.gatt_dump import main
|
||||
assert main
|
||||
|
||||
from bumble.apps.gg_bridge import main
|
||||
assert main
|
||||
|
||||
from bumble.apps.hci_bridge import main
|
||||
assert main
|
||||
|
||||
from bumble.apps.pair import main
|
||||
assert main
|
||||
|
||||
from bumble.apps.scan import main
|
||||
assert main
|
||||
|
||||
from bumble.apps.show import main
|
||||
assert main
|
||||
|
||||
from bumble.apps.unbond import main
|
||||
assert main
|
||||
|
||||
from bumble.apps.usb_probe import main
|
||||
assert main
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_profiles_imports():
|
||||
from bumble.profiles import (
|
||||
battery_service,
|
||||
device_information_service,
|
||||
heart_rate_service
|
||||
)
|
||||
|
||||
assert battery_service
|
||||
assert device_information_service
|
||||
assert heart_rate_service
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
test_import()
|
||||
test_app_imports()
|
||||
test_profiles_imports()
|
||||
|
||||
@@ -21,9 +21,9 @@ from bumble.transport import PacketParser
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class ScannerListener(Device.Listener):
|
||||
def on_advertisement(self, address, ad_data, rssi, connectable):
|
||||
address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type]
|
||||
print(f'>>> {address} [{address_type_string}]: RSSI={rssi}, {ad_data}')
|
||||
def on_advertisement(self, advertisement):
|
||||
address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type]
|
||||
print(f'>>> {advertisement.address} [{address_type_string}]: RSSI={advertisement.rssi}, {advertisement.ad_data}')
|
||||
|
||||
|
||||
class HciSource:
|
||||
|
||||
Reference in New Issue
Block a user