Compare commits

...

8 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
ce25cdc223 fix tests 2022-10-17 14:32:20 -07:00
Gilles Boccon-Gibod
9c429ec75a add phy options 2022-10-17 09:47:09 -07:00
Gilles Boccon-Gibod
d5eebc2101 add AdvertisingType 2022-10-15 21:43:08 -07:00
Gilles Boccon-Gibod
d10dda7e10 wip 2022-10-15 14:44:46 -07:00
Gilles Boccon-Gibod
c316e8805f wip 2022-10-15 14:44:46 -07:00
Gilles Boccon-Gibod
de7e74652d more HCI commands 2022-10-15 14:44:46 -07:00
Gilles Boccon-Gibod
31edd58b3d add extended report class 2022-10-15 14:44:46 -07:00
Gilles Boccon-Gibod
7c4b042026 wip 2022-10-15 14:44:46 -07:00
16 changed files with 1972 additions and 523 deletions

View File

@@ -28,11 +28,16 @@ import click
from collections import OrderedDict from collections import OrderedDict
import colors import colors
from bumble.core import UUID, AdvertisingData from bumble.core import UUID, AdvertisingData, TimeoutError, BT_LE_TRANSPORT
from bumble.device import Device, Connection, Peer from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
from bumble.utils import AsyncRunner from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic from bumble.gatt import Characteristic
from bumble.hci import (
HCI_LE_1M_PHY,
HCI_LE_2M_PHY,
HCI_LE_CODED_PHY,
)
from prompt_toolkit import Application from prompt_toolkit import Application
from prompt_toolkit.history import FileHistory from prompt_toolkit.history import FileHistory
@@ -43,6 +48,7 @@ from prompt_toolkit.styles import Style
from prompt_toolkit.filters import Condition from prompt_toolkit.filters import Condition
from prompt_toolkit.widgets import TextArea, Frame from prompt_toolkit.widgets import TextArea, Frame
from prompt_toolkit.widgets.toolbars import FormattedTextToolbar from prompt_toolkit.widgets.toolbars import FormattedTextToolbar
from prompt_toolkit.data_structures import Point
from prompt_toolkit.layout import ( from prompt_toolkit.layout import (
Layout, Layout,
HSplit, HSplit,
@@ -51,17 +57,20 @@ from prompt_toolkit.layout import (
Float, Float,
FormattedTextControl, FormattedTextControl,
FloatContainer, FloatContainer,
ConditionalContainer ConditionalContainer,
Dimension
) )
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Constants # Constants
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
BUMBLE_USER_DIR = os.path.expanduser('~/.bumble') BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
DEFAULT_PROMPT_HEIGHT = 20
DEFAULT_RSSI_BAR_WIDTH = 20 DEFAULT_RSSI_BAR_WIDTH = 20
DEFAULT_CONNECTION_TIMEOUT = 30.0
DISPLAY_MIN_RSSI = -100 DISPLAY_MIN_RSSI = -100
DISPLAY_MAX_RSSI = -30 DISPLAY_MAX_RSSI = -30
RSSI_MONITOR_INTERVAL = 5.0 # Seconds
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Globals # Globals
@@ -69,6 +78,45 @@ DISPLAY_MAX_RSSI = -30
App = None App = None
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def le_phy_name(phy_id):
return {
HCI_LE_1M_PHY: '1M',
HCI_LE_2M_PHY: '2M',
HCI_LE_CODED_PHY: 'CODED'
}.get(phy_id, HCI_Constant.le_phy_name(phy_id))
def rssi_bar(rssi):
blocks = ['', '', '', '', '', '', '', '']
bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
bar_width = min(max(bar_width, 0), 1)
bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
bar_blocks = ('' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
return f'{rssi:4} {bar_blocks}'
def parse_phys(phys):
if phys.lower() == '*':
return None
else:
phy_list = []
elements = phys.lower().split(',')
for element in elements:
if element == '1m':
phy_list.append(HCI_LE_1M_PHY)
elif element == '2m':
phy_list.append(HCI_LE_2M_PHY)
elif element == 'coded':
phy_list.append(HCI_LE_CODED_PHY)
else:
raise ValueError('invalid PHY name')
return phy_list
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Console App # Console App
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -79,6 +127,8 @@ class ConsoleApp:
self.device = None self.device = None
self.connected_peer = None self.connected_peer = None
self.top_tab = 'scan' self.top_tab = 'scan'
self.monitor_rssi = False
self.connection_rssi = None
style = Style.from_dict({ style = Style.from_dict({
'output-field': 'bg:#000044 #ffffff', 'output-field': 'bg:#000044 #ffffff',
@@ -106,6 +156,10 @@ class ConsoleApp:
'on': None, 'on': None,
'off': None 'off': None
}, },
'rssi': {
'on': None,
'off': None
},
'show': { 'show': {
'scan': None, 'scan': None,
'services': None, 'services': None,
@@ -120,10 +174,17 @@ class ConsoleApp:
'services': None, 'services': None,
'attributes': None 'attributes': None
}, },
'request-mtu': None,
'read': LiveCompleter(self.known_attributes), 'read': LiveCompleter(self.known_attributes),
'write': LiveCompleter(self.known_attributes), 'write': LiveCompleter(self.known_attributes),
'subscribe': LiveCompleter(self.known_attributes), 'subscribe': LiveCompleter(self.known_attributes),
'unsubscribe': LiveCompleter(self.known_attributes), 'unsubscribe': LiveCompleter(self.known_attributes),
'set-phy': {
'1m': None,
'2m': None,
'coded': None
},
'set-default-phy': None,
'quit': None, 'quit': None,
'exit': None 'exit': None
}) })
@@ -139,14 +200,16 @@ class ConsoleApp:
self.input_field.accept_handler = self.accept_input self.input_field.accept_handler = self.accept_input
self.output_height = 7 self.output_height = Dimension(min=7, max=7, weight=1)
self.output_lines = [] self.output_lines = []
self.output = FormattedTextControl() self.output = FormattedTextControl(get_cursor_position=lambda: Point(0, max(0, len(self.output_lines) - 1)))
self.output_max_lines = 20
self.scan_results_text = FormattedTextControl() self.scan_results_text = FormattedTextControl()
self.services_text = FormattedTextControl() self.services_text = FormattedTextControl()
self.attributes_text = FormattedTextControl() self.attributes_text = FormattedTextControl()
self.log_text = FormattedTextControl() self.log_text = FormattedTextControl(get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1)))
self.log_height = 20 self.log_height = Dimension(min=7, weight=4)
self.log_max_lines = 100
self.log_lines = [] self.log_lines = []
container = HSplit([ container = HSplit([
@@ -163,11 +226,10 @@ class ConsoleApp:
filter=Condition(lambda: self.top_tab == 'attributes') filter=Condition(lambda: self.top_tab == 'attributes')
), ),
ConditionalContainer( ConditionalContainer(
Frame(Window(self.log_text), title='Log'), Frame(Window(self.log_text, height=self.log_height), title='Log'),
filter=Condition(lambda: self.top_tab == 'log') filter=Condition(lambda: self.top_tab == 'log')
), ),
Frame(Window(self.output), height=self.output_height), Frame(Window(self.output, height=self.output_height)),
# HorizontalLine(),
FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'), FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'),
self.input_field self.input_field
]) ])
@@ -199,6 +261,8 @@ class ConsoleApp:
) )
async def run_async(self, device_config, transport): async def run_async(self, device_config, transport):
rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop())
async with await open_transport_or_link(transport) as (hci_source, hci_sink): async with await open_transport_or_link(transport) as (hci_source, hci_sink):
if device_config: if device_config:
self.device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink) self.device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
@@ -210,6 +274,8 @@ class ConsoleApp:
# Run the UI # Run the UI
await self.ui.run_async() await self.ui.run_async()
rssi_monitoring_task.cancel()
def add_known_address(self, address): def add_known_address(self, address):
self.known_addresses.add(address) self.known_addresses.add(address)
@@ -224,22 +290,33 @@ class ConsoleApp:
connection_state = 'NONE' connection_state = 'NONE'
encryption_state = '' encryption_state = ''
att_mtu = ''
rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi)
if self.device: if self.device:
if self.device.is_connecting: if self.device.is_connecting:
connection_state = 'CONNECTING' connection_state = 'CONNECTING'
elif self.connected_peer: elif self.connected_peer:
connection = self.connected_peer.connection connection = self.connected_peer.connection
connection_parameters = f'{connection.parameters.connection_interval}/{connection.parameters.connection_latency}/{connection.parameters.supervision_timeout}' connection_parameters = f'{connection.parameters.connection_interval}/{connection.parameters.peripheral_latency}/{connection.parameters.supervision_timeout}'
connection_state = f'{connection.peer_address} {connection_parameters} {connection.data_length}' if connection.transport == BT_LE_TRANSPORT:
phy_state = f' RX={le_phy_name(connection.phy.rx_phy)}/TX={le_phy_name(connection.phy.tx_phy)}'
else:
phy_state = ''
connection_state = f'{connection.peer_address} {connection_parameters} {connection.data_length}{phy_state}'
encryption_state = 'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED' encryption_state = 'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED'
att_mtu = f'ATT_MTU: {connection.att_mtu}'
return [ return [
('ansigreen', f' SCAN: {scanning} '), ('ansigreen', f' SCAN: {scanning} '),
('', ' '), ('', ' '),
('ansiblue', f' CONNECTION: {connection_state} '), ('ansiblue', f' CONNECTION: {connection_state} '),
('', ' '), ('', ' '),
('ansimagenta', f' {encryption_state} ') ('ansimagenta', f' {encryption_state} '),
('', ' '),
('ansicyan', f' {att_mtu} '),
('', ' '),
('ansiyellow', f' {rssi} ')
] ]
def show_error(self, title, details = None): def show_error(self, title, details = None):
@@ -286,7 +363,7 @@ class ConsoleApp:
def append_to_output(self, line, invalidate=True): def append_to_output(self, line, invalidate=True):
if type(line) is str: if type(line) is str:
line = [('', line)] line = [('', line)]
self.output_lines = self.output_lines[-(self.output_height - 3):] self.output_lines = self.output_lines[-self.output_max_lines:]
self.output_lines.append(line) self.output_lines.append(line)
formatted_text = [] formatted_text = []
for line in self.output_lines: for line in self.output_lines:
@@ -298,7 +375,7 @@ class ConsoleApp:
def append_to_log(self, lines, invalidate=True): def append_to_log(self, lines, invalidate=True):
self.log_lines.extend(lines.split('\n')) self.log_lines.extend(lines.split('\n'))
self.log_lines = self.log_lines[-(self.log_height - 3):] self.log_lines = self.log_lines[-self.log_max_lines:]
self.log_text.text = ANSI('\n'.join(self.log_lines)) self.log_text.text = ANSI('\n'.join(self.log_lines))
if invalidate: if invalidate:
self.ui.invalidate() self.ui.invalidate()
@@ -351,6 +428,12 @@ class ConsoleApp:
if characteristic.handle == attribute_handle: if characteristic.handle == attribute_handle:
return characteristic return characteristic
async def rssi_monitor_loop(self):
while True:
if self.monitor_rssi and self.connected_peer:
self.connection_rssi = await self.connected_peer.connection.get_rssi()
await asyncio.sleep(RSSI_MONITOR_INTERVAL)
async def command(self, command): async def command(self, command):
try: try:
(keyword, *params) = command.strip().split(' ') (keyword, *params) = command.strip().split(' ')
@@ -379,16 +462,50 @@ class ConsoleApp:
else: else:
self.show_error('unsupported arguments for scan command') self.show_error('unsupported arguments for scan command')
async def do_rssi(self, params):
if len(params) == 0:
# Toggle monitoring
self.monitor_rssi = not self.monitor_rssi
elif params[0] == 'on':
self.monitor_rssi = True
elif params[0] == 'off':
self.monitor_rssi = False
else:
self.show_error('unsupported arguments for rssi command')
async def do_connect(self, params): async def do_connect(self, params):
if len(params) != 1: if len(params) != 1 and len(params) != 2:
self.show_error('invalid syntax', 'expected connect <address>') self.show_error('invalid syntax', 'expected connect <address> [phys]')
return return
if len(params) == 1:
phys = None
else:
phys = parse_phys(params[1])
if phys is None:
connection_parameters_preferences = None
else:
connection_parameters_preferences = {
phy: ConnectionParametersPreferences()
for phy in phys
}
self.append_to_output('connecting...') self.append_to_output('connecting...')
await self.device.connect(params[0])
try:
await self.device.connect(
params[0],
connection_parameters_preferences=connection_parameters_preferences,
timeout=DEFAULT_CONNECTION_TIMEOUT
)
self.top_tab = 'services' self.top_tab = 'services'
except TimeoutError:
self.show_error('connection timed out')
async def do_disconnect(self, params): async def do_disconnect(self, params):
if self.device.connecting:
await self.device.cancel_connection()
else:
if not self.connected_peer: if not self.connected_peer:
self.show_error('not connected') self.show_error('not connected')
return return
@@ -397,21 +514,21 @@ class ConsoleApp:
async def do_update_parameters(self, params): async def do_update_parameters(self, params):
if len(params) != 1 or len(params[0].split('/')) != 3: if len(params) != 1 or len(params[0].split('/')) != 3:
self.show_error('invalid syntax', 'expected update-parameters <interval-min>-<interval-max>/<latency>/<supervision>') self.show_error('invalid syntax', 'expected update-parameters <interval-min>-<interval-max>/<max-latency>/<supervision>')
return return
if not self.connected_peer: if not self.connected_peer:
self.show_error('not connected') self.show_error('not connected')
return return
connection_intervals, connection_latency, supervision_timeout = params[0].split('/') connection_intervals, max_latency, supervision_timeout = params[0].split('/')
connection_interval_min, connection_interval_max = [int(x) for x in connection_intervals.split('-')] connection_interval_min, connection_interval_max = [int(x) for x in connection_intervals.split('-')]
connection_latency = int(connection_latency) max_latency = int(max_latency)
supervision_timeout = int(supervision_timeout) supervision_timeout = int(supervision_timeout)
await self.connected_peer.connection.update_parameters( await self.connected_peer.connection.update_parameters(
connection_interval_min, connection_interval_min,
connection_interval_max, connection_interval_max,
connection_latency, max_latency,
supervision_timeout supervision_timeout
) )
@@ -442,6 +559,25 @@ class ConsoleApp:
self.top_tab = params[0] self.top_tab = params[0]
self.ui.invalidate() self.ui.invalidate()
async def do_get_phy(self, params):
if not self.connected_peer:
self.show_error('not connected')
return
phy = await self.connected_peer.connection.get_phy()
self.append_to_output(f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, TX={HCI_Constant.le_phy_name(phy[1])}')
async def do_request_mtu(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected request-mtu <mtu>')
return
if not self.connected_peer:
self.show_error('not connected')
return
await self.connected_peer.request_mtu(int(params[0]))
async def do_discover(self, params): async def do_discover(self, params):
if not params: if not params:
self.show_error('invalid syntax', 'expected discover services|attributes') self.show_error('invalid syntax', 'expected discover services|attributes')
@@ -454,14 +590,14 @@ class ConsoleApp:
await self.discover_attributes() await self.discover_attributes()
async def do_read(self, params): async def do_read(self, params):
if not self.connected_peer:
self.show_error('not connected')
return
if len(params) != 1: if len(params) != 1:
self.show_error('invalid syntax', 'expected read <attribute>') self.show_error('invalid syntax', 'expected read <attribute>')
return return
if not self.connected_peer:
self.show_error('not connected')
return
characteristic = self.find_characteristic(params[0]) characteristic = self.find_characteristic(params[0])
if characteristic is None: if characteristic is None:
self.show_error('no such characteristic') self.show_error('no such characteristic')
@@ -530,6 +666,42 @@ class ConsoleApp:
await characteristic.unsubscribe() await characteristic.unsubscribe()
async def do_set_phy(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected set-phy <tx_rx_phys>|<tx_phys>/<rx_phys>')
return
if not self.connected_peer:
self.show_error('not connected')
return
if '/' in params[0]:
tx_phys, rx_phys = params[0].split('/')
else:
tx_phys = params[0]
rx_phys = tx_phys
await self.connected_peer.connection.set_phy(
tx_phys=parse_phys(tx_phys),
rx_phys=parse_phys(rx_phys)
)
async def do_set_default_phy(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected set-default-phy <tx_rx_phys>|<tx_phys>/<rx_phys>')
return
if '/' in params[0]:
tx_phys, rx_phys = params[0].split('/')
else:
tx_phys = params[0]
rx_phys = tx_phys
await self.device.set_default_phy(
tx_phys=parse_phys(tx_phys),
rx_phys=parse_phys(rx_phys)
)
async def do_exit(self, params): async def do_exit(self, params):
self.ui.exit() self.ui.exit()
@@ -548,12 +720,14 @@ class DeviceListener(Device.Listener, Connection.Listener):
@AsyncRunner.run_in_task() @AsyncRunner.run_in_task()
async def on_connection(self, connection): async def on_connection(self, connection):
self.app.connected_peer = Peer(connection) self.app.connected_peer = Peer(connection)
self.app.connection_rssi = None
self.app.append_to_output(f'connected to {self.app.connected_peer}') self.app.append_to_output(f'connected to {self.app.connected_peer}')
connection.listener = self connection.listener = self
def on_disconnection(self, reason): def on_disconnection(self, reason):
self.app.append_to_output(f'disconnected from {self.app.connected_peer}, reason: {HCI_Constant.error_name(reason)}') self.app.append_to_output(f'disconnected from {self.app.connected_peer}, reason: {HCI_Constant.error_name(reason)}')
self.app.connected_peer = None self.app.connected_peer = None
self.app.connection_rssi = None
def on_connection_parameters_update(self): def on_connection_parameters_update(self):
self.app.append_to_output(f'connection parameters update: {self.app.connected_peer.connection.parameters}') self.app.append_to_output(f'connection parameters update: {self.app.connected_peer.connection.parameters}')
@@ -570,16 +744,16 @@ class DeviceListener(Device.Listener, Connection.Listener):
def on_connection_data_length_change(self): def on_connection_data_length_change(self):
self.app.append_to_output(f'connection data length change: {self.app.connected_peer.connection.data_length}') self.app.append_to_output(f'connection data length change: {self.app.connected_peer.connection.data_length}')
def on_advertisement(self, address, ad_data, rssi, connectable): def on_advertisement(self, advertisement):
entry_key = f'{address}/{address.address_type}' entry_key = f'{advertisement.address}/{advertisement.address.address_type}'
entry = self.scan_results.get(entry_key) entry = self.scan_results.get(entry_key)
if entry: if entry:
entry.ad_data = ad_data entry.ad_data = advertisement.data
entry.rssi = rssi entry.rssi = advertisement.rssi
entry.connectable = connectable entry.connectable = advertisement.is_connectable
else: else:
self.app.add_known_address(str(address)) self.app.add_known_address(str(advertisement.address))
self.scan_results[entry_key] = ScanResult(address, address.address_type, ad_data, rssi, connectable) self.scan_results[entry_key] = ScanResult(advertisement.address, advertisement.address.address_type, advertisement.data, advertisement.rssi, advertisement.is_connectable)
self.app.show_scan_results(self.scan_results) self.app.show_scan_results(self.scan_results)
@@ -616,12 +790,7 @@ class ScanResult:
name = '' name = ''
# RSSI bar # RSSI bar
blocks = ['', '', '', '', '', '', '', ''] bar_string = rssi_bar(self.rssi)
bar_width = (self.rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
bar_width = min(max(bar_width, 0), 1)
bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
bar_blocks = ('' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
bar_string = f'{self.rssi} {bar_blocks}'
bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string)) bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string))
return f'{address_color(str(self.address))} [{type_color(address_type_string)}] {bar_string} {bar_padding} {name}' return f'{address_color(str(self.address))} [{type_color(address_type_string)}] {bar_string} {bar_padding} {name}'

View File

@@ -25,15 +25,21 @@ from bumble.company_ids import COMPANY_IDENTIFIERS
from bumble.core import name_or_number from bumble.core import name_or_number
from bumble.hci import ( from bumble.hci import (
map_null_terminated_utf8_string, map_null_terminated_utf8_string,
HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_SUCCESS, HCI_SUCCESS,
HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_VERSION_NAMES, HCI_VERSION_NAMES,
LMP_VERSION_NAMES, LMP_VERSION_NAMES,
HCI_Command, HCI_Command,
HCI_Read_BD_ADDR_Command,
HCI_READ_BD_ADDR_COMMAND, HCI_READ_BD_ADDR_COMMAND,
HCI_Read_BD_ADDR_Command,
HCI_READ_LOCAL_NAME_COMMAND,
HCI_Read_Local_Name_Command, HCI_Read_Local_Name_Command,
HCI_READ_LOCAL_NAME_COMMAND HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
HCI_LE_Read_Maximum_Data_Length_Command,
HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND,
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command,
HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND,
HCI_LE_Read_Maximum_Advertising_Data_Length_Command
) )
from bumble.host import Host from bumble.host import Host
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
@@ -57,6 +63,39 @@ async def get_classic_info(host):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def get_le_info(host): async def get_le_info(host):
print() print()
if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
response = await host.send_command(HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('LE Number Of Supported Advertising Sets:', 'yellow'),
response.return_parameters.num_supported_advertising_sets,
'\n'
)
if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
response = await host.send_command(HCI_LE_Read_Maximum_Advertising_Data_Length_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('LE Maximum Advertising Data Length:', 'yellow'),
response.return_parameters.max_advertising_data_length,
'\n'
)
if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
response = await host.send_command(HCI_LE_Read_Maximum_Data_Length_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('Maximum Data Length:', 'yellow'),
(
f'tx:{response.return_parameters.supported_max_tx_octets}/'
f'{response.return_parameters.supported_max_tx_time}, '
f'rx:{response.return_parameters.supported_max_rx_octets}/'
f'{response.return_parameters.supported_max_rx_time}'
),
'\n'
)
print(color('LE Features:', 'yellow')) print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features: for feature in host.supported_le_features:
print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature)) print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature))

View File

@@ -25,8 +25,8 @@ from bumble.device import Device
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.keys import JsonKeyStore from bumble.keys import JsonKeyStore
from bumble.smp import AddressResolver from bumble.smp import AddressResolver
from bumble.hci import HCI_LE_Advertising_Report_Event from bumble.device import Advertisement
from bumble.core import AdvertisingData from bumble.hci import HCI_Constant, HCI_LE_1M_PHY, HCI_LE_CODED_PHY
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -48,16 +48,19 @@ class AdvertisementPrinter:
self.min_rssi = min_rssi self.min_rssi = min_rssi
self.resolver = resolver self.resolver = resolver
def print_advertisement(self, address, address_color, ad_data, rssi): def print_advertisement(self, advertisement):
if self.min_rssi is not None and rssi < self.min_rssi: address = advertisement.address
address_color = 'yellow' if advertisement.is_connectable else 'red'
if self.min_rssi is not None and advertisement.rssi < self.min_rssi:
return return
address_qualifier = '' address_qualifier = ''
resolution_qualifier = '' resolution_qualifier = ''
if self.resolver and address.is_resolvable: if self.resolver and advertisement.address.is_resolvable:
resolved = self.resolver.resolve(address) resolved = self.resolver.resolve(advertisement.address)
if resolved is not None: if resolved is not None:
resolution_qualifier = f'(resolved from {address})' resolution_qualifier = f'(resolved from {advertisement.address})'
address = resolved address = resolved
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type] address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type]
@@ -74,18 +77,30 @@ class AdvertisementPrinter:
type_color = 'blue' type_color = 'blue'
address_qualifier = '(non-resolvable)' address_qualifier = '(non-resolvable)'
rssi_bar = make_rssi_bar(rssi)
separator = '\n ' separator = '\n '
print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}RSSI:{rssi:4} {rssi_bar}{separator}{ad_data.to_string(separator)}\n') rssi_bar = make_rssi_bar(advertisement.rssi)
if not advertisement.is_legacy:
phy_info = (
f'PHY: {HCI_Constant.le_phy_name(advertisement.primary_phy)}/'
f'{HCI_Constant.le_phy_name(advertisement.secondary_phy)} '
f'{separator}'
)
else:
phy_info = ''
def on_advertisement(self, address, ad_data, rssi, connectable): print(
address_color = 'yellow' if connectable else 'red' f'>>> {color(address, address_color)} '
self.print_advertisement(address, address_color, ad_data, rssi) f'[{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}'
f'{phy_info}'
f'RSSI:{advertisement.rssi:4} {rssi_bar}{separator}'
f'{advertisement.data.to_string(separator)}\n')
def on_advertising_report(self, address, ad_data, rssi, event_type): def on_advertisement(self, advertisement):
print(f'{color("EVENT", "green")}: {HCI_LE_Advertising_Report_Event.event_type_name(event_type)}') self.print_advertisement(advertisement)
ad_data = AdvertisingData.from_bytes(ad_data)
self.print_advertisement(address, 'yellow', ad_data, rssi) def on_advertising_report(self, report):
print(f'{color("EVENT", "green")}: {report.event_type_string()}')
self.print_advertisement(Advertisement.from_advertising_report(report))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -94,6 +109,7 @@ async def scan(
passive, passive,
scan_interval, scan_interval,
scan_window, scan_window,
phy,
filter_duplicates, filter_duplicates,
raw, raw,
keystore_file, keystore_file,
@@ -126,11 +142,18 @@ async def scan(
device.on('advertisement', printer.on_advertisement) device.on('advertisement', printer.on_advertisement)
await device.power_on() await device.power_on()
if phy is None:
scanning_phys = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY]
else:
scanning_phys = [{'1m': HCI_LE_1M_PHY, 'coded': HCI_LE_CODED_PHY}[phy]]
await device.start_scanning( await device.start_scanning(
active=(not passive), active=(not passive),
scan_interval=scan_interval, scan_interval=scan_interval,
scan_window=scan_window, scan_window=scan_window,
filter_duplicates=filter_duplicates filter_duplicates=filter_duplicates,
scanning_phys=scanning_phys
) )
await hci_source.wait_for_termination() await hci_source.wait_for_termination()
@@ -142,14 +165,15 @@ async def scan(
@click.option('--passive', is_flag=True, default=False, help='Perform passive scanning') @click.option('--passive', is_flag=True, default=False, help='Perform passive scanning')
@click.option('--scan-interval', type=int, default=60, help='Scan interval') @click.option('--scan-interval', type=int, default=60, help='Scan interval')
@click.option('--scan-window', type=int, default=60, help='Scan window') @click.option('--scan-window', type=int, default=60, help='Scan window')
@click.option('--phy', type=click.Choice(['1m', 'coded']), help='Only scan on the specified PHY')
@click.option('--filter-duplicates', type=bool, default=True, help='Filter duplicates at the controller level') @click.option('--filter-duplicates', type=bool, default=True, help='Filter duplicates at the controller level')
@click.option('--raw', is_flag=True, default=False, help='Listen for raw advertising reports instead of processed ones') @click.option('--raw', is_flag=True, default=False, help='Listen for raw advertising reports instead of processed ones')
@click.option('--keystore-file', help='Keystore file to use when resolving addresses') @click.option('--keystore-file', help='Keystore file to use when resolving addresses')
@click.option('--device-config', help='Device config file for the scanning device') @click.option('--device-config', help='Device config file for the scanning device')
@click.argument('transport') @click.argument('transport')
def main(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport): def main(min_rssi, passive, scan_interval, scan_window, phy, filter_duplicates, raw, keystore_file, device_config, transport):
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper()) logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
asyncio.run(scan(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport)) asyncio.run(scan(min_rssi, passive, scan_interval, scan_window, phy, filter_duplicates, raw, keystore_file, device_config, transport))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -76,7 +76,7 @@ class Controller:
self.supported_commands = bytes.fromhex('2000800000c000000000e40000002822000000000000040000f7ffff7f00000030f0f9ff01008004000000000000000000000000000000000000000000000000') self.supported_commands = bytes.fromhex('2000800000c000000000e40000002822000000000000040000f7ffff7f00000030f0f9ff01008004000000000000000000000000000000000000000000000000')
self.le_features = bytes.fromhex('ff49010000000000') self.le_features = bytes.fromhex('ff49010000000000')
self.le_states = bytes.fromhex('ffff3fffff030000') self.le_states = bytes.fromhex('ffff3fffff030000')
self.avertising_channel_tx_power = 0 self.advertising_channel_tx_power = 0
self.filter_accept_list_size = 8 self.filter_accept_list_size = 8
self.resolving_list_size = 8 self.resolving_list_size = 8
self.supported_max_tx_octets = 27 self.supported_max_tx_octets = 27
@@ -264,10 +264,10 @@ class Controller:
role = connection.role, role = connection.role,
peer_address_type = peer_address_type, peer_address_type = peer_address_type,
peer_address = peer_address, peer_address = peer_address,
conn_interval = 10, # FIXME connection_interval = 10, # FIXME
conn_latency = 0, # FIXME peripheral_latency = 0, # FIXME
supervision_timeout = 10, # FIXME supervision_timeout = 10, # FIXME
master_clock_accuracy = 7 # FIXME central_clock_accuracy = 7 # FIXME
)) ))
def on_link_central_disconnected(self, peer_address, reason): def on_link_central_disconnected(self, peer_address, reason):
@@ -318,10 +318,10 @@ class Controller:
role = BT_CENTRAL_ROLE, role = BT_CENTRAL_ROLE,
peer_address_type = le_create_connection_command.peer_address_type, peer_address_type = le_create_connection_command.peer_address_type,
peer_address = le_create_connection_command.peer_address, peer_address = le_create_connection_command.peer_address,
conn_interval = le_create_connection_command.conn_interval_min, connection_interval = le_create_connection_command.connection_interval_min,
conn_latency = le_create_connection_command.conn_latency, peripheral_latency = le_create_connection_command.max_latency,
supervision_timeout = le_create_connection_command.supervision_timeout, supervision_timeout = le_create_connection_command.supervision_timeout,
master_clock_accuracy = 0 central_clock_accuracy = 0
)) ))
def on_link_peripheral_disconnection_complete(self, disconnection_command, status): def on_link_peripheral_disconnection_complete(self, disconnection_command, status):
@@ -583,13 +583,15 @@ class Controller:
''' '''
See Bluetooth spec Vol 2, Part E - 7.4.1 Read Local Version Information Command See Bluetooth spec Vol 2, Part E - 7.4.1 Read Local Version Information Command
''' '''
return struct.pack('<BBHBHH', return struct.pack(
'<BBHBHH',
HCI_SUCCESS, HCI_SUCCESS,
self.hci_version, self.hci_version,
self.hci_revision, self.hci_revision,
self.lmp_version, self.lmp_version,
self.manufacturer_name, self.manufacturer_name,
self.lmp_subversion) self.lmp_subversion
)
def on_hci_read_local_supported_commands_command(self, command): def on_hci_read_local_supported_commands_command(self, command):
''' '''
@@ -650,7 +652,7 @@ class Controller:
''' '''
See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Channel Tx Power Command See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Channel Tx Power Command
''' '''
return bytes([HCI_SUCCESS, self.avertising_channel_tx_power]) return bytes([HCI_SUCCESS, self.advertising_channel_tx_power])
def on_hci_le_set_advertising_data_command(self, command): def on_hci_le_set_advertising_data_command(self, command):
''' '''
@@ -876,12 +878,26 @@ class Controller:
''' '''
See Bluetooth spec Vol 2, Part E - 7.8.46 LE Read Maximum Data Length Command See Bluetooth spec Vol 2, Part E - 7.8.46 LE Read Maximum Data Length Command
''' '''
return struct.pack('<BHHHH', return struct.pack(
'<BHHHH',
HCI_SUCCESS, HCI_SUCCESS,
self.supported_max_tx_octets, self.supported_max_tx_octets,
self.supported_max_tx_time, self.supported_max_tx_time,
self.supported_max_rx_octets, self.supported_max_rx_octets,
self.supported_max_rx_time) self.supported_max_rx_time
)
def on_hci_le_read_phy_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.47 LE Read PHY command
'''
return struct.pack(
'<BHBB',
HCI_SUCCESS,
command.connection_handle,
HCI_LE_1M_PHY,
HCI_LE_1M_PHY
)
def on_hci_le_set_default_phy_command(self, command): def on_hci_le_set_default_phy_command(self, command):
''' '''
@@ -893,3 +909,4 @@ class Controller:
'rx_phys': command.rx_phys 'rx_phys': command.rx_phys
} }
return bytes([HCI_SUCCESS]) return bytes([HCI_SUCCESS])

View File

@@ -831,13 +831,17 @@ class AdvertisingData:
# Connection Parameters # Connection Parameters
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class ConnectionParameters: class ConnectionParameters:
def __init__(self, connection_interval, connection_latency, supervision_timeout): def __init__(self, connection_interval, peripheral_latency, supervision_timeout):
self.connection_interval = connection_interval self.connection_interval = connection_interval
self.connection_latency = connection_latency self.peripheral_latency = peripheral_latency
self.supervision_timeout = supervision_timeout self.supervision_timeout = supervision_timeout
def __str__(self): def __str__(self):
return f'ConnectionParameters(connection_interval={self.connection_interval}, connection_latency={self.connection_latency}, supervision_timeout={self.supervision_timeout}' return (
f'ConnectionParameters(connection_interval={self.connection_interval}, '
f'peripheral_latency={self.peripheral_latency}, '
f'supervision_timeout={self.supervision_timeout}'
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -15,10 +15,12 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from enum import IntEnum
import json import json
import asyncio import asyncio
import logging import logging
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager, AsyncExitStack
from dataclasses import dataclass
from .hci import * from .hci import *
from .host import Host from .host import Host
@@ -51,39 +53,207 @@ DEVICE_DEFAULT_SCAN_RESPONSE_DATA = b''
DEVICE_DEFAULT_DATA_LENGTH = (27, 328, 27, 328) DEVICE_DEFAULT_DATA_LENGTH = (27, 328, 27, 328)
DEVICE_DEFAULT_SCAN_INTERVAL = 60 # ms DEVICE_DEFAULT_SCAN_INTERVAL = 60 # ms
DEVICE_DEFAULT_SCAN_WINDOW = 60 # ms DEVICE_DEFAULT_SCAN_WINDOW = 60 # ms
DEVICE_DEFAULT_CONNECT_TIMEOUT = None # No timeout
DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL = 60 # ms
DEVICE_DEFAULT_CONNECT_SCAN_WINDOW = 60 # ms
DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN = 15 # ms
DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX = 30 # ms
DEVICE_DEFAULT_CONNECTION_MAX_LATENCY = 0
DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT = 720 # ms
DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH = 0 # ms
DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH = 0 # ms
DEVICE_MIN_SCAN_INTERVAL = 25 DEVICE_MIN_SCAN_INTERVAL = 25
DEVICE_MAX_SCAN_INTERVAL = 10240 DEVICE_MAX_SCAN_INTERVAL = 10240
DEVICE_MIN_SCAN_WINDOW = 25 DEVICE_MIN_SCAN_WINDOW = 25
DEVICE_MAX_SCAN_WINDOW = 10240 DEVICE_MAX_SCAN_WINDOW = 10240
DEVICE_MIN_LE_RSSI = -127
DEVICE_MAX_LE_RSSI = 20
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Classes # Classes
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class Advertisement:
TX_POWER_NOT_AVAILABLE = HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE
RSSI_NOT_AVAILABLE = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE
@classmethod
def from_advertising_report(cls, report):
if isinstance(report, HCI_LE_Advertising_Report_Event.Report):
return LegacyAdvertisement.from_advertising_report(report)
elif isinstance(report, HCI_LE_Extended_Advertising_Report_Event.Report):
return ExtendedAdvertisement.from_advertising_report(report)
def __init__(
self,
address,
rssi = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE,
is_legacy = False,
is_anonymous = False,
is_connectable = False,
is_directed = False,
is_scannable = False,
is_scan_response = False,
is_complete = True,
is_truncated = False,
primary_phy = 0,
secondary_phy = 0,
tx_power = HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE,
sid = 0,
data = b''
):
self.address = address
self.rssi = rssi
self.is_legacy = is_legacy
self.is_anonymous = is_anonymous
self.is_connectable = is_connectable
self.is_directed = is_directed
self.is_scannable = is_scannable
self.is_scan_response = is_scan_response
self.is_complete = is_complete
self.is_truncated = is_truncated
self.primary_phy = primary_phy
self.secondary_phy = secondary_phy
self.tx_power = tx_power
self.sid = sid
self.data = AdvertisingData.from_bytes(data)
# -----------------------------------------------------------------------------
class LegacyAdvertisement(Advertisement):
@classmethod
def from_advertising_report(cls, report):
return cls(
address = report.address,
rssi = report.rssi,
is_legacy = True,
is_connectable = report.event_type in {
HCI_LE_Advertising_Report_Event.ADV_IND,
HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND
},
is_directed = report.event_type == HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND,
is_scannable = report.event_type in {
HCI_LE_Advertising_Report_Event.ADV_IND,
HCI_LE_Advertising_Report_Event.ADV_SCAN_IND
},
is_scan_response = report.event_type == HCI_LE_Advertising_Report_Event.SCAN_RSP,
data = report.data
)
# -----------------------------------------------------------------------------
class ExtendedAdvertisement(Advertisement):
@classmethod
def from_advertising_report(cls, report):
return cls(
address = report.address,
rssi = report.rssi,
is_legacy = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.LEGACY_ADVERTISING_PDU_USED) != 0,
is_anonymous = report.address.address_type == HCI_LE_Extended_Advertising_Report_Event.ANONYMOUS_ADDRESS_TYPE,
is_connectable = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.CONNECTABLE_ADVERTISING) != 0,
is_directed = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.DIRECTED_ADVERTISING) != 0,
is_scannable = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCANNABLE_ADVERTISING) != 0,
is_scan_response = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCAN_RESPONSE) != 0,
is_complete = (report.event_type >> 5 & 3) == HCI_LE_Extended_Advertising_Report_Event.DATA_COMPLETE,
is_truncated = (report.event_type >> 5 & 3) == HCI_LE_Extended_Advertising_Report_Event.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME,
primary_phy = report.primary_phy,
secondary_phy = report.secondary_phy,
tx_power = report.tx_power,
sid = report.advertising_sid,
data = report.data
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class AdvertisementDataAccumulator: class AdvertisementDataAccumulator:
def __init__(self): def __init__(self, passive=False):
self.advertising_data = AdvertisingData() self.passive = passive
self.last_advertisement_type = None self.last_advertisement = None
self.connectable = False self.last_data = b''
self.flushable = False
def update(self, data, advertisement_type): def update(self, report):
if advertisement_type == HCI_LE_Advertising_Report_Event.SCAN_RSP: advertisement = Advertisement.from_advertising_report(report)
if self.last_advertisement_type != HCI_LE_Advertising_Report_Event.SCAN_RSP: result = None
self.advertising_data.append(data)
self.flushable = True if advertisement.is_scan_response:
if self.last_advertisement is not None and not self.last_advertisement.is_scan_response:
# This is the response to a scannable advertisement
result = Advertisement.from_advertising_report(report)
result.is_connectable = self.last_advertisement.is_connectable
result.is_scannable = True
result.data = AdvertisingData.from_bytes(self.last_data + report.data)
self.last_data = b''
else: else:
self.advertising_data = AdvertisingData.from_bytes(data) if (
self.flushable = self.last_advertisement_type != HCI_LE_Advertising_Report_Event.SCAN_RSP self.passive or
(not advertisement.is_scannable) or
(self.last_advertisement is not None and not self.last_advertisement.is_scan_response)
):
# Don't wait for a scan response
result = Advertisement.from_advertising_report(report)
if advertisement_type == HCI_LE_Advertising_Report_Event.ADV_IND or advertisement_type == HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND: self.last_data = report.data
self.connectable = True
elif advertisement_type == HCI_LE_Advertising_Report_Event.ADV_SCAN_IND or advertisement_type == HCI_LE_Advertising_Report_Event.ADV_NONCONN_IND:
self.connectable = False
self.last_advertisement_type = advertisement_type self.last_advertisement = advertisement
return result
# -----------------------------------------------------------------------------
class AdvertisingType(IntEnum):
UNDIRECTED_CONNECTABLE_SCANNABLE = 0x00 # Undirected, connectable, scannable
DIRECTED_CONNECTABLE_HIGH_DUTY = 0x01 # Directed, connectable, non-scannable
UNDIRECTED_SCANNABLE = 0x02 # Undirected, non-connectable, scannable
UNDIRECTED = 0x03 # Undirected, non-connectable, non-scannable
DIRECTED_CONNECTABLE_LOW_DUTY = 0x04 # Directed, connectable, non-scannable
@property
def has_data(self):
return self in {
AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
AdvertisingType.UNDIRECTED_SCANNABLE,
AdvertisingType.UNDIRECTED
}
@property
def is_connectable(self):
return self in {
AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY,
AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY
}
@property
def is_scannable(self):
return self in {
AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
AdvertisingType.UNDIRECTED_SCANNABLE
}
@property
def is_directed(self):
return self in {
AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY,
AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY
}
# -----------------------------------------------------------------------------
class LePhyOptions:
# Coded PHY preference
ANY_CODED_PHY = 0
PREFER_S_2_CODED_PHY = 1
PREFER_S_8_CODED_PHY = 2
def __init__(self, coded_phy_preference=0):
self.coded_phy_preference = coded_phy_preference
def __int__(self):
return self.coded_phy_preference & 3
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -100,7 +270,9 @@ class Peer:
return self.gatt_client.services return self.gatt_client.services
async def request_mtu(self, mtu): async def request_mtu(self, mtu):
return await self.gatt_client.request_mtu(mtu) mtu = await self.gatt_client.request_mtu(mtu)
self.connection.emit('connection_att_mtu_update')
return mtu
async def discover_service(self, uuid): async def discover_service(self, uuid):
return await self.gatt_client.discover_service(uuid) return await self.gatt_client.discover_service(uuid)
@@ -169,11 +341,24 @@ class Peer:
async def __aexit__(self, exc_type, exc_value, traceback): async def __aexit__(self, exc_type, exc_value, traceback):
pass pass
def __str__(self): def __str__(self):
return f'{self.connection.peer_address} as {self.connection.role_name}' return f'{self.connection.peer_address} as {self.connection.role_name}'
# -----------------------------------------------------------------------------
@dataclass
class ConnectionParametersPreferences:
connection_interval_min: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN
connection_interval_max: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX
max_latency: int = DEVICE_DEFAULT_CONNECTION_MAX_LATENCY
supervision_timeout: int = DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT
min_ce_length: int = DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH
max_ce_length: int = DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH
ConnectionParametersPreferences.default = ConnectionParametersPreferences()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Connection(CompositeEventEmitter): class Connection(CompositeEventEmitter):
@composite_listener @composite_listener
@@ -202,7 +387,17 @@ class Connection(CompositeEventEmitter):
def on_connection_encryption_key_refresh(self): def on_connection_encryption_key_refresh(self):
pass pass
def __init__(self, device, handle, transport, peer_address, peer_resolvable_address, role, parameters): def __init__(
self,
device,
handle,
transport,
peer_address,
peer_resolvable_address,
role,
parameters,
phy
):
super().__init__() super().__init__()
self.device = device self.device = device
self.handle = handle self.handle = handle
@@ -214,7 +409,7 @@ class Connection(CompositeEventEmitter):
self.parameters = parameters self.parameters = parameters
self.encryption = 0 self.encryption = 0
self.authenticated = False self.authenticated = False
self.phy = ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY) self.phy = phy
self.att_mtu = ATT_DEFAULT_MTU self.att_mtu = ATT_DEFAULT_MTU
self.data_length = DEVICE_DEFAULT_DATA_LENGTH self.data_length = DEVICE_DEFAULT_DATA_LENGTH
self.gatt_client = None # Per-connection client self.gatt_client = None # Per-connection client
@@ -267,19 +462,28 @@ class Connection(CompositeEventEmitter):
async def update_parameters( async def update_parameters(
self, self,
conn_interval_min, connection_interval_min,
conn_interval_max, connection_interval_max,
conn_latency, max_latency,
supervision_timeout supervision_timeout
): ):
return await self.device.update_connection_parameters( return await self.device.update_connection_parameters(
self, self,
conn_interval_min, connection_interval_min,
conn_interval_max, connection_interval_max,
conn_latency, max_latency,
supervision_timeout supervision_timeout
) )
async def set_phy(self, tx_phys=None, rx_phys=None, phy_options=None):
return await self.device.set_connection_phy(self, tx_phys, rx_phys, phy_options)
async def get_rssi(self):
return await self.device.get_connection_rssi(self)
async def get_phy(self):
return await self.device.get_connection_phy(self)
# [Classic only] # [Classic only]
async def request_remote_name(self): async def request_remote_name(self):
return await self.device.request_remote_name(self) return await self.device.request_remote_name(self)
@@ -403,7 +607,7 @@ class Device(CompositeEventEmitter):
@composite_listener @composite_listener
class Listener: class Listener:
def on_advertisement(self, address, data, rssi, advertisement_type): def on_advertisement(self, advertisement):
pass pass
def on_inquiry_result(self, address, class_of_device, data, rssi): def on_inquiry_result(self, address, class_of_device, data, rssi):
@@ -446,14 +650,17 @@ class Device(CompositeEventEmitter):
self._host = None self._host = None
self.powered_on = False self.powered_on = False
self.advertising = False self.advertising = False
self.advertising_type = None
self.auto_restart_advertising = False self.auto_restart_advertising = False
self.command_timeout = 10 # seconds self.command_timeout = 10 # seconds
self.gatt_server = gatt_server.Server(self) self.gatt_server = gatt_server.Server(self)
self.sdp_server = sdp.Server(self) self.sdp_server = sdp.Server(self)
self.l2cap_channel_manager = l2cap.ChannelManager( self.l2cap_channel_manager = l2cap.ChannelManager(
[l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS]) [l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS]
self.advertisement_data = {} )
self.advertisement_accumulators = {} # Accumulators, by address
self.scanning = False self.scanning = False
self.scanning_is_passive = False
self.discovering = False self.discovering = False
self.connecting = False self.connecting = False
self.disconnecting = False self.disconnecting = False
@@ -569,9 +776,12 @@ class Device(CompositeEventEmitter):
def send_l2cap_pdu(self, connection_handle, cid, pdu): def send_l2cap_pdu(self, connection_handle, cid, pdu):
self.host.send_l2cap_pdu(connection_handle, cid, pdu) self.host.send_l2cap_pdu(connection_handle, cid, pdu)
async def send_command(self, command): async def send_command(self, command, check_result=False):
try: try:
return await asyncio.wait_for(self.host.send_command(command), self.command_timeout) return await asyncio.wait_for(
self.host.send_command(command, check_result),
self.command_timeout
)
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.warning('!!! Command timed out') logger.warning('!!! Command timed out')
@@ -594,10 +804,10 @@ class Device(CompositeEventEmitter):
# Set the controller address # Set the controller address
await self.send_command(HCI_LE_Set_Random_Address_Command( await self.send_command(HCI_LE_Set_Random_Address_Command(
random_address = self.random_address random_address = self.random_address
)) ), check_result=True)
# Load the address resolving list # Load the address resolving list
if self.keystore: if self.keystore and self.host.supports_command(HCI_LE_CLEAR_RESOLVING_LIST_COMMAND):
await self.send_command(HCI_LE_Clear_Resolving_List_Command()) await self.send_command(HCI_LE_Clear_Resolving_List_Command())
resolving_keys = await self.keystore.get_resolving_keys() resolving_keys = await self.keystore.get_resolving_keys()
@@ -644,41 +854,74 @@ class Device(CompositeEventEmitter):
# Done # Done
self.powered_on = True self.powered_on = True
async def start_advertising(self, auto_restart=False): def supports_le_feature(self, feature):
self.auto_restart_advertising = auto_restart return self.host.supports_le_feature(feature)
def supports_le_phy(self, phy):
if phy == HCI_LE_1M_PHY:
return True
feature_map = {
HCI_LE_2M_PHY: HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE,
HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE
}
if phy not in feature_map:
raise ValueError('invalid PHY')
return self.host.supports_le_feature(feature_map[phy])
async def start_advertising(
self,
advertising_type=AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
target=None,
auto_restart=False
):
# If we're advertising, stop first # If we're advertising, stop first
if self.advertising: if self.advertising:
await self.stop_advertising() await self.stop_advertising()
# Set/update the advertising data # Set/update the advertising data if the advertising type allows it
if advertising_type.has_data:
await self.send_command(HCI_LE_Set_Advertising_Data_Command( await self.send_command(HCI_LE_Set_Advertising_Data_Command(
advertising_data = self.advertising_data advertising_data = self.advertising_data
)) ), check_result=True)
# Set/update the scan response data # Set/update the scan response data if the advertising is scannable
if advertising_type.is_scannable:
await self.send_command(HCI_LE_Set_Scan_Response_Data_Command( await self.send_command(HCI_LE_Set_Scan_Response_Data_Command(
scan_response_data = self.scan_response_data scan_response_data = self.scan_response_data
)) ), check_result=True)
# Decide what peer address to use
if advertising_type.is_directed:
if target is None:
raise ValueError('directed advertising requires a target address')
peer_address = target
peer_address_type = target.address_type
else:
peer_address = Address('00:00:00:00:00:00')
peer_address_type = Address.PUBLIC_DEVICE_ADDRESS
# Set the advertising parameters # Set the advertising parameters
await self.send_command(HCI_LE_Set_Advertising_Parameters_Command( await self.send_command(HCI_LE_Set_Advertising_Parameters_Command(
# TODO: use real values, not fixed ones
advertising_interval_min = self.advertising_interval_min, advertising_interval_min = self.advertising_interval_min,
advertising_interval_max = self.advertising_interval_max, advertising_interval_max = self.advertising_interval_max,
advertising_type = HCI_LE_Set_Advertising_Parameters_Command.ADV_IND, advertising_type = int(advertising_type),
own_address_type = Address.RANDOM_DEVICE_ADDRESS, # TODO: allow using the public address own_address_type = Address.RANDOM_DEVICE_ADDRESS, # TODO: allow using the public address
peer_address_type = Address.PUBLIC_DEVICE_ADDRESS, peer_address_type = peer_address_type,
peer_address = Address('00:00:00:00:00:00'), peer_address = peer_address,
advertising_channel_map = 7, advertising_channel_map = 7,
advertising_filter_policy = 0 advertising_filter_policy = 0
)) ), check_result=True)
# Enable advertising # Enable advertising
await self.send_command(HCI_LE_Set_Advertising_Enable_Command( await self.send_command(HCI_LE_Set_Advertising_Enable_Command(
advertising_enable = 1 advertising_enable = 1
)) ), check_result=True)
self.auto_restart_advertising = auto_restart
self.advertising_type = advertising_type
self.advertising = True self.advertising = True
async def stop_advertising(self): async def stop_advertising(self):
@@ -686,9 +929,11 @@ class Device(CompositeEventEmitter):
if self.advertising: if self.advertising:
await self.send_command(HCI_LE_Set_Advertising_Enable_Command( await self.send_command(HCI_LE_Set_Advertising_Enable_Command(
advertising_enable = 0 advertising_enable = 0
)) ), check_result=True)
self.advertising = False self.advertising = False
self.advertising_type = None
self.auto_restart_advertising = False
@property @property
def is_advertising(self): def is_advertising(self):
@@ -700,7 +945,8 @@ class Device(CompositeEventEmitter):
scan_interval=DEVICE_DEFAULT_SCAN_INTERVAL, # Scan interval in ms scan_interval=DEVICE_DEFAULT_SCAN_INTERVAL, # Scan interval in ms
scan_window=DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms scan_window=DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms
own_address_type=Address.RANDOM_DEVICE_ADDRESS, own_address_type=Address.RANDOM_DEVICE_ADDRESS,
filter_duplicates=False filter_duplicates=False,
scanning_phys=(HCI_LE_1M_PHY, HCI_LE_CODED_PHY)
): ):
# Check that the arguments are legal # Check that the arguments are legal
if scan_interval < scan_window: if scan_interval < scan_window:
@@ -710,6 +956,45 @@ class Device(CompositeEventEmitter):
if scan_window < DEVICE_MIN_SCAN_WINDOW or scan_window > DEVICE_MAX_SCAN_WINDOW: if scan_window < DEVICE_MIN_SCAN_WINDOW or scan_window > DEVICE_MAX_SCAN_WINDOW:
raise ValueError('scan_interval out of range') raise ValueError('scan_interval out of range')
# Reset the accumulators
self.advertisement_accumulator = {}
# Enable scanning
if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE):
# Set the scanning parameters
scan_type = HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING if active else HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING
scanning_filter_policy = HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY # TODO: support other types
scanning_phy_count = 0
scanning_phys_bits = 0
if HCI_LE_1M_PHY in scanning_phys:
scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT
scanning_phy_count += 1
if HCI_LE_CODED_PHY in scanning_phys:
if self.supports_le_feature(HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE):
scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT
scanning_phy_count += 1
if scanning_phy_count == 0:
raise ValueError('at least one scanning PHY must be enabled')
await self.send_command(HCI_LE_Set_Extended_Scan_Parameters_Command(
own_address_type = own_address_type,
scanning_filter_policy = scanning_filter_policy,
scanning_phys = scanning_phys_bits,
scan_types = [scan_type] * scanning_phy_count,
scan_intervals = [int(scan_window / 0.625)] * scanning_phy_count,
scan_windows = [int(scan_window / 0.625)] * scanning_phy_count
), check_result=True)
# Enable scanning
await self.send_command(HCI_LE_Set_Extended_Scan_Enable_Command(
enable = 1,
filter_duplicates = 1 if filter_duplicates else 0,
duration = 0, # TODO allow other values
period = 0 # TODO allow other values
), check_result=True)
else:
# Set the scanning parameters # Set the scanning parameters
scan_type = HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING if active else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING scan_type = HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING if active else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING
await self.send_command(HCI_LE_Set_Scan_Parameters_Command( await self.send_command(HCI_LE_Set_Scan_Parameters_Command(
@@ -718,20 +1003,32 @@ class Device(CompositeEventEmitter):
le_scan_window = int(scan_window / 0.625), le_scan_window = int(scan_window / 0.625),
own_address_type = own_address_type, own_address_type = own_address_type,
scanning_filter_policy = HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY scanning_filter_policy = HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY
)) ), check_result=True)
# Enable scanning # Enable scanning
await self.send_command(HCI_LE_Set_Scan_Enable_Command( await self.send_command(HCI_LE_Set_Scan_Enable_Command(
le_scan_enable = 1, le_scan_enable = 1,
filter_duplicates = 1 if filter_duplicates else 0 filter_duplicates = 1 if filter_duplicates else 0
)) ), check_result=True)
self.scanning_is_passive = not active
self.scanning = True self.scanning = True
async def stop_scanning(self): async def stop_scanning(self):
# Disable scanning
if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE):
await self.send_command(HCI_LE_Set_Extended_Scan_Enable_Command(
enable = 0,
filter_duplicates = 0,
duration = 0,
period = 0
), check_result=True)
else:
await self.send_command(HCI_LE_Set_Scan_Enable_Command( await self.send_command(HCI_LE_Set_Scan_Enable_Command(
le_scan_enable = 0, le_scan_enable = 0,
filter_duplicates = 0 filter_duplicates = 0
)) ), check_result=True)
self.scanning = False self.scanning = False
@property @property
@@ -739,22 +1036,17 @@ class Device(CompositeEventEmitter):
return self.scanning return self.scanning
@host_event_handler @host_event_handler
def on_advertising_report(self, address, data, rssi, advertisement_type): def on_advertising_report(self, report):
if not (accumulator := self.advertisement_data.get(address)): if not (accumulator := self.advertisement_accumulators.get(report.address)):
accumulator = AdvertisementDataAccumulator() accumulator = AdvertisementDataAccumulator(passive=self.scanning_is_passive)
self.advertisement_data[address] = accumulator self.advertisement_accumulators[report.address] = accumulator
accumulator.update(data, advertisement_type) if advertisement := accumulator.update(report):
if accumulator.flushable: self.emit('advertisement', advertisement)
self.emit(
'advertisement',
address,
accumulator.advertising_data,
rssi,
accumulator.connectable
)
async def start_discovery(self): async def start_discovery(self):
await self.host.send_command(HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE)) await self.send_command(HCI_Write_Inquiry_Mode_Command(
inquiry_mode=HCI_EXTENDED_INQUIRY_MODE
), check_result=True)
response = await self.send_command(HCI_Inquiry_Command( response = await self.send_command(HCI_Inquiry_Command(
lap = HCI_GENERAL_INQUIRY_LAP, lap = HCI_GENERAL_INQUIRY_LAP,
@@ -768,7 +1060,7 @@ class Device(CompositeEventEmitter):
self.discovering = True self.discovering = True
async def stop_discovery(self): async def stop_discovery(self):
await self.send_command(HCI_Inquiry_Cancel_Command()) await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True)
self.discovering = False self.discovering = False
@host_event_handler @host_event_handler
@@ -805,11 +1097,12 @@ class Device(CompositeEventEmitter):
) )
# Update the controller # Update the controller
await self.host.send_command( await self.send_command(
HCI_Write_Extended_Inquiry_Response_Command( HCI_Write_Extended_Inquiry_Response_Command(
fec_required = 0, fec_required = 0,
extended_inquiry_response = self.inquiry_response extended_inquiry_response = self.inquiry_response
) ),
check_result=True
) )
await self.set_scan_enable( await self.set_scan_enable(
inquiry_scan_enabled = self.discoverable, inquiry_scan_enabled = self.discoverable,
@@ -824,12 +1117,26 @@ class Device(CompositeEventEmitter):
page_scan_enabled = self.connectable page_scan_enabled = self.connectable
) )
async def connect(self, peer_address, transport=BT_LE_TRANSPORT): async def connect(
self,
peer_address,
transport=BT_LE_TRANSPORT,
connection_parameters_preferences=None,
timeout=DEVICE_DEFAULT_CONNECT_TIMEOUT
):
''' '''
Request a connection to a peer. Request a connection to a peer.
This method cannot be called if there is already a pending connection. This method cannot be called if there is already a pending connection.
connection_parameters_preferences: (BLE only, ignored for BR/EDR)
* None: use all PHYs with default parameters
* map: each entry has a PHY as key and a ConnectionParametersPreferences object as value
''' '''
# Check parameters
if transport not in {BT_LE_TRANSPORT, BT_BR_EDR_TRANSPORT}:
raise ValueError('invalid transport')
# Adjust the transport automatically if we need to # Adjust the transport automatically if we need to
if transport == BT_LE_TRANSPORT and not self.le_enabled: if transport == BT_LE_TRANSPORT and not self.le_enabled:
transport = BT_BR_EDR_TRANSPORT transport = BT_BR_EDR_TRANSPORT
@@ -846,32 +1153,88 @@ class Device(CompositeEventEmitter):
except ValueError: except ValueError:
# If the address is not parsable, assume it is a name instead # If the address is not parsable, assume it is a name instead
logger.debug('looking for peer by name') logger.debug('looking for peer by name')
peer_address = await self.find_peer_by_name(peer_address, transport) peer_address = await self.find_peer_by_name(peer_address, transport) # TODO: timeout
# Create a future so that we can wait for the connection's result # Create a future so that we can wait for the connection's result
pending_connection = asyncio.get_running_loop().create_future() pending_connection = asyncio.get_running_loop().create_future()
self.on('connection', pending_connection.set_result) self.on('connection', pending_connection.set_result)
self.on('connection_failure', pending_connection.set_exception) self.on('connection_failure', pending_connection.set_exception)
try:
# Tell the controller to connect # Tell the controller to connect
if transport == BT_LE_TRANSPORT: if transport == BT_LE_TRANSPORT:
# TODO: use real values, not fixed ones if connection_parameters_preferences is None:
if connection_parameters_preferences is None:
connection_parameters_preferences = {
HCI_LE_1M_PHY: ConnectionParametersPreferences.default,
HCI_LE_2M_PHY: ConnectionParametersPreferences.default,
HCI_LE_CODED_PHY: ConnectionParametersPreferences.default
}
if self.host.supports_command(HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND):
# Only keep supported PHYs
phys = sorted(list(set(filter(self.supports_le_phy, connection_parameters_preferences.keys()))))
if not phys:
raise ValueError('least one supported PHY needed')
phy_count = len(phys)
initiating_phys = phy_list_to_bits(phys)
connection_interval_mins = [
int(connection_parameters_preferences[phy].connection_interval_min / 1.25) for phy in phys
]
connection_interval_maxs = [
int(connection_parameters_preferences[phy].connection_interval_max / 1.25) for phy in phys
]
max_latencies = [
connection_parameters_preferences[phy].max_latency for phy in phys
]
supervision_timeouts = [
int(connection_parameters_preferences[phy].supervision_timeout / 10) for phy in phys
]
min_ce_lengths = [
int(connection_parameters_preferences[phy].min_ce_length / 0.625) for phy in phys
]
max_ce_lengths = [
int(connection_parameters_preferences[phy].max_ce_length / 0.625) for phy in phys
]
result = await self.send_command(HCI_LE_Extended_Create_Connection_Command(
initiator_filter_policy = 0,
own_address_type = Address.RANDOM_DEVICE_ADDRESS,
peer_address_type = peer_address.address_type,
peer_address = peer_address,
initiating_phys = initiating_phys,
scan_intervals = (int(DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625),) * phy_count,
scan_windows = (int(DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625),) * phy_count,
connection_interval_mins = connection_interval_mins,
connection_interval_maxs = connection_interval_maxs,
max_latencies = max_latencies,
supervision_timeouts = supervision_timeouts,
min_ce_lengths = min_ce_lengths,
max_ce_lengths = max_ce_lengths
))
else:
if HCI_LE_1M_PHY not in connection_parameters_preferences:
raise ValueError('1M PHY preferences required')
prefs = connection_parameters_preferences[HCI_LE_1M_PHY]
result = await self.send_command(HCI_LE_Create_Connection_Command( result = await self.send_command(HCI_LE_Create_Connection_Command(
le_scan_interval = 96, le_scan_interval = int(DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625),
le_scan_window = 96, le_scan_window = int(DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625),
initiator_filter_policy = 0, initiator_filter_policy = 0,
peer_address_type = peer_address.address_type, peer_address_type = peer_address.address_type,
peer_address = peer_address, peer_address = peer_address,
own_address_type = Address.RANDOM_DEVICE_ADDRESS, own_address_type = Address.RANDOM_DEVICE_ADDRESS,
conn_interval_min = 12, connection_interval_min = int(prefs.connection_interval_min / 1.25),
conn_interval_max = 24, connection_interval_max = int(prefs.connection_interval_max / 1.25),
conn_latency = 0, max_latency = prefs.max_latency,
supervision_timeout = 72, supervision_timeout = int(prefs.supervision_timeout / 10),
minimum_ce_length = 0, min_ce_length = int(prefs.min_ce_length / 0.625),
maximum_ce_length = 0 max_ce_length = int(prefs.max_ce_length / 0.625),
)) ))
else: else:
# TODO: use real values, not fixed ones # TODO: allow passing other settings
result = await self.send_command(HCI_Create_Connection_Command( result = await self.send_command(HCI_Create_Connection_Command(
bd_addr = peer_address, bd_addr = peer_address,
packet_type = 0xCC18, # FIXME: change packet_type = 0xCC18, # FIXME: change
@@ -881,14 +1244,26 @@ class Device(CompositeEventEmitter):
reserved = 0 reserved = 0
)) ))
try:
if result.status != HCI_Command_Status_Event.PENDING: if result.status != HCI_Command_Status_Event.PENDING:
raise HCI_StatusError(result) raise HCI_StatusError(result)
# Wait for the connection process to complete # Wait for the connection process to complete
self.connecting = True self.connecting = True
if timeout is None:
return await pending_connection return await pending_connection
else:
try:
return await asyncio.wait_for(asyncio.shield(pending_connection), timeout)
except asyncio.TimeoutError:
if transport == BT_LE_TRANSPORT:
await self.send_command(HCI_LE_Create_Connection_Cancel_Command())
else:
await self.send_command(HCI_Create_Connection_Cancel_Command(bd_addr=peer_address))
try:
return await pending_connection
except ConnectionError:
raise TimeoutError()
finally: finally:
self.remove_listener('connection', pending_connection.set_result) self.remove_listener('connection', pending_connection.set_result)
self.remove_listener('connection_failure', pending_connection.set_exception) self.remove_listener('connection_failure', pending_connection.set_exception)
@@ -913,7 +1288,7 @@ class Device(CompositeEventEmitter):
async def cancel_connection(self): async def cancel_connection(self):
if not self.is_connecting: if not self.is_connecting:
return return
await self.send_command(HCI_LE_Create_Connection_Cancel_Command()) await self.send_command(HCI_LE_Create_Connection_Cancel_Command(), check_result=True)
async def disconnect(self, connection, reason): async def disconnect(self, connection, reason):
# Create a future so that we can wait for the disconnection's result # Create a future so that we can wait for the disconnection's result
@@ -922,7 +1297,9 @@ class Device(CompositeEventEmitter):
connection.on('disconnection_failure', pending_disconnection.set_exception) connection.on('disconnection_failure', pending_disconnection.set_exception)
# Request a disconnection # Request a disconnection
result = await self.send_command(HCI_Disconnect_Command(connection_handle = connection.handle, reason = reason)) result = await self.send_command(HCI_Disconnect_Command(
connection_handle = connection.handle, reason = reason
))
try: try:
if result.status != HCI_Command_Status_Event.PENDING: if result.status != HCI_Command_Status_Event.PENDING:
@@ -939,26 +1316,66 @@ class Device(CompositeEventEmitter):
async def update_connection_parameters( async def update_connection_parameters(
self, self,
connection, connection,
conn_interval_min, connection_interval_min,
conn_interval_max, connection_interval_max,
conn_latency, max_latency,
supervision_timeout, supervision_timeout,
minimum_ce_length = 0, min_ce_length = 0,
maximum_ce_length = 0 max_ce_length = 0
): ):
''' '''
NOTE: the name of the parameters may look odd, but it just follows the names used in the Bluetooth spec. NOTE: the name of the parameters may look odd, but it just follows the names used in the Bluetooth spec.
''' '''
await self.send_command(HCI_LE_Connection_Update_Command( await self.send_command(HCI_LE_Connection_Update_Command(
connection_handle = connection.handle, connection_handle = connection.handle,
conn_interval_min = conn_interval_min, connection_interval_min = connection_interval_min,
conn_interval_max = conn_interval_max, connection_interval_max = connection_interval_max,
conn_latency = conn_latency, max_latency = max_latency,
supervision_timeout = supervision_timeout, supervision_timeout = supervision_timeout,
minimum_ce_length = minimum_ce_length, min_ce_length = min_ce_length,
maximum_ce_length = maximum_ce_length max_ce_length = max_ce_length
)) ), check_result=True)
# TODO: check result
async def get_connection_rssi(self, connection):
result = await self.send_command(HCI_Read_RSSI_Command(handle = connection.handle), check_result=True)
return result.return_parameters.rssi
async def get_connection_phy(self, connection):
result = await self.send_command(
HCI_LE_Read_PHY_Command(connection_handle = connection.handle),
check_result=True
)
return (result.return_parameters.tx_phy, result.return_parameters.rx_phy)
async def set_connection_phy(
self,
connection,
tx_phys=None,
rx_phys=None,
phy_options=None
):
all_phys_bits = (1 if tx_phys is None else 0) | ((1 if rx_phys is None else 0) << 1)
return await self.send_command(
HCI_LE_Set_PHY_Command(
connection_handle = connection.handle,
all_phys = all_phys_bits,
tx_phys = phy_list_to_bits(tx_phys),
rx_phys = phy_list_to_bits(rx_phys),
phy_options = 0 if phy_options is None else int(phy_options)
), check_result=True
)
async def set_default_phy(self, tx_phys=None, rx_phys=None):
all_phys_bits = (1 if tx_phys is None else 0) | ((1 if rx_phys is None else 0) << 1)
return await self.send_command(
HCI_LE_Set_Default_PHY_Command(
all_phys = all_phys_bits,
tx_phys = phy_list_to_bits(tx_phys),
rx_phys = phy_list_to_bits(rx_phys)
), check_result=True
)
async def find_peer_by_name(self, name, transport=BT_LE_TRANSPORT): async def find_peer_by_name(self, name, transport=BT_LE_TRANSPORT):
""" """
@@ -982,8 +1399,7 @@ class Device(CompositeEventEmitter):
event_name = 'advertisement' event_name = 'advertisement'
handler = self.on( handler = self.on(
event_name, event_name,
lambda address, ad_data, rssi, connectable: lambda advertisement: on_peer_found(advertisement.address, advertisement.data)
on_peer_found(address, ad_data)
) )
was_scanning = self.scanning was_scanning = self.scanning
@@ -1228,6 +1644,23 @@ class Device(CompositeEventEmitter):
if connection_handle in self.connections: if connection_handle in self.connections:
logger.warn('new connection reuses the same handle as a previous connection') logger.warn('new connection reuses the same handle as a previous connection')
if transport == BT_BR_EDR_TRANSPORT:
# Create a new connection
connection = Connection(
self,
connection_handle,
transport,
peer_address,
peer_resolvable_address,
role,
connection_parameters,
phy=None
)
self.connections[connection_handle] = connection
# Emit an event to notify listeners of the new connection
self.emit('connection', connection)
else:
# Resolve the peer address if we can # Resolve the peer address if we can
if self.address_resolver: if self.address_resolver:
if peer_address.is_resolvable: if peer_address.is_resolvable:
@@ -1237,6 +1670,21 @@ class Device(CompositeEventEmitter):
peer_resolvable_address = peer_address peer_resolvable_address = peer_address
peer_address = resolved_address peer_address = resolved_address
# We are no longer advertising
self.advertising = False
# Create and notify of the new connection asynchronously
async def new_connection():
# Figure out which PHY we're connected with
if self.host.supports_command(HCI_LE_READ_PHY_COMMAND):
result = await self.send_command(
HCI_LE_Read_PHY_Command(connection_handle=connection_handle),
check_result=True
)
phy = ConnectionPHY(result.return_parameters.tx_phy, result.return_parameters.rx_phy)
else:
phy = ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY)
# Create a new connection # Create a new connection
connection = Connection( connection = Connection(
self, self,
@@ -1245,19 +1693,25 @@ class Device(CompositeEventEmitter):
peer_address, peer_address,
peer_resolvable_address, peer_resolvable_address,
role, role,
connection_parameters connection_parameters,
phy
) )
self.connections[connection_handle] = connection self.connections[connection_handle] = connection
# We are no longer advertising
self.advertising = False
# Emit an event to notify listeners of the new connection # Emit an event to notify listeners of the new connection
self.emit('connection', connection) self.emit('connection', connection)
asyncio.create_task(new_connection())
@host_event_handler @host_event_handler
def on_connection_failure(self, error_code): def on_connection_failure(self, connection_handle, error_code):
logger.debug(f'*** Connection failed: {error_code}') logger.debug(f'*** Connection failed: {HCI_Constant.error_name(error_code)}')
# For directed advertising, this means a timeout
if self.advertising and self.advertising_type.is_directed:
self.advertising = False
# Notify listeners
error = ConnectionError( error = ConnectionError(
error_code, error_code,
'hci', 'hci',
@@ -1280,7 +1734,10 @@ class Device(CompositeEventEmitter):
# Restart advertising if auto-restart is enabled # Restart advertising if auto-restart is enabled
if self.auto_restart_advertising: if self.auto_restart_advertising:
logger.debug('restarting advertising') logger.debug('restarting advertising')
asyncio.create_task(self.start_advertising(auto_restart=self.auto_restart_advertising)) asyncio.create_task(self.start_advertising(
advertising_type = self.advertising_type,
auto_restart = True
))
@host_event_handler @host_event_handler
@with_connection_from_handle @with_connection_from_handle

File diff suppressed because it is too large Load Diff

View File

@@ -76,7 +76,7 @@ class Host(EventEmitter):
self.hc_total_num_acl_data_packets = HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS self.hc_total_num_acl_data_packets = HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS
self.acl_packet_queue = collections.deque() self.acl_packet_queue = collections.deque()
self.acl_packets_in_flight = 0 self.acl_packets_in_flight = 0
self.local_version = HCI_VERSION_BLUETOOTH_CORE_4_0 self.local_version = None
self.local_supported_commands = bytes(64) self.local_supported_commands = bytes(64)
self.local_le_features = 0 self.local_le_features = 0
self.command_semaphore = asyncio.Semaphore(1) self.command_semaphore = asyncio.Semaphore(1)
@@ -91,32 +91,23 @@ class Host(EventEmitter):
self.set_packet_sink(controller_sink) self.set_packet_sink(controller_sink)
async def reset(self): async def reset(self):
await self.send_command(HCI_Reset_Command()) await self.send_command(HCI_Reset_Command(), check_result=True)
self.ready = True self.ready = True
response = await self.send_command(HCI_Read_Local_Supported_Commands_Command()) response = await self.send_command(HCI_Read_Local_Supported_Commands_Command(), check_result=True)
if response.return_parameters.status == HCI_SUCCESS:
self.local_supported_commands = response.return_parameters.supported_commands self.local_supported_commands = response.return_parameters.supported_commands
else:
logger.warn(f'HCI_Read_Local_Supported_Commands_Command failed: {response.return_parameters.status}')
if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND): if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command()) response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command(), check_result=True)
if response.return_parameters.status == HCI_SUCCESS:
self.local_le_features = struct.unpack('<Q', response.return_parameters.le_features)[0] self.local_le_features = struct.unpack('<Q', response.return_parameters.le_features)[0]
else:
logger.warn(f'HCI_LE_Read_Supported_Features_Command failed: {response.return_parameters.status}')
if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND): if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
response = await self.send_command(HCI_Read_Local_Version_Information_Command()) response = await self.send_command(HCI_Read_Local_Version_Information_Command(), check_result=True)
if response.return_parameters.status == HCI_SUCCESS:
self.local_version = response.return_parameters self.local_version = response.return_parameters
else:
logger.warn(f'HCI_Read_Local_Version_Information_Command failed: {response.return_parameters.status}')
await self.send_command(HCI_Set_Event_Mask_Command(event_mask = bytes.fromhex('FFFFFFFFFFFFFF3F'))) await self.send_command(HCI_Set_Event_Mask_Command(event_mask = bytes.fromhex('FFFFFFFFFFFFFF3F')))
if self.local_version.hci_version <= HCI_VERSION_BLUETOOTH_CORE_4_0: if self.local_version is not None and self.local_version.hci_version <= HCI_VERSION_BLUETOOTH_CORE_4_0:
# Some older controllers don't like event masks with bits they don't understand # Some older controllers don't like event masks with bits they don't understand
le_event_mask = bytes.fromhex('1F00000000000000') le_event_mask = bytes.fromhex('1F00000000000000')
else: else:
@@ -124,20 +115,14 @@ class Host(EventEmitter):
await self.send_command(HCI_LE_Set_Event_Mask_Command(le_event_mask = le_event_mask)) await self.send_command(HCI_LE_Set_Event_Mask_Command(le_event_mask = le_event_mask))
if self.supports_command(HCI_READ_BUFFER_SIZE_COMMAND): if self.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(HCI_Read_Buffer_Size_Command()) response = await self.send_command(HCI_Read_Buffer_Size_Command(), check_result=True)
if response.return_parameters.status == HCI_SUCCESS:
self.hc_acl_data_packet_length = response.return_parameters.hc_acl_data_packet_length self.hc_acl_data_packet_length = response.return_parameters.hc_acl_data_packet_length
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_acl_data_packets self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_acl_data_packets
else:
logger.warn(f'HCI_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND): if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command()) response = await self.send_command(HCI_LE_Read_Buffer_Size_Command(), check_result=True)
if response.return_parameters.status == HCI_SUCCESS:
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
else:
logger.warn(f'HCI_LE_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0: if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0:
# LE and Classic share the same values # LE and Classic share the same values
@@ -171,7 +156,7 @@ class Host(EventEmitter):
def send_hci_packet(self, packet): def send_hci_packet(self, packet):
self.hci_sink.on_packet(packet.to_bytes()) self.hci_sink.on_packet(packet.to_bytes())
async def send_command(self, command): async def send_command(self, command, check_result=False):
logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}') logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}')
# Wait until we can send (only one pending command at a time) # Wait until we can send (only one pending command at a time)
@@ -186,11 +171,22 @@ class Host(EventEmitter):
try: try:
self.send_hci_packet(command) self.send_hci_packet(command)
response = await self.pending_response response = await self.pending_response
# TODO: check error values
# Check the return parameters if required
if check_result:
if type(response.return_parameters) is int:
status = response.return_parameters
else:
status = response.return_parameters.status
if status != HCI_SUCCESS:
logger.warning(f'{command.name} failed ({HCI_Constant.error_name(status)})')
raise HCI_Error(status)
return response return response
except Exception as error: except Exception as error:
logger.warning(f'{color("!!! Exception while sending HCI packet:", "red")} {error}') logger.warning(f'{color("!!! Exception while sending HCI packet:", "red")} {error}')
# raise error raise error
finally: finally:
self.pending_command = None self.pending_command = None
self.pending_response = None self.pending_response = None
@@ -370,8 +366,8 @@ class Host(EventEmitter):
# Notify the client # Notify the client
connection_parameters = ConnectionParameters( connection_parameters = ConnectionParameters(
event.conn_interval, event.connection_interval,
event.conn_latency, event.peripheral_latency,
event.supervision_timeout event.supervision_timeout
) )
self.emit( self.emit(
@@ -387,7 +383,7 @@ class Host(EventEmitter):
logger.debug(f'### CONNECTION FAILED: {event.status}') logger.debug(f'### CONNECTION FAILED: {event.status}')
# Notify the listeners # Notify the listeners
self.emit('connection_failure', event.status) self.emit('connection_failure', event.connection_handle, event.status)
def on_hci_le_enhanced_connection_complete_event(self, event): def on_hci_le_enhanced_connection_complete_event(self, event):
# Just use the same implementation as for the non-enhanced event for now # Just use the same implementation as for the non-enhanced event for now
@@ -435,7 +431,7 @@ class Host(EventEmitter):
logger.debug(f'### DISCONNECTION FAILED: {event.status}') logger.debug(f'### DISCONNECTION FAILED: {event.status}')
# Notify the listeners # Notify the listeners
self.emit('disconnection_failure', event.status) self.emit('disconnection_failure', event.connection_handle, event.status)
def on_hci_le_connection_update_complete_event(self, event): def on_hci_le_connection_update_complete_event(self, event):
if (connection := self.connections.get(event.connection_handle)) is None: if (connection := self.connections.get(event.connection_handle)) is None:
@@ -445,8 +441,8 @@ class Host(EventEmitter):
# Notify the client # Notify the client
if event.status == HCI_SUCCESS: if event.status == HCI_SUCCESS:
connection_parameters = ConnectionParameters( connection_parameters = ConnectionParameters(
event.conn_interval, event.connection_interval,
event.conn_latency, event.peripheral_latency,
event.supervision_timeout event.supervision_timeout
) )
self.emit('connection_parameters_update', connection.handle, connection_parameters) self.emit('connection_parameters_update', connection.handle, connection_parameters)
@@ -467,13 +463,10 @@ class Host(EventEmitter):
def on_hci_le_advertising_report_event(self, event): def on_hci_le_advertising_report_event(self, event):
for report in event.reports: for report in event.reports:
self.emit( self.emit('advertising_report', report)
'advertising_report',
report.address, def on_hci_le_extended_advertising_report_event(self, event):
report.data, self.on_hci_le_advertising_report_event(event)
report.rssi,
report.event_type
)
def on_hci_le_remote_connection_parameter_request_event(self, event): def on_hci_le_remote_connection_parameter_request_event(self, event):
if event.connection_handle not in self.connections: if event.connection_handle not in self.connections:
@@ -489,8 +482,8 @@ class Host(EventEmitter):
interval_max = event.interval_max, interval_max = event.interval_max,
latency = event.latency, latency = event.latency,
timeout = event.timeout, timeout = event.timeout,
minimum_ce_length = 0, min_ce_length = 0,
maximum_ce_length = 0 max_ce_length = 0
) )
) )

View File

@@ -464,8 +464,8 @@ class L2CAP_Information_Response(L2CAP_Control_Frame):
@L2CAP_Control_Frame.subclass([ @L2CAP_Control_Frame.subclass([
('interval_min', 2), ('interval_min', 2),
('interval_max', 2), ('interval_max', 2),
('slave_latency', 2), ('latency', 2),
('timeout_multiplier', 2) ('timeout', 2)
]) ])
class L2CAP_Connection_Parameter_Update_Request(L2CAP_Control_Frame): class L2CAP_Connection_Parameter_Update_Request(L2CAP_Control_Frame):
''' '''
@@ -1058,12 +1058,12 @@ class ChannelManager:
) )
self.host.send_command_sync(HCI_LE_Connection_Update_Command( self.host.send_command_sync(HCI_LE_Connection_Update_Command(
connection_handle = connection.handle, connection_handle = connection.handle,
conn_interval_min = request.interval_min, connection_interval_min = request.interval_min,
conn_interval_max = request.interval_max, connection_interval_max = request.interval_max,
conn_latency = request.slave_latency, max_latency = request.latency,
supervision_timeout = request.timeout_multiplier, supervision_timeout = request.timeout,
minimum_ce_length = 0, min_ce_length = 0,
maximum_ce_length = 0 max_ce_length = 0
)) ))
else: else:
self.send_control_frame( self.send_control_frame(

View File

@@ -5,8 +5,9 @@ The Android emulator transport either connects, as a host, to a "Root Canal" vir
("host" mode), or attaches a virtual controller to the Android Bluetooth host stack ("controller" mode). ("host" mode), or attaches a virtual controller to the Android Bluetooth host stack ("controller" mode).
## Moniker ## Moniker
The moniker syntax for an Android Emulator transport is: `android-emulator:[mode=<host|controller>][mode=<host|controller>]`. The moniker syntax for an Android Emulator transport is: `android-emulator:[mode=<host|controller>][<hostname>:<port>]`, where
Both the `mode=<host|controller>` and `mode=<host|controller>` parameters are optional (so the moniker `android-emulator` by itself is a valid moniker, which will create a transport in `host` mode, connected to `localhost` on the default gRPC port for the emulator) the `mode` parameter can specify running as a host or a controller, and `<hostname>:<port>` can specify a host name (or IP address) and TCP port number on which to reach the gRPC server for the emulator.
Both the `mode=<host|controller>` and `<hostname>:<port>` parameters are optional (so the moniker `android-emulator` by itself is a valid moniker, which will create a transport in `host` mode, connected to `localhost` on the default gRPC port for the emulator).
!!! example Example !!! example Example
`android-emulator` `android-emulator`

View File

@@ -29,18 +29,31 @@ from bumble.transport import open_transport_or_link
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main():
if len(sys.argv) != 3: if len(sys.argv) < 3:
print('Usage: run_advertiser.py <config-file> <transport-spec>') print('Usage: run_advertiser.py <config-file> <transport-spec> [type] [address]')
print('example: run_advertiser.py device1.json link-relay:ws://localhost:8888/test') print('example: run_advertiser.py device1.json usb:0')
return return
if len(sys.argv) >= 4:
advertising_type = AdvertisingType(int(sys.argv[3]))
else:
advertising_type = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE
if advertising_type.is_directed:
if len(sys.argv) < 5:
print('<address> required for directed advertising')
return
target = Address(sys.argv[4])
else:
target = None
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
print('<<< connected') print('<<< connected')
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
await device.power_on() await device.power_on()
await device.start_advertising() await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_source.wait_for_termination() await hci_source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -29,15 +29,15 @@ from bumble.transport import open_transport_or_link
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class ScannerListener(Device.Listener): class ScannerListener(Device.Listener):
def on_advertisement(self, address, ad_data, rssi, connectable): def on_advertisement(self, advertisement):
address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type] address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type]
address_color = 'yellow' if connectable else 'red' address_color = 'yellow' if advertisement.is_connectable else 'red'
if address_type_string.startswith('P'): if address_type_string.startswith('P'):
type_color = 'green' type_color = 'green'
else: else:
type_color = 'cyan' type_color = 'cyan'
print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]: RSSI={rssi}, {ad_data}') print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]: RSSI={advertisement.rssi}, {advertisement.data}')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -40,24 +40,24 @@ async def main():
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
@device.on('advertisement') @device.on('advertisement')
def _(address, ad_data, rssi, connectable): def _(advertisement):
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type] address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[advertisement.address.address_type]
address_color = 'yellow' if connectable else 'red' address_color = 'yellow' if advertisement.is_connectable else 'red'
address_qualifier = '' address_qualifier = ''
if address_type_string.startswith('P'): if address_type_string.startswith('P'):
type_color = 'cyan' type_color = 'cyan'
else: else:
if address.is_static: if advertisement.address.is_static:
type_color = 'green' type_color = 'green'
address_qualifier = '(static)' address_qualifier = '(static)'
elif address.is_resolvable: elif advertisement.address.is_resolvable:
type_color = 'magenta' type_color = 'magenta'
address_qualifier = '(resolvable)' address_qualifier = '(resolvable)'
else: else:
type_color = 'white' type_color = 'white'
separator = '\n ' separator = '\n '
print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{rssi}{separator}{ad_data.to_string(separator)}') print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{advertisement.rssi}{separator}{advertisement.data.to_string(separator)}')
await device.power_on() await device.power_on()
await device.start_scanning(filter_duplicates=filter_duplicates) await device.start_scanning(filter_duplicates=filter_duplicates)

View File

@@ -27,8 +27,8 @@ def basic_check(x):
parsed_str = str(parsed) parsed_str = str(parsed)
print(x_str) print(x_str)
parsed_bytes = parsed.to_bytes() parsed_bytes = parsed.to_bytes()
assert(x_str == parsed_str) assert x_str == parsed_str
assert(packet == parsed_bytes) assert packet == parsed_bytes
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -49,10 +49,10 @@ def test_HCI_LE_Connection_Complete_Event():
role = 1, role = 1,
peer_address_type = 1, peer_address_type = 1,
peer_address = address, peer_address = address,
conn_interval = 3, connection_interval = 3,
conn_latency = 4, peripheral_latency = 4,
supervision_timeout = 5, supervision_timeout = 5,
master_clock_accuracy = 6 central_clock_accuracy = 6
) )
basic_check(event) basic_check(event)
@@ -60,8 +60,8 @@ def test_HCI_LE_Connection_Complete_Event():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Advertising_Report_Event(): def test_HCI_LE_Advertising_Report_Event():
address = Address('00:11:22:33:44:55') address = Address('00:11:22:33:44:55')
report = HCI_Object( report = HCI_LE_Advertising_Report_Event.Report(
HCI_LE_Advertising_Report_Event.REPORT_FIELDS, HCI_LE_Advertising_Report_Event.Report.FIELDS,
event_type = HCI_LE_Advertising_Report_Event.ADV_IND, event_type = HCI_LE_Advertising_Report_Event.ADV_IND,
address_type = Address.PUBLIC_DEVICE_ADDRESS, address_type = Address.PUBLIC_DEVICE_ADDRESS,
address = address, address = address,
@@ -87,8 +87,8 @@ def test_HCI_LE_Connection_Update_Complete_Event():
event = HCI_LE_Connection_Update_Complete_Event( event = HCI_LE_Connection_Update_Complete_Event(
status = HCI_SUCCESS, status = HCI_SUCCESS,
connection_handle = 0x007, connection_handle = 0x007,
conn_interval = 10, connection_interval = 10,
conn_latency = 3, peripheral_latency = 3,
supervision_timeout = 5 supervision_timeout = 5
) )
basic_check(event) basic_check(event)
@@ -133,7 +133,7 @@ def test_HCI_Command_Complete_Event():
) )
basic_check(event) basic_check(event)
event = HCI_Packet.from_bytes(event.to_bytes()) event = HCI_Packet.from_bytes(event.to_bytes())
assert(event.return_parameters == 7) assert event.return_parameters == 7
# With a simple status as an integer status # With a simple status as an integer status
event = HCI_Command_Complete_Event( event = HCI_Command_Complete_Event(
@@ -142,7 +142,7 @@ def test_HCI_Command_Complete_Event():
return_parameters = 9 return_parameters = 9
) )
basic_check(event) basic_check(event)
assert(event.return_parameters == 9) assert event.return_parameters == 9
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -283,12 +283,32 @@ def test_HCI_LE_Create_Connection_Command():
peer_address_type = 1, peer_address_type = 1,
peer_address = Address('00:11:22:33:44:55'), peer_address = Address('00:11:22:33:44:55'),
own_address_type = 2, own_address_type = 2,
conn_interval_min = 7, connection_interval_min = 7,
conn_interval_max = 8, connection_interval_max = 8,
conn_latency = 9, max_latency = 9,
supervision_timeout = 10, supervision_timeout = 10,
minimum_ce_length = 11, min_ce_length = 11,
maximum_ce_length = 12 max_ce_length = 12
)
basic_check(command)
# -----------------------------------------------------------------------------
def test_HCI_LE_Extended_Create_Connection_Command():
command = HCI_LE_Extended_Create_Connection_Command(
initiator_filter_policy = 0,
own_address_type = 0,
peer_address_type = 1,
peer_address = Address('00:11:22:33:44:55'),
initiating_phys = 3,
scan_intervals = (10, 11),
scan_windows = (12, 13),
connection_interval_mins = (14, 15),
connection_interval_maxs = (16, 17),
max_latencies = (18, 19),
supervision_timeouts = (20, 21),
min_ce_lengths = (100, 101),
max_ce_lengths = (102, 103)
) )
basic_check(command) basic_check(command)
@@ -315,12 +335,12 @@ def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command():
def test_HCI_LE_Connection_Update_Command(): def test_HCI_LE_Connection_Update_Command():
command = HCI_LE_Connection_Update_Command( command = HCI_LE_Connection_Update_Command(
connection_handle = 0x0002, connection_handle = 0x0002,
conn_interval_min = 10, connection_interval_min = 10,
conn_interval_max = 20, connection_interval_max = 20,
conn_latency = 7, max_latency = 7,
supervision_timeout = 3, supervision_timeout = 3,
minimum_ce_length = 100, min_ce_length = 100,
maximum_ce_length = 200 max_ce_length = 200
) )
basic_check(command) basic_check(command)
@@ -348,7 +368,7 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
command = HCI_LE_Set_Extended_Scan_Parameters_Command( command = HCI_LE_Set_Extended_Scan_Parameters_Command(
own_address_type=Address.RANDOM_DEVICE_ADDRESS, own_address_type=Address.RANDOM_DEVICE_ADDRESS,
scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY, scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY,
scanning_phys=(1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_1M_PHY | 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_CODED_PHY | 1 << 4), scanning_phys=(1 << HCI_LE_1M_PHY_BIT | 1 << HCI_LE_CODED_PHY_BIT | 1 << 4),
scan_types=[ scan_types=[
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING, HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING, HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
@@ -363,20 +383,20 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_address(): def test_address():
a = Address('C4:F2:17:1A:1D:BB') a = Address('C4:F2:17:1A:1D:BB')
assert(not a.is_public) assert not a.is_public
assert(a.is_random) assert a.is_random
assert(a.address_type == Address.RANDOM_DEVICE_ADDRESS) assert a.address_type == Address.RANDOM_DEVICE_ADDRESS
assert(not a.is_resolvable) assert not a.is_resolvable
assert(not a.is_resolved) assert not a.is_resolved
assert(a.is_static) assert a.is_static
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_custom(): def test_custom():
data = bytes([0x77, 0x02, 0x01, 0x03]) data = bytes([0x77, 0x02, 0x01, 0x03])
packet = HCI_CustomPacket(data) packet = HCI_CustomPacket(data)
assert(packet.hci_packet_type == 0x77) assert packet.hci_packet_type == 0x77
assert(packet.payload == data) assert packet.payload == data
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -408,6 +428,7 @@ def run_test_commands():
test_HCI_LE_Set_Scan_Parameters_Command() test_HCI_LE_Set_Scan_Parameters_Command()
test_HCI_LE_Set_Scan_Enable_Command() test_HCI_LE_Set_Scan_Enable_Command()
test_HCI_LE_Create_Connection_Command() test_HCI_LE_Create_Connection_Command()
test_HCI_LE_Extended_Create_Connection_Command()
test_HCI_LE_Add_Device_To_Filter_Accept_List_Command() test_HCI_LE_Add_Device_To_Filter_Accept_List_Command()
test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command() test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command()
test_HCI_LE_Connection_Update_Command() test_HCI_LE_Connection_Update_Command()

View File

@@ -62,6 +62,57 @@ def test_import():
assert utils assert utils
# -----------------------------------------------------------------------------
def test_app_imports():
from bumble.apps.console import main
assert main
from bumble.apps.controller_info import main
assert main
from bumble.apps.controllers import main
assert main
from bumble.apps.gatt_dump import main
assert main
from bumble.apps.gg_bridge import main
assert main
from bumble.apps.hci_bridge import main
assert main
from bumble.apps.pair import main
assert main
from bumble.apps.scan import main
assert main
from bumble.apps.show import main
assert main
from bumble.apps.unbond import main
assert main
from bumble.apps.usb_probe import main
assert main
# -----------------------------------------------------------------------------
def test_profiles_imports():
from bumble.profiles import (
battery_service,
device_information_service,
heart_rate_service
)
assert battery_service
assert device_information_service
assert heart_rate_service
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
if __name__ == '__main__': if __name__ == '__main__':
test_import() test_import()
test_app_imports()
test_profiles_imports()

View File

@@ -21,9 +21,9 @@ from bumble.transport import PacketParser
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class ScannerListener(Device.Listener): class ScannerListener(Device.Listener):
def on_advertisement(self, address, ad_data, rssi, connectable): def on_advertisement(self, advertisement):
address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type] address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type]
print(f'>>> {address} [{address_type_string}]: RSSI={rssi}, {ad_data}') print(f'>>> {advertisement.address} [{address_type_string}]: RSSI={advertisement.rssi}, {advertisement.ad_data}')
class HciSource: class HciSource: