diff --git a/apps/console.py b/apps/console.py
index 7ddbae40..c72d0226 100644
--- a/apps/console.py
+++ b/apps/console.py
@@ -28,11 +28,16 @@ import click
from collections import OrderedDict
import colors
-from bumble.core import UUID, AdvertisingData
-from bumble.device import Device, Connection, Peer
+from bumble.core import UUID, AdvertisingData, TimeoutError, BT_LE_TRANSPORT
+from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic
+from bumble.hci import (
+ HCI_LE_1M_PHY,
+ HCI_LE_2M_PHY,
+ HCI_LE_CODED_PHY,
+)
from prompt_toolkit import Application
from prompt_toolkit.history import FileHistory
@@ -43,6 +48,7 @@ from prompt_toolkit.styles import Style
from prompt_toolkit.filters import Condition
from prompt_toolkit.widgets import TextArea, Frame
from prompt_toolkit.widgets.toolbars import FormattedTextToolbar
+from prompt_toolkit.data_structures import Point
from prompt_toolkit.layout import (
Layout,
HSplit,
@@ -51,17 +57,20 @@ from prompt_toolkit.layout import (
Float,
FormattedTextControl,
FloatContainer,
- ConditionalContainer
+ ConditionalContainer,
+ Dimension
)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
-BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
-DEFAULT_PROMPT_HEIGHT = 20
-DEFAULT_RSSI_BAR_WIDTH = 20
-DISPLAY_MIN_RSSI = -100
-DISPLAY_MAX_RSSI = -30
+BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
+DEFAULT_RSSI_BAR_WIDTH = 20
+DEFAULT_CONNECTION_TIMEOUT = 30.0
+DISPLAY_MIN_RSSI = -100
+DISPLAY_MAX_RSSI = -30
+RSSI_MONITOR_INTERVAL = 5.0 # Seconds
+
# -----------------------------------------------------------------------------
# Globals
@@ -69,16 +78,57 @@ DISPLAY_MAX_RSSI = -30
App = None
+# -----------------------------------------------------------------------------
+# Utils
+# -----------------------------------------------------------------------------
+
+def le_phy_name(phy_id):
+ return {
+ HCI_LE_1M_PHY: '1M',
+ HCI_LE_2M_PHY: '2M',
+ HCI_LE_CODED_PHY: 'CODED'
+ }.get(phy_id, HCI_Constant.le_phy_name(phy_id))
+
+
+def rssi_bar(rssi):
+ blocks = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉']
+ bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
+ bar_width = min(max(bar_width, 0), 1)
+ bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
+ bar_blocks = ('█' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
+ return f'{rssi:4} {bar_blocks}'
+
+
+def parse_phys(phys):
+ if phys.lower() == '*':
+ return None
+ else:
+ phy_list = []
+ elements = phys.lower().split(',')
+ for element in elements:
+ if element == '1m':
+ phy_list.append(HCI_LE_1M_PHY)
+ elif element == '2m':
+ phy_list.append(HCI_LE_2M_PHY)
+ elif element == 'coded':
+ phy_list.append(HCI_LE_CODED_PHY)
+ else:
+ raise ValueError('invalid PHY name')
+ return phy_list
+
+
# -----------------------------------------------------------------------------
# Console App
# -----------------------------------------------------------------------------
class ConsoleApp:
def __init__(self):
- self.known_addresses = set()
+ self.known_addresses = set()
self.known_attributes = []
- self.device = None
- self.connected_peer = None
- self.top_tab = 'scan'
+ self.device = None
+ self.connected_peer = None
+ self.top_tab = 'scan'
+ self.monitor_rssi = False
+ self.connection_rssi = None
style = Style.from_dict({
'output-field': 'bg:#000044 #ffffff',
@@ -106,6 +156,10 @@ class ConsoleApp:
'on': None,
'off': None
},
+ 'rssi': {
+ 'on': None,
+ 'off': None
+ },
'show': {
'scan': None,
'services': None,
@@ -120,10 +174,17 @@ class ConsoleApp:
'services': None,
'attributes': None
},
+ 'request-mtu': None,
'read': LiveCompleter(self.known_attributes),
'write': LiveCompleter(self.known_attributes),
'subscribe': LiveCompleter(self.known_attributes),
'unsubscribe': LiveCompleter(self.known_attributes),
+ 'set-phy': {
+ '1m': None,
+ '2m': None,
+ 'coded': None
+ },
+ 'set-default-phy': None,
'quit': None,
'exit': None
})
@@ -139,14 +200,16 @@ class ConsoleApp:
self.input_field.accept_handler = self.accept_input
- self.output_height = 7
+ self.output_height = Dimension(min=7, max=7, weight=1)
self.output_lines = []
- self.output = FormattedTextControl()
+ self.output = FormattedTextControl(get_cursor_position=lambda: Point(0, max(0, len(self.output_lines) - 1)))
+ self.output_max_lines = 20
self.scan_results_text = FormattedTextControl()
self.services_text = FormattedTextControl()
self.attributes_text = FormattedTextControl()
- self.log_text = FormattedTextControl()
- self.log_height = 20
+ self.log_text = FormattedTextControl(get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1)))
+ self.log_height = Dimension(min=7, weight=4)
+ self.log_max_lines = 100
self.log_lines = []
container = HSplit([
@@ -163,11 +226,10 @@ class ConsoleApp:
filter=Condition(lambda: self.top_tab == 'attributes')
),
ConditionalContainer(
- Frame(Window(self.log_text), title='Log'),
+ Frame(Window(self.log_text, height=self.log_height), title='Log'),
filter=Condition(lambda: self.top_tab == 'log')
),
- Frame(Window(self.output), height=self.output_height),
- # HorizontalLine(),
+ Frame(Window(self.output, height=self.output_height)),
FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'),
self.input_field
])
@@ -199,6 +261,8 @@ class ConsoleApp:
)
async def run_async(self, device_config, transport):
+ rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop())
+
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
if device_config:
self.device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
@@ -210,6 +274,8 @@ class ConsoleApp:
# Run the UI
await self.ui.run_async()
+ rssi_monitoring_task.cancel()
+
def add_known_address(self, address):
self.known_addresses.add(address)
@@ -224,22 +290,33 @@ class ConsoleApp:
connection_state = 'NONE'
encryption_state = ''
+ att_mtu = ''
+ rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi)
if self.device:
if self.device.is_connecting:
connection_state = 'CONNECTING'
elif self.connected_peer:
connection = self.connected_peer.connection
- connection_parameters = f'{connection.parameters.connection_interval}/{connection.parameters.connection_latency}/{connection.parameters.supervision_timeout}'
- connection_state = f'{connection.peer_address} {connection_parameters} {connection.data_length}'
+ connection_parameters = f'{connection.parameters.connection_interval}/{connection.parameters.peripheral_latency}/{connection.parameters.supervision_timeout}'
+ if connection.transport == BT_LE_TRANSPORT:
+ phy_state = f' RX={le_phy_name(connection.phy.rx_phy)}/TX={le_phy_name(connection.phy.tx_phy)}'
+ else:
+ phy_state = ''
+ connection_state = f'{connection.peer_address} {connection_parameters} {connection.data_length}{phy_state}'
encryption_state = 'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED'
+ att_mtu = f'ATT_MTU: {connection.att_mtu}'
return [
('ansigreen', f' SCAN: {scanning} '),
('', ' '),
('ansiblue', f' CONNECTION: {connection_state} '),
('', ' '),
- ('ansimagenta', f' {encryption_state} ')
+ ('ansimagenta', f' {encryption_state} '),
+ ('', ' '),
+ ('ansicyan', f' {att_mtu} '),
+ ('', ' '),
+ ('ansiyellow', f' {rssi} ')
]
def show_error(self, title, details = None):
@@ -286,7 +363,7 @@ class ConsoleApp:
def append_to_output(self, line, invalidate=True):
if type(line) is str:
line = [('', line)]
- self.output_lines = self.output_lines[-(self.output_height - 3):]
+ self.output_lines = self.output_lines[-self.output_max_lines:]
self.output_lines.append(line)
formatted_text = []
for line in self.output_lines:
@@ -298,7 +375,7 @@ class ConsoleApp:
def append_to_log(self, lines, invalidate=True):
self.log_lines.extend(lines.split('\n'))
- self.log_lines = self.log_lines[-(self.log_height - 3):]
+ self.log_lines = self.log_lines[-self.log_max_lines:]
self.log_text.text = ANSI('\n'.join(self.log_lines))
if invalidate:
self.ui.invalidate()
@@ -351,6 +428,12 @@ class ConsoleApp:
if characteristic.handle == attribute_handle:
return characteristic
+ async def rssi_monitor_loop(self):
+ while True:
+ if self.monitor_rssi and self.connected_peer:
+ self.connection_rssi = await self.connected_peer.connection.get_rssi()
+ await asyncio.sleep(RSSI_MONITOR_INTERVAL)
+
async def command(self, command):
try:
(keyword, *params) = command.strip().split(' ')
@@ -379,39 +462,73 @@ class ConsoleApp:
else:
self.show_error('unsupported arguments for scan command')
+ async def do_rssi(self, params):
+ if len(params) == 0:
+ # Toggle monitoring
+ self.monitor_rssi = not self.monitor_rssi
+ elif params[0] == 'on':
+ self.monitor_rssi = True
+ elif params[0] == 'off':
+ self.monitor_rssi = False
+ else:
+ self.show_error('unsupported arguments for rssi command')
+
async def do_connect(self, params):
- if len(params) != 1:
- self.show_error('invalid syntax', 'expected connect
')
+ if len(params) != 1 and len(params) != 2:
+ self.show_error('invalid syntax', 'expected connect [phys]')
return
+ if len(params) == 1:
+ phys = None
+ else:
+ phys = parse_phys(params[1])
+ if phys is None:
+ connection_parameters_preferences = None
+ else:
+ connection_parameters_preferences = {
+ phy: ConnectionParametersPreferences()
+ for phy in phys
+ }
+
self.append_to_output('connecting...')
- await self.device.connect(params[0])
- self.top_tab = 'services'
+
+ try:
+ await self.device.connect(
+ params[0],
+ connection_parameters_preferences=connection_parameters_preferences,
+ timeout=DEFAULT_CONNECTION_TIMEOUT
+ )
+ self.top_tab = 'services'
+ except TimeoutError:
+ self.show_error('connection timed out')
async def do_disconnect(self, params):
- if not self.connected_peer:
- self.show_error('not connected')
- return
+ if self.device.connecting:
+ await self.device.cancel_connection()
+ else:
+ if not self.connected_peer:
+ self.show_error('not connected')
+ return
- await self.connected_peer.connection.disconnect()
+ await self.connected_peer.connection.disconnect()
async def do_update_parameters(self, params):
if len(params) != 1 or len(params[0].split('/')) != 3:
- self.show_error('invalid syntax', 'expected update-parameters -//')
+ self.show_error('invalid syntax', 'expected update-parameters -//')
return
if not self.connected_peer:
self.show_error('not connected')
return
- connection_intervals, connection_latency, supervision_timeout = params[0].split('/')
+ connection_intervals, max_latency, supervision_timeout = params[0].split('/')
connection_interval_min, connection_interval_max = [int(x) for x in connection_intervals.split('-')]
- connection_latency = int(connection_latency)
+ max_latency = int(max_latency)
supervision_timeout = int(supervision_timeout)
await self.connected_peer.connection.update_parameters(
connection_interval_min,
connection_interval_max,
- connection_latency,
+ max_latency,
supervision_timeout
)
@@ -442,6 +559,25 @@ class ConsoleApp:
self.top_tab = params[0]
self.ui.invalidate()
+ async def do_get_phy(self, params):
+ if not self.connected_peer:
+ self.show_error('not connected')
+ return
+
+ phy = await self.connected_peer.connection.get_phy()
+ self.append_to_output(f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, TX={HCI_Constant.le_phy_name(phy[1])}')
+
+ async def do_request_mtu(self, params):
+ if len(params) != 1:
+ self.show_error('invalid syntax', 'expected request-mtu ')
+ return
+
+ if not self.connected_peer:
+ self.show_error('not connected')
+ return
+
+ await self.connected_peer.request_mtu(int(params[0]))
+
async def do_discover(self, params):
if not params:
self.show_error('invalid syntax', 'expected discover services|attributes')
@@ -454,14 +590,14 @@ class ConsoleApp:
await self.discover_attributes()
async def do_read(self, params):
- if not self.connected_peer:
- self.show_error('not connected')
- return
-
if len(params) != 1:
self.show_error('invalid syntax', 'expected read ')
return
+ if not self.connected_peer:
+ self.show_error('not connected')
+ return
+
characteristic = self.find_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
@@ -530,6 +666,42 @@ class ConsoleApp:
await characteristic.unsubscribe()
+ async def do_set_phy(self, params):
+ if len(params) != 1:
+ self.show_error('invalid syntax', 'expected set-phy |/')
+ return
+
+ if not self.connected_peer:
+ self.show_error('not connected')
+ return
+
+ if '/' in params[0]:
+ tx_phys, rx_phys = params[0].split('/')
+ else:
+ tx_phys = params[0]
+ rx_phys = tx_phys
+
+ await self.connected_peer.connection.set_phy(
+ tx_phys=parse_phys(tx_phys),
+ rx_phys=parse_phys(rx_phys)
+ )
+
+ async def do_set_default_phy(self, params):
+ if len(params) != 1:
+ self.show_error('invalid syntax', 'expected set-default-phy |/')
+ return
+
+ if '/' in params[0]:
+ tx_phys, rx_phys = params[0].split('/')
+ else:
+ tx_phys = params[0]
+ rx_phys = tx_phys
+
+ await self.device.set_default_phy(
+ tx_phys=parse_phys(tx_phys),
+ rx_phys=parse_phys(rx_phys)
+ )
+
async def do_exit(self, params):
self.ui.exit()
@@ -548,12 +720,14 @@ class DeviceListener(Device.Listener, Connection.Listener):
@AsyncRunner.run_in_task()
async def on_connection(self, connection):
self.app.connected_peer = Peer(connection)
+ self.app.connection_rssi = None
self.app.append_to_output(f'connected to {self.app.connected_peer}')
connection.listener = self
def on_disconnection(self, reason):
self.app.append_to_output(f'disconnected from {self.app.connected_peer}, reason: {HCI_Constant.error_name(reason)}')
self.app.connected_peer = None
+ self.app.connection_rssi = None
def on_connection_parameters_update(self):
self.app.append_to_output(f'connection parameters update: {self.app.connected_peer.connection.parameters}')
@@ -570,16 +744,16 @@ class DeviceListener(Device.Listener, Connection.Listener):
def on_connection_data_length_change(self):
self.app.append_to_output(f'connection data length change: {self.app.connected_peer.connection.data_length}')
- def on_advertisement(self, address, ad_data, rssi, connectable):
- entry_key = f'{address}/{address.address_type}'
+ def on_advertisement(self, advertisement):
+ entry_key = f'{advertisement.address}/{advertisement.address.address_type}'
entry = self.scan_results.get(entry_key)
if entry:
- entry.ad_data = ad_data
- entry.rssi = rssi
- entry.connectable = connectable
+ entry.ad_data = advertisement.data
+ entry.rssi = advertisement.rssi
+ entry.connectable = advertisement.is_connectable
else:
- self.app.add_known_address(str(address))
- self.scan_results[entry_key] = ScanResult(address, address.address_type, ad_data, rssi, connectable)
+ self.app.add_known_address(str(advertisement.address))
+ self.scan_results[entry_key] = ScanResult(advertisement.address, advertisement.address.address_type, advertisement.data, advertisement.rssi, advertisement.is_connectable)
self.app.show_scan_results(self.scan_results)
@@ -616,12 +790,7 @@ class ScanResult:
name = ''
# RSSI bar
- blocks = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉']
- bar_width = (self.rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
- bar_width = min(max(bar_width, 0), 1)
- bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
- bar_blocks = ('█' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
- bar_string = f'{self.rssi} {bar_blocks}'
+ bar_string = rssi_bar(self.rssi)
bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string))
return f'{address_color(str(self.address))} [{type_color(address_type_string)}] {bar_string} {bar_padding} {name}'
diff --git a/apps/controller_info.py b/apps/controller_info.py
index 74d45504..b65caab0 100644
--- a/apps/controller_info.py
+++ b/apps/controller_info.py
@@ -25,15 +25,21 @@ from bumble.company_ids import COMPANY_IDENTIFIERS
from bumble.core import name_or_number
from bumble.hci import (
map_null_terminated_utf8_string,
- HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_SUCCESS,
+ HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_VERSION_NAMES,
LMP_VERSION_NAMES,
HCI_Command,
- HCI_Read_BD_ADDR_Command,
HCI_READ_BD_ADDR_COMMAND,
+ HCI_Read_BD_ADDR_Command,
+ HCI_READ_LOCAL_NAME_COMMAND,
HCI_Read_Local_Name_Command,
- HCI_READ_LOCAL_NAME_COMMAND
+ HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
+ HCI_LE_Read_Maximum_Data_Length_Command,
+ HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND,
+ HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command,
+ HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND,
+ HCI_LE_Read_Maximum_Advertising_Data_Length_Command
)
from bumble.host import Host
from bumble.transport import open_transport_or_link
@@ -57,6 +63,39 @@ async def get_classic_info(host):
# -----------------------------------------------------------------------------
async def get_le_info(host):
print()
+
+ if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
+ response = await host.send_command(HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command())
+ if response.return_parameters.status == HCI_SUCCESS:
+ print(
+ color('LE Number Of Supported Advertising Sets:', 'yellow'),
+ response.return_parameters.num_supported_advertising_sets,
+ '\n'
+ )
+
+ if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
+ response = await host.send_command(HCI_LE_Read_Maximum_Advertising_Data_Length_Command())
+ if response.return_parameters.status == HCI_SUCCESS:
+ print(
+ color('LE Maximum Advertising Data Length:', 'yellow'),
+ response.return_parameters.max_advertising_data_length,
+ '\n'
+ )
+
+ if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
+ response = await host.send_command(HCI_LE_Read_Maximum_Data_Length_Command())
+ if response.return_parameters.status == HCI_SUCCESS:
+ print(
+ color('Maximum Data Length:', 'yellow'),
+ (
+ f'tx:{response.return_parameters.supported_max_tx_octets}/'
+ f'{response.return_parameters.supported_max_tx_time}, '
+ f'rx:{response.return_parameters.supported_max_rx_octets}/'
+ f'{response.return_parameters.supported_max_rx_time}'
+ ),
+ '\n'
+ )
+
print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features:
print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature))
diff --git a/apps/scan.py b/apps/scan.py
index 045cb57d..d6c10923 100644
--- a/apps/scan.py
+++ b/apps/scan.py
@@ -25,8 +25,8 @@ from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.keys import JsonKeyStore
from bumble.smp import AddressResolver
-from bumble.hci import HCI_LE_Advertising_Report_Event
-from bumble.core import AdvertisingData
+from bumble.device import Advertisement
+from bumble.hci import HCI_Constant, HCI_LE_1M_PHY, HCI_LE_CODED_PHY
# -----------------------------------------------------------------------------
@@ -48,16 +48,19 @@ class AdvertisementPrinter:
self.min_rssi = min_rssi
self.resolver = resolver
- def print_advertisement(self, address, address_color, ad_data, rssi):
- if self.min_rssi is not None and rssi < self.min_rssi:
+ def print_advertisement(self, advertisement):
+ address = advertisement.address
+ address_color = 'yellow' if advertisement.is_connectable else 'red'
+
+ if self.min_rssi is not None and advertisement.rssi < self.min_rssi:
return
address_qualifier = ''
resolution_qualifier = ''
- if self.resolver and address.is_resolvable:
- resolved = self.resolver.resolve(address)
+ if self.resolver and advertisement.address.is_resolvable:
+ resolved = self.resolver.resolve(advertisement.address)
if resolved is not None:
- resolution_qualifier = f'(resolved from {address})'
+ resolution_qualifier = f'(resolved from {advertisement.address})'
address = resolved
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type]
@@ -74,18 +77,30 @@ class AdvertisementPrinter:
type_color = 'blue'
address_qualifier = '(non-resolvable)'
- rssi_bar = make_rssi_bar(rssi)
separator = '\n '
- print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}RSSI:{rssi:4} {rssi_bar}{separator}{ad_data.to_string(separator)}\n')
+ rssi_bar = make_rssi_bar(advertisement.rssi)
+ if not advertisement.is_legacy:
+ phy_info = (
+ f'PHY: {HCI_Constant.le_phy_name(advertisement.primary_phy)}/'
+ f'{HCI_Constant.le_phy_name(advertisement.secondary_phy)} '
+ f'{separator}'
+ )
+ else:
+ phy_info = ''
- def on_advertisement(self, address, ad_data, rssi, connectable):
- address_color = 'yellow' if connectable else 'red'
- self.print_advertisement(address, address_color, ad_data, rssi)
+ print(
+ f'>>> {color(address, address_color)} '
+ f'[{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}'
+ f'{phy_info}'
+ f'RSSI:{advertisement.rssi:4} {rssi_bar}{separator}'
+ f'{advertisement.data.to_string(separator)}\n')
- def on_advertising_report(self, address, ad_data, rssi, event_type):
- print(f'{color("EVENT", "green")}: {HCI_LE_Advertising_Report_Event.event_type_name(event_type)}')
- ad_data = AdvertisingData.from_bytes(ad_data)
- self.print_advertisement(address, 'yellow', ad_data, rssi)
+ def on_advertisement(self, advertisement):
+ self.print_advertisement(advertisement)
+
+ def on_advertising_report(self, report):
+ print(f'{color("EVENT", "green")}: {report.event_type_string()}')
+ self.print_advertisement(Advertisement.from_advertising_report(report))
# -----------------------------------------------------------------------------
@@ -94,6 +109,7 @@ async def scan(
passive,
scan_interval,
scan_window,
+ phy,
filter_duplicates,
raw,
keystore_file,
@@ -126,11 +142,18 @@ async def scan(
device.on('advertisement', printer.on_advertisement)
await device.power_on()
+
+ if phy is None:
+ scanning_phys = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY]
+ else:
+ scanning_phys = [{'1m': HCI_LE_1M_PHY, 'coded': HCI_LE_CODED_PHY}[phy]]
+
await device.start_scanning(
active=(not passive),
scan_interval=scan_interval,
scan_window=scan_window,
- filter_duplicates=filter_duplicates
+ filter_duplicates=filter_duplicates,
+ scanning_phys=scanning_phys
)
await hci_source.wait_for_termination()
@@ -142,14 +165,15 @@ async def scan(
@click.option('--passive', is_flag=True, default=False, help='Perform passive scanning')
@click.option('--scan-interval', type=int, default=60, help='Scan interval')
@click.option('--scan-window', type=int, default=60, help='Scan window')
+@click.option('--phy', type=click.Choice(['1m', 'coded']), help='Only scan on the specified PHY')
@click.option('--filter-duplicates', type=bool, default=True, help='Filter duplicates at the controller level')
@click.option('--raw', is_flag=True, default=False, help='Listen for raw advertising reports instead of processed ones')
@click.option('--keystore-file', help='Keystore file to use when resolving addresses')
@click.option('--device-config', help='Device config file for the scanning device')
@click.argument('transport')
-def main(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport):
+def main(min_rssi, passive, scan_interval, scan_window, phy, filter_duplicates, raw, keystore_file, device_config, transport):
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
- asyncio.run(scan(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport))
+ asyncio.run(scan(min_rssi, passive, scan_interval, scan_window, phy, filter_duplicates, raw, keystore_file, device_config, transport))
# -----------------------------------------------------------------------------
diff --git a/bumble/controller.py b/bumble/controller.py
index 9ca9b59c..7982e9bd 100644
--- a/bumble/controller.py
+++ b/bumble/controller.py
@@ -76,7 +76,7 @@ class Controller:
self.supported_commands = bytes.fromhex('2000800000c000000000e40000002822000000000000040000f7ffff7f00000030f0f9ff01008004000000000000000000000000000000000000000000000000')
self.le_features = bytes.fromhex('ff49010000000000')
self.le_states = bytes.fromhex('ffff3fffff030000')
- self.avertising_channel_tx_power = 0
+ self.advertising_channel_tx_power = 0
self.filter_accept_list_size = 8
self.resolving_list_size = 8
self.supported_max_tx_octets = 27
@@ -259,15 +259,15 @@ class Controller:
# Then say that the connection has completed
self.send_hci_packet(HCI_LE_Connection_Complete_Event(
- status = HCI_SUCCESS,
- connection_handle = connection.handle,
- role = connection.role,
- peer_address_type = peer_address_type,
- peer_address = peer_address,
- conn_interval = 10, # FIXME
- conn_latency = 0, # FIXME
- supervision_timeout = 10, # FIXME
- master_clock_accuracy = 7 # FIXME
+ status = HCI_SUCCESS,
+ connection_handle = connection.handle,
+ role = connection.role,
+ peer_address_type = peer_address_type,
+ peer_address = peer_address,
+ connection_interval = 10, # FIXME
+ peripheral_latency = 0, # FIXME
+ supervision_timeout = 10, # FIXME
+ central_clock_accuracy = 7 # FIXME
))
def on_link_central_disconnected(self, peer_address, reason):
@@ -313,15 +313,15 @@ class Controller:
# Say that the connection has completed
self.send_hci_packet(HCI_LE_Connection_Complete_Event(
- status = status,
- connection_handle = connection.handle if connection else 0,
- role = BT_CENTRAL_ROLE,
- peer_address_type = le_create_connection_command.peer_address_type,
- peer_address = le_create_connection_command.peer_address,
- conn_interval = le_create_connection_command.conn_interval_min,
- conn_latency = le_create_connection_command.conn_latency,
- supervision_timeout = le_create_connection_command.supervision_timeout,
- master_clock_accuracy = 0
+ status = status,
+ connection_handle = connection.handle if connection else 0,
+ role = BT_CENTRAL_ROLE,
+ peer_address_type = le_create_connection_command.peer_address_type,
+ peer_address = le_create_connection_command.peer_address,
+ connection_interval = le_create_connection_command.connection_interval_min,
+ peripheral_latency = le_create_connection_command.max_latency,
+ supervision_timeout = le_create_connection_command.supervision_timeout,
+ central_clock_accuracy = 0
))
def on_link_peripheral_disconnection_complete(self, disconnection_command, status):
@@ -583,13 +583,15 @@ class Controller:
'''
See Bluetooth spec Vol 2, Part E - 7.4.1 Read Local Version Information Command
'''
- return struct.pack('> 5 & 3) == HCI_LE_Extended_Advertising_Report_Event.DATA_COMPLETE,
+ is_truncated = (report.event_type >> 5 & 3) == HCI_LE_Extended_Advertising_Report_Event.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME,
+ primary_phy = report.primary_phy,
+ secondary_phy = report.secondary_phy,
+ tx_power = report.tx_power,
+ sid = report.advertising_sid,
+ data = report.data
+ )
+
+
# -----------------------------------------------------------------------------
class AdvertisementDataAccumulator:
- def __init__(self):
- self.advertising_data = AdvertisingData()
- self.last_advertisement_type = None
- self.connectable = False
- self.flushable = False
+ def __init__(self, passive=False):
+ self.passive = passive
+ self.last_advertisement = None
+ self.last_data = b''
- def update(self, data, advertisement_type):
- if advertisement_type == HCI_LE_Advertising_Report_Event.SCAN_RSP:
- if self.last_advertisement_type != HCI_LE_Advertising_Report_Event.SCAN_RSP:
- self.advertising_data.append(data)
- self.flushable = True
+ def update(self, report):
+ advertisement = Advertisement.from_advertising_report(report)
+ result = None
+
+ if advertisement.is_scan_response:
+ if self.last_advertisement is not None and not self.last_advertisement.is_scan_response:
+ # This is the response to a scannable advertisement
+ result = Advertisement.from_advertising_report(report)
+ result.is_connectable = self.last_advertisement.is_connectable
+ result.is_scannable = True
+ result.data = AdvertisingData.from_bytes(self.last_data + report.data)
+ self.last_data = b''
else:
- self.advertising_data = AdvertisingData.from_bytes(data)
- self.flushable = self.last_advertisement_type != HCI_LE_Advertising_Report_Event.SCAN_RSP
+ if (
+ self.passive or
+ (not advertisement.is_scannable) or
+ (self.last_advertisement is not None and not self.last_advertisement.is_scan_response)
+ ):
+ # Don't wait for a scan response
+ result = Advertisement.from_advertising_report(report)
- if advertisement_type == HCI_LE_Advertising_Report_Event.ADV_IND or advertisement_type == HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND:
- self.connectable = True
- elif advertisement_type == HCI_LE_Advertising_Report_Event.ADV_SCAN_IND or advertisement_type == HCI_LE_Advertising_Report_Event.ADV_NONCONN_IND:
- self.connectable = False
+ self.last_data = report.data
- self.last_advertisement_type = advertisement_type
+ self.last_advertisement = advertisement
+
+ return result
+
+
+# -----------------------------------------------------------------------------
+class AdvertisingType(IntEnum):
+ UNDIRECTED_CONNECTABLE_SCANNABLE = 0x00 # Undirected, connectable, scannable
+ DIRECTED_CONNECTABLE_HIGH_DUTY = 0x01 # Directed, connectable, non-scannable
+ UNDIRECTED_SCANNABLE = 0x02 # Undirected, non-connectable, scannable
+ UNDIRECTED = 0x03 # Undirected, non-connectable, non-scannable
+ DIRECTED_CONNECTABLE_LOW_DUTY = 0x04 # Directed, connectable, non-scannable
+
+ @property
+ def has_data(self):
+ return self in {
+ AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
+ AdvertisingType.UNDIRECTED_SCANNABLE,
+ AdvertisingType.UNDIRECTED
+ }
+
+ @property
+ def is_connectable(self):
+ return self in {
+ AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
+ AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY,
+ AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY
+ }
+
+ @property
+ def is_scannable(self):
+ return self in {
+ AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
+ AdvertisingType.UNDIRECTED_SCANNABLE
+ }
+
+ @property
+ def is_directed(self):
+ return self in {
+ AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY,
+ AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY
+ }
+
+
+# -----------------------------------------------------------------------------
+class LePhyOptions:
+ # Coded PHY preference
+ ANY_CODED_PHY = 0
+ PREFER_S_2_CODED_PHY = 1
+ PREFER_S_8_CODED_PHY = 2
+
+ def __init__(self, coded_phy_preference=0):
+ self.coded_phy_preference = coded_phy_preference
+
+ def __int__(self):
+ return self.coded_phy_preference & 3
# -----------------------------------------------------------------------------
@@ -100,7 +270,9 @@ class Peer:
return self.gatt_client.services
async def request_mtu(self, mtu):
- return await self.gatt_client.request_mtu(mtu)
+ mtu = await self.gatt_client.request_mtu(mtu)
+ self.connection.emit('connection_att_mtu_update')
+ return mtu
async def discover_service(self, uuid):
return await self.gatt_client.discover_service(uuid)
@@ -169,11 +341,24 @@ class Peer:
async def __aexit__(self, exc_type, exc_value, traceback):
pass
-
def __str__(self):
return f'{self.connection.peer_address} as {self.connection.role_name}'
+# -----------------------------------------------------------------------------
+@dataclass
+class ConnectionParametersPreferences:
+ connection_interval_min: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN
+ connection_interval_max: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX
+ max_latency: int = DEVICE_DEFAULT_CONNECTION_MAX_LATENCY
+ supervision_timeout: int = DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT
+ min_ce_length: int = DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH
+ max_ce_length: int = DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH
+
+
+ConnectionParametersPreferences.default = ConnectionParametersPreferences()
+
+
# -----------------------------------------------------------------------------
class Connection(CompositeEventEmitter):
@composite_listener
@@ -202,7 +387,17 @@ class Connection(CompositeEventEmitter):
def on_connection_encryption_key_refresh(self):
pass
- def __init__(self, device, handle, transport, peer_address, peer_resolvable_address, role, parameters):
+ def __init__(
+ self,
+ device,
+ handle,
+ transport,
+ peer_address,
+ peer_resolvable_address,
+ role,
+ parameters,
+ phy
+ ):
super().__init__()
self.device = device
self.handle = handle
@@ -214,7 +409,7 @@ class Connection(CompositeEventEmitter):
self.parameters = parameters
self.encryption = 0
self.authenticated = False
- self.phy = ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY)
+ self.phy = phy
self.att_mtu = ATT_DEFAULT_MTU
self.data_length = DEVICE_DEFAULT_DATA_LENGTH
self.gatt_client = None # Per-connection client
@@ -267,19 +462,28 @@ class Connection(CompositeEventEmitter):
async def update_parameters(
self,
- conn_interval_min,
- conn_interval_max,
- conn_latency,
+ connection_interval_min,
+ connection_interval_max,
+ max_latency,
supervision_timeout
):
return await self.device.update_connection_parameters(
self,
- conn_interval_min,
- conn_interval_max,
- conn_latency,
+ connection_interval_min,
+ connection_interval_max,
+ max_latency,
supervision_timeout
)
+ async def set_phy(self, tx_phys=None, rx_phys=None, phy_options=None):
+ return await self.device.set_connection_phy(self, tx_phys, rx_phys, phy_options)
+
+ async def get_rssi(self):
+ return await self.device.get_connection_rssi(self)
+
+ async def get_phy(self):
+ return await self.device.get_connection_phy(self)
+
# [Classic only]
async def request_remote_name(self):
return await self.device.request_remote_name(self)
@@ -403,7 +607,7 @@ class Device(CompositeEventEmitter):
@composite_listener
class Listener:
- def on_advertisement(self, address, data, rssi, advertisement_type):
+ def on_advertisement(self, advertisement):
pass
def on_inquiry_result(self, address, class_of_device, data, rssi):
@@ -443,24 +647,27 @@ class Device(CompositeEventEmitter):
def __init__(self, name = None, address = None, config = None, host = None, generic_access_service = True):
super().__init__()
- self._host = None
- self.powered_on = False
- self.advertising = False
- self.auto_restart_advertising = False
- self.command_timeout = 10 # seconds
- self.gatt_server = gatt_server.Server(self)
- self.sdp_server = sdp.Server(self)
- self.l2cap_channel_manager = l2cap.ChannelManager(
- [l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS])
- self.advertisement_data = {}
- self.scanning = False
- self.discovering = False
- self.connecting = False
- self.disconnecting = False
- self.connections = {} # Connections, by connection handle
- self.classic_enabled = False
- self.inquiry_response = None
- self.address_resolver = None
+ self._host = None
+ self.powered_on = False
+ self.advertising = False
+ self.advertising_type = None
+ self.auto_restart_advertising = False
+ self.command_timeout = 10 # seconds
+ self.gatt_server = gatt_server.Server(self)
+ self.sdp_server = sdp.Server(self)
+ self.l2cap_channel_manager = l2cap.ChannelManager(
+ [l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS]
+ )
+ self.advertisement_accumulators = {} # Accumulators, by address
+ self.scanning = False
+ self.scanning_is_passive = False
+ self.discovering = False
+ self.connecting = False
+ self.disconnecting = False
+ self.connections = {} # Connections, by connection handle
+ self.classic_enabled = False
+ self.inquiry_response = None
+ self.address_resolver = None
# Use the initial config or a default
self.public_address = Address('00:00:00:00:00:00')
@@ -569,9 +776,12 @@ class Device(CompositeEventEmitter):
def send_l2cap_pdu(self, connection_handle, cid, pdu):
self.host.send_l2cap_pdu(connection_handle, cid, pdu)
- async def send_command(self, command):
+ async def send_command(self, command, check_result=False):
try:
- return await asyncio.wait_for(self.host.send_command(command), self.command_timeout)
+ return await asyncio.wait_for(
+ self.host.send_command(command, check_result),
+ self.command_timeout
+ )
except asyncio.TimeoutError:
logger.warning('!!! Command timed out')
@@ -594,10 +804,10 @@ class Device(CompositeEventEmitter):
# Set the controller address
await self.send_command(HCI_LE_Set_Random_Address_Command(
random_address = self.random_address
- ))
+ ), check_result=True)
# Load the address resolving list
- if self.keystore:
+ if self.keystore and self.host.supports_command(HCI_LE_CLEAR_RESOLVING_LIST_COMMAND):
await self.send_command(HCI_LE_Clear_Resolving_List_Command())
resolving_keys = await self.keystore.get_resolving_keys()
@@ -644,51 +854,86 @@ class Device(CompositeEventEmitter):
# Done
self.powered_on = True
- async def start_advertising(self, auto_restart=False):
- self.auto_restart_advertising = auto_restart
+ def supports_le_feature(self, feature):
+ return self.host.supports_le_feature(feature)
+ def supports_le_phy(self, phy):
+ if phy == HCI_LE_1M_PHY:
+ return True
+
+ feature_map = {
+ HCI_LE_2M_PHY: HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE,
+ HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE
+ }
+ if phy not in feature_map:
+ raise ValueError('invalid PHY')
+
+ return self.host.supports_le_feature(feature_map[phy])
+
+ async def start_advertising(
+ self,
+ advertising_type=AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
+ target=None,
+ auto_restart=False
+ ):
# If we're advertising, stop first
if self.advertising:
await self.stop_advertising()
- # Set/update the advertising data
- await self.send_command(HCI_LE_Set_Advertising_Data_Command(
- advertising_data = self.advertising_data
- ))
+ # Set/update the advertising data if the advertising type allows it
+ if advertising_type.has_data:
+ await self.send_command(HCI_LE_Set_Advertising_Data_Command(
+ advertising_data = self.advertising_data
+ ), check_result=True)
- # Set/update the scan response data
- await self.send_command(HCI_LE_Set_Scan_Response_Data_Command(
- scan_response_data = self.scan_response_data
- ))
+ # Set/update the scan response data if the advertising is scannable
+ if advertising_type.is_scannable:
+ await self.send_command(HCI_LE_Set_Scan_Response_Data_Command(
+ scan_response_data = self.scan_response_data
+ ), check_result=True)
+
+ # Decide what peer address to use
+ if advertising_type.is_directed:
+ if target is None:
+ raise ValueError('directed advertising requires a target address')
+
+ peer_address = target
+ peer_address_type = target.address_type
+ else:
+ peer_address = Address('00:00:00:00:00:00')
+ peer_address_type = Address.PUBLIC_DEVICE_ADDRESS
# Set the advertising parameters
await self.send_command(HCI_LE_Set_Advertising_Parameters_Command(
- # TODO: use real values, not fixed ones
advertising_interval_min = self.advertising_interval_min,
advertising_interval_max = self.advertising_interval_max,
- advertising_type = HCI_LE_Set_Advertising_Parameters_Command.ADV_IND,
+ advertising_type = int(advertising_type),
own_address_type = Address.RANDOM_DEVICE_ADDRESS, # TODO: allow using the public address
- peer_address_type = Address.PUBLIC_DEVICE_ADDRESS,
- peer_address = Address('00:00:00:00:00:00'),
+ peer_address_type = peer_address_type,
+ peer_address = peer_address,
advertising_channel_map = 7,
advertising_filter_policy = 0
- ))
+ ), check_result=True)
# Enable advertising
await self.send_command(HCI_LE_Set_Advertising_Enable_Command(
advertising_enable = 1
- ))
+ ), check_result=True)
- self.advertising = True
+ self.auto_restart_advertising = auto_restart
+ self.advertising_type = advertising_type
+ self.advertising = True
async def stop_advertising(self):
# Disable advertising
if self.advertising:
await self.send_command(HCI_LE_Set_Advertising_Enable_Command(
advertising_enable = 0
- ))
+ ), check_result=True)
- self.advertising = False
+ self.advertising = False
+ self.advertising_type = None
+ self.auto_restart_advertising = False
@property
def is_advertising(self):
@@ -700,7 +945,8 @@ class Device(CompositeEventEmitter):
scan_interval=DEVICE_DEFAULT_SCAN_INTERVAL, # Scan interval in ms
scan_window=DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms
own_address_type=Address.RANDOM_DEVICE_ADDRESS,
- filter_duplicates=False
+ filter_duplicates=False,
+ scanning_phys=(HCI_LE_1M_PHY, HCI_LE_CODED_PHY)
):
# Check that the arguments are legal
if scan_interval < scan_window:
@@ -710,28 +956,79 @@ class Device(CompositeEventEmitter):
if scan_window < DEVICE_MIN_SCAN_WINDOW or scan_window > DEVICE_MAX_SCAN_WINDOW:
raise ValueError('scan_interval out of range')
- # Set the scanning parameters
- scan_type = HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING if active else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING
- await self.send_command(HCI_LE_Set_Scan_Parameters_Command(
- le_scan_type = scan_type,
- le_scan_interval = int(scan_window / 0.625),
- le_scan_window = int(scan_window / 0.625),
- own_address_type = own_address_type,
- scanning_filter_policy = HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY
- ))
+ # Reset the accumulators
+ self.advertisement_accumulator = {}
# Enable scanning
- await self.send_command(HCI_LE_Set_Scan_Enable_Command(
- le_scan_enable = 1,
- filter_duplicates = 1 if filter_duplicates else 0
- ))
- self.scanning = True
+ if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE):
+ # Set the scanning parameters
+ scan_type = HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING if active else HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING
+ scanning_filter_policy = HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY # TODO: support other types
+
+ scanning_phy_count = 0
+ scanning_phys_bits = 0
+ if HCI_LE_1M_PHY in scanning_phys:
+ scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT
+ scanning_phy_count += 1
+ if HCI_LE_CODED_PHY in scanning_phys:
+ if self.supports_le_feature(HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE):
+ scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT
+ scanning_phy_count += 1
+
+ if scanning_phy_count == 0:
+ raise ValueError('at least one scanning PHY must be enabled')
+
+ await self.send_command(HCI_LE_Set_Extended_Scan_Parameters_Command(
+ own_address_type = own_address_type,
+ scanning_filter_policy = scanning_filter_policy,
+ scanning_phys = scanning_phys_bits,
+ scan_types = [scan_type] * scanning_phy_count,
+ scan_intervals = [int(scan_window / 0.625)] * scanning_phy_count,
+ scan_windows = [int(scan_window / 0.625)] * scanning_phy_count
+ ), check_result=True)
+
+ # Enable scanning
+ await self.send_command(HCI_LE_Set_Extended_Scan_Enable_Command(
+ enable = 1,
+ filter_duplicates = 1 if filter_duplicates else 0,
+ duration = 0, # TODO allow other values
+ period = 0 # TODO allow other values
+ ), check_result=True)
+ else:
+ # Set the scanning parameters
+ scan_type = HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING if active else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING
+ await self.send_command(HCI_LE_Set_Scan_Parameters_Command(
+ le_scan_type = scan_type,
+ le_scan_interval = int(scan_window / 0.625),
+ le_scan_window = int(scan_window / 0.625),
+ own_address_type = own_address_type,
+ scanning_filter_policy = HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY
+ ), check_result=True)
+
+ # Enable scanning
+ await self.send_command(HCI_LE_Set_Scan_Enable_Command(
+ le_scan_enable = 1,
+ filter_duplicates = 1 if filter_duplicates else 0
+ ), check_result=True)
+
+ self.scanning_is_passive = not active
+ self.scanning = True
async def stop_scanning(self):
- await self.send_command(HCI_LE_Set_Scan_Enable_Command(
- le_scan_enable = 0,
- filter_duplicates = 0
- ))
+ # Disable scanning
+ if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE):
+ await self.send_command(HCI_LE_Set_Extended_Scan_Enable_Command(
+ enable = 0,
+ filter_duplicates = 0,
+ duration = 0,
+ period = 0
+ ), check_result=True)
+ else:
+ await self.send_command(HCI_LE_Set_Scan_Enable_Command(
+ le_scan_enable = 0,
+ filter_duplicates = 0
+ ), check_result=True)
+
self.scanning = False
@property
@@ -739,22 +1036,17 @@ class Device(CompositeEventEmitter):
return self.scanning
@host_event_handler
- def on_advertising_report(self, address, data, rssi, advertisement_type):
- if not (accumulator := self.advertisement_data.get(address)):
- accumulator = AdvertisementDataAccumulator()
- self.advertisement_data[address] = accumulator
- accumulator.update(data, advertisement_type)
- if accumulator.flushable:
- self.emit(
- 'advertisement',
- address,
- accumulator.advertising_data,
- rssi,
- accumulator.connectable
- )
+ def on_advertising_report(self, report):
+ if not (accumulator := self.advertisement_accumulators.get(report.address)):
+ accumulator = AdvertisementDataAccumulator(passive=self.scanning_is_passive)
+ self.advertisement_accumulators[report.address] = accumulator
+ if advertisement := accumulator.update(report):
+ self.emit('advertisement', advertisement)
async def start_discovery(self):
- await self.host.send_command(HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE))
+ await self.send_command(HCI_Write_Inquiry_Mode_Command(
+ inquiry_mode=HCI_EXTENDED_INQUIRY_MODE
+ ), check_result=True)
response = await self.send_command(HCI_Inquiry_Command(
lap = HCI_GENERAL_INQUIRY_LAP,
@@ -768,7 +1060,7 @@ class Device(CompositeEventEmitter):
self.discovering = True
async def stop_discovery(self):
- await self.send_command(HCI_Inquiry_Cancel_Command())
+ await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True)
self.discovering = False
@host_event_handler
@@ -805,11 +1097,12 @@ class Device(CompositeEventEmitter):
)
# Update the controller
- await self.host.send_command(
+ await self.send_command(
HCI_Write_Extended_Inquiry_Response_Command(
fec_required = 0,
extended_inquiry_response = self.inquiry_response
- )
+ ),
+ check_result=True
)
await self.set_scan_enable(
inquiry_scan_enabled = self.discoverable,
@@ -824,12 +1117,26 @@ class Device(CompositeEventEmitter):
page_scan_enabled = self.connectable
)
- async def connect(self, peer_address, transport=BT_LE_TRANSPORT):
+ async def connect(
+ self,
+ peer_address,
+ transport=BT_LE_TRANSPORT,
+ connection_parameters_preferences=None,
+ timeout=DEVICE_DEFAULT_CONNECT_TIMEOUT
+ ):
'''
Request a connection to a peer.
This method cannot be called if there is already a pending connection.
+
+ connection_parameters_preferences: (BLE only, ignored for BR/EDR)
+ * None: use all PHYs with default parameters
+ * map: each entry has a PHY as key and a ConnectionParametersPreferences object as value
'''
+ # Check parameters
+ if transport not in {BT_LE_TRANSPORT, BT_BR_EDR_TRANSPORT}:
+ raise ValueError('invalid transport')
+
# Adjust the transport automatically if we need to
if transport == BT_LE_TRANSPORT and not self.le_enabled:
transport = BT_BR_EDR_TRANSPORT
@@ -846,49 +1153,117 @@ class Device(CompositeEventEmitter):
except ValueError:
# If the address is not parsable, assume it is a name instead
logger.debug('looking for peer by name')
- peer_address = await self.find_peer_by_name(peer_address, transport)
+ peer_address = await self.find_peer_by_name(peer_address, transport) # TODO: timeout
# Create a future so that we can wait for the connection's result
pending_connection = asyncio.get_running_loop().create_future()
self.on('connection', pending_connection.set_result)
self.on('connection_failure', pending_connection.set_exception)
- # Tell the controller to connect
- if transport == BT_LE_TRANSPORT:
- # TODO: use real values, not fixed ones
- result = await self.send_command(HCI_LE_Create_Connection_Command(
- le_scan_interval = 96,
- le_scan_window = 96,
- initiator_filter_policy = 0,
- peer_address_type = peer_address.address_type,
- peer_address = peer_address,
- own_address_type = Address.RANDOM_DEVICE_ADDRESS,
- conn_interval_min = 12,
- conn_interval_max = 24,
- conn_latency = 0,
- supervision_timeout = 72,
- minimum_ce_length = 0,
- maximum_ce_length = 0
- ))
- else:
- # TODO: use real values, not fixed ones
- result = await self.send_command(HCI_Create_Connection_Command(
- bd_addr = peer_address,
- packet_type = 0xCC18, # FIXME: change
- page_scan_repetition_mode = HCI_R2_PAGE_SCAN_REPETITION_MODE,
- clock_offset = 0x0000,
- allow_role_switch = 0x01,
- reserved = 0
- ))
-
try:
+ # Tell the controller to connect
+ if transport == BT_LE_TRANSPORT:
+ if connection_parameters_preferences is None:
+ if connection_parameters_preferences is None:
+ connection_parameters_preferences = {
+ HCI_LE_1M_PHY: ConnectionParametersPreferences.default,
+ HCI_LE_2M_PHY: ConnectionParametersPreferences.default,
+ HCI_LE_CODED_PHY: ConnectionParametersPreferences.default
+ }
+
+ if self.host.supports_command(HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND):
+ # Only keep supported PHYs
+ phys = sorted(list(set(filter(self.supports_le_phy, connection_parameters_preferences.keys()))))
+ if not phys:
+ raise ValueError('least one supported PHY needed')
+
+ phy_count = len(phys)
+ initiating_phys = phy_list_to_bits(phys)
+
+ connection_interval_mins = [
+ int(connection_parameters_preferences[phy].connection_interval_min / 1.25) for phy in phys
+ ]
+ connection_interval_maxs = [
+ int(connection_parameters_preferences[phy].connection_interval_max / 1.25) for phy in phys
+ ]
+ max_latencies = [
+ connection_parameters_preferences[phy].max_latency for phy in phys
+ ]
+ supervision_timeouts = [
+ int(connection_parameters_preferences[phy].supervision_timeout / 10) for phy in phys
+ ]
+ min_ce_lengths = [
+ int(connection_parameters_preferences[phy].min_ce_length / 0.625) for phy in phys
+ ]
+ max_ce_lengths = [
+ int(connection_parameters_preferences[phy].max_ce_length / 0.625) for phy in phys
+ ]
+
+ result = await self.send_command(HCI_LE_Extended_Create_Connection_Command(
+ initiator_filter_policy = 0,
+ own_address_type = Address.RANDOM_DEVICE_ADDRESS,
+ peer_address_type = peer_address.address_type,
+ peer_address = peer_address,
+ initiating_phys = initiating_phys,
+ scan_intervals = (int(DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625),) * phy_count,
+ scan_windows = (int(DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625),) * phy_count,
+ connection_interval_mins = connection_interval_mins,
+ connection_interval_maxs = connection_interval_maxs,
+ max_latencies = max_latencies,
+ supervision_timeouts = supervision_timeouts,
+ min_ce_lengths = min_ce_lengths,
+ max_ce_lengths = max_ce_lengths
+ ))
+ else:
+ if HCI_LE_1M_PHY not in connection_parameters_preferences:
+ raise ValueError('1M PHY preferences required')
+
+ prefs = connection_parameters_preferences[HCI_LE_1M_PHY]
+ result = await self.send_command(HCI_LE_Create_Connection_Command(
+ le_scan_interval = int(DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625),
+ le_scan_window = int(DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625),
+ initiator_filter_policy = 0,
+ peer_address_type = peer_address.address_type,
+ peer_address = peer_address,
+ own_address_type = Address.RANDOM_DEVICE_ADDRESS,
+ connection_interval_min = int(prefs.connection_interval_min / 1.25),
+ connection_interval_max = int(prefs.connection_interval_max / 1.25),
+ max_latency = prefs.max_latency,
+ supervision_timeout = int(prefs.supervision_timeout / 10),
+ min_ce_length = int(prefs.min_ce_length / 0.625),
+ max_ce_length = int(prefs.max_ce_length / 0.625),
+ ))
+ else:
+ # TODO: allow passing other settings
+ result = await self.send_command(HCI_Create_Connection_Command(
+ bd_addr = peer_address,
+ packet_type = 0xCC18, # FIXME: change
+ page_scan_repetition_mode = HCI_R2_PAGE_SCAN_REPETITION_MODE,
+ clock_offset = 0x0000,
+ allow_role_switch = 0x01,
+ reserved = 0
+ ))
+
if result.status != HCI_Command_Status_Event.PENDING:
raise HCI_StatusError(result)
# Wait for the connection process to complete
self.connecting = True
- return await pending_connection
+ if timeout is None:
+ return await pending_connection
+ else:
+ try:
+ return await asyncio.wait_for(asyncio.shield(pending_connection), timeout)
+ except asyncio.TimeoutError:
+ if transport == BT_LE_TRANSPORT:
+ await self.send_command(HCI_LE_Create_Connection_Cancel_Command())
+ else:
+ await self.send_command(HCI_Create_Connection_Cancel_Command(bd_addr=peer_address))
+ try:
+ return await pending_connection
+ except ConnectionError:
+ raise TimeoutError()
finally:
self.remove_listener('connection', pending_connection.set_result)
self.remove_listener('connection_failure', pending_connection.set_exception)
@@ -913,7 +1288,7 @@ class Device(CompositeEventEmitter):
async def cancel_connection(self):
if not self.is_connecting:
return
- await self.send_command(HCI_LE_Create_Connection_Cancel_Command())
+ await self.send_command(HCI_LE_Create_Connection_Cancel_Command(), check_result=True)
async def disconnect(self, connection, reason):
# Create a future so that we can wait for the disconnection's result
@@ -922,7 +1297,9 @@ class Device(CompositeEventEmitter):
connection.on('disconnection_failure', pending_disconnection.set_exception)
# Request a disconnection
- result = await self.send_command(HCI_Disconnect_Command(connection_handle = connection.handle, reason = reason))
+ result = await self.send_command(HCI_Disconnect_Command(
+ connection_handle = connection.handle, reason = reason
+ ))
try:
if result.status != HCI_Command_Status_Event.PENDING:
@@ -939,26 +1316,66 @@ class Device(CompositeEventEmitter):
async def update_connection_parameters(
self,
connection,
- conn_interval_min,
- conn_interval_max,
- conn_latency,
+ connection_interval_min,
+ connection_interval_max,
+ max_latency,
supervision_timeout,
- minimum_ce_length = 0,
- maximum_ce_length = 0
+ min_ce_length = 0,
+ max_ce_length = 0
):
'''
NOTE: the name of the parameters may look odd, but it just follows the names used in the Bluetooth spec.
'''
await self.send_command(HCI_LE_Connection_Update_Command(
- connection_handle = connection.handle,
- conn_interval_min = conn_interval_min,
- conn_interval_max = conn_interval_max,
- conn_latency = conn_latency,
- supervision_timeout = supervision_timeout,
- minimum_ce_length = minimum_ce_length,
- maximum_ce_length = maximum_ce_length
- ))
- # TODO: check result
+ connection_handle = connection.handle,
+ connection_interval_min = connection_interval_min,
+ connection_interval_max = connection_interval_max,
+ max_latency = max_latency,
+ supervision_timeout = supervision_timeout,
+ min_ce_length = min_ce_length,
+ max_ce_length = max_ce_length
+ ), check_result=True)
+
+ async def get_connection_rssi(self, connection):
+ result = await self.send_command(HCI_Read_RSSI_Command(handle = connection.handle), check_result=True)
+ return result.return_parameters.rssi
+
+ async def get_connection_phy(self, connection):
+ result = await self.send_command(
+ HCI_LE_Read_PHY_Command(connection_handle = connection.handle),
+ check_result=True
+ )
+ return (result.return_parameters.tx_phy, result.return_parameters.rx_phy)
+
+ async def set_connection_phy(
+ self,
+ connection,
+ tx_phys=None,
+ rx_phys=None,
+ phy_options=None
+ ):
+ all_phys_bits = (1 if tx_phys is None else 0) | ((1 if rx_phys is None else 0) << 1)
+
+ return await self.send_command(
+ HCI_LE_Set_PHY_Command(
+ connection_handle = connection.handle,
+ all_phys = all_phys_bits,
+ tx_phys = phy_list_to_bits(tx_phys),
+ rx_phys = phy_list_to_bits(rx_phys),
+ phy_options = 0 if phy_options is None else int(phy_options)
+ ), check_result=True
+ )
+
+ async def set_default_phy(self, tx_phys=None, rx_phys=None):
+ all_phys_bits = (1 if tx_phys is None else 0) | ((1 if rx_phys is None else 0) << 1)
+
+ return await self.send_command(
+ HCI_LE_Set_Default_PHY_Command(
+ all_phys = all_phys_bits,
+ tx_phys = phy_list_to_bits(tx_phys),
+ rx_phys = phy_list_to_bits(rx_phys)
+ ), check_result=True
+ )
async def find_peer_by_name(self, name, transport=BT_LE_TRANSPORT):
"""
@@ -982,8 +1399,7 @@ class Device(CompositeEventEmitter):
event_name = 'advertisement'
handler = self.on(
event_name,
- lambda address, ad_data, rssi, connectable:
- on_peer_found(address, ad_data)
+ lambda advertisement: on_peer_found(advertisement.address, advertisement.data)
)
was_scanning = self.scanning
@@ -1228,36 +1644,74 @@ class Device(CompositeEventEmitter):
if connection_handle in self.connections:
logger.warn('new connection reuses the same handle as a previous connection')
- # Resolve the peer address if we can
- if self.address_resolver:
- if peer_address.is_resolvable:
- resolved_address = self.address_resolver.resolve(peer_address)
- if resolved_address is not None:
- logger.debug(f'*** Address resolved as {resolved_address}')
- peer_resolvable_address = peer_address
- peer_address = resolved_address
+ if transport == BT_BR_EDR_TRANSPORT:
+ # Create a new connection
+ connection = Connection(
+ self,
+ connection_handle,
+ transport,
+ peer_address,
+ peer_resolvable_address,
+ role,
+ connection_parameters,
+ phy=None
+ )
+ self.connections[connection_handle] = connection
- # Create a new connection
- connection = Connection(
- self,
- connection_handle,
- transport,
- peer_address,
- peer_resolvable_address,
- role,
- connection_parameters
- )
- self.connections[connection_handle] = connection
+ # Emit an event to notify listeners of the new connection
+ self.emit('connection', connection)
+ else:
+ # Resolve the peer address if we can
+ if self.address_resolver:
+ if peer_address.is_resolvable:
+ resolved_address = self.address_resolver.resolve(peer_address)
+ if resolved_address is not None:
+ logger.debug(f'*** Address resolved as {resolved_address}')
+ peer_resolvable_address = peer_address
+ peer_address = resolved_address
- # We are no longer advertising
- self.advertising = False
+ # We are no longer advertising
+ self.advertising = False
- # Emit an event to notify listeners of the new connection
- self.emit('connection', connection)
+ # Create and notify of the new connection asynchronously
+ async def new_connection():
+ # Figure out which PHY we're connected with
+ if self.host.supports_command(HCI_LE_READ_PHY_COMMAND):
+ result = await self.send_command(
+ HCI_LE_Read_PHY_Command(connection_handle=connection_handle),
+ check_result=True
+ )
+ phy = ConnectionPHY(result.return_parameters.tx_phy, result.return_parameters.rx_phy)
+ else:
+ phy = ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY)
+
+ # Create a new connection
+ connection = Connection(
+ self,
+ connection_handle,
+ transport,
+ peer_address,
+ peer_resolvable_address,
+ role,
+ connection_parameters,
+ phy
+ )
+ self.connections[connection_handle] = connection
+
+ # Emit an event to notify listeners of the new connection
+ self.emit('connection', connection)
+
+ asyncio.create_task(new_connection())
@host_event_handler
- def on_connection_failure(self, error_code):
- logger.debug(f'*** Connection failed: {error_code}')
+ def on_connection_failure(self, connection_handle, error_code):
+ logger.debug(f'*** Connection failed: {HCI_Constant.error_name(error_code)}')
+
+ # For directed advertising, this means a timeout
+ if self.advertising and self.advertising_type.is_directed:
+ self.advertising = False
+
+ # Notify listeners
error = ConnectionError(
error_code,
'hci',
@@ -1280,7 +1734,10 @@ class Device(CompositeEventEmitter):
# Restart advertising if auto-restart is enabled
if self.auto_restart_advertising:
logger.debug('restarting advertising')
- asyncio.create_task(self.start_advertising(auto_restart=self.auto_restart_advertising))
+ asyncio.create_task(self.start_advertising(
+ advertising_type = self.advertising_type,
+ auto_restart = True
+ ))
@host_event_handler
@with_connection_from_handle
diff --git a/bumble/hci.py b/bumble/hci.py
index 9858c04d..065901bb 100644
--- a/bumble/hci.py
+++ b/bumble/hci.py
@@ -62,6 +62,18 @@ def map_class_of_device(class_of_device):
return f'[{class_of_device:06X}] Services({",".join(DeviceClass.service_class_labels(service_classes))}),Class({DeviceClass.major_device_class_name(major_device_class)}|{DeviceClass.minor_device_class_name(major_device_class, minor_device_class)})'
+def phy_list_to_bits(phys):
+ if phys is None:
+ return 0
+ else:
+ phy_bits = 0
+ for phy in phys:
+ if phy not in HCI_LE_PHY_TYPE_TO_BIT:
+ raise ValueError('invalid PHY')
+ phy_bits |= (1 << HCI_LE_PHY_TYPE_TO_BIT[phy])
+ return phy_bits
+
+
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
@@ -670,10 +682,22 @@ HCI_LE_CODED_PHY = 3
HCI_LE_PHY_NAMES = {
HCI_LE_1M_PHY: 'LE 1M',
- HCI_LE_2M_PHY: 'L2 2M',
+ HCI_LE_2M_PHY: 'LE 2M',
HCI_LE_CODED_PHY: 'LE Coded'
}
+HCI_LE_1M_PHY_BIT = 0
+HCI_LE_2M_PHY_BIT = 1
+HCI_LE_CODED_PHY_BIT = 2
+
+HCI_LE_PHY_BIT_NAMES = ['LE_1M_PHY', 'LE_2M_PHY', 'LE_CODED_PHY']
+
+HCI_LE_PHY_TYPE_TO_BIT = {
+ HCI_LE_1M_PHY: HCI_LE_1M_PHY_BIT,
+ HCI_LE_2M_PHY: HCI_LE_2M_PHY_BIT,
+ HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_BIT
+}
+
# Connection Parameters
HCI_CONNECTION_INTERVAL_MS_PER_UNIT = 1.25
HCI_CONNECTION_LATENCY_MS_PER_UNIT = 1.25
@@ -1373,11 +1397,14 @@ class HCI_Error(ProtocolError):
super().__init__(error_code, 'hci', HCI_Constant.error_name(error_code))
+# -----------------------------------------------------------------------------
class HCI_StatusError(ProtocolError):
def __init__(self, response):
- super().__init__(response.status,
- error_namespace=HCI_Command.command_name(response.command_opcode),
- error_name=HCI_Constant.status_name(response.status))
+ super().__init__(
+ response.status,
+ error_namespace=HCI_Command.command_name(response.command_opcode),
+ error_name=HCI_Constant.status_name(response.status)
+ )
# -----------------------------------------------------------------------------
@@ -1402,7 +1429,7 @@ class HCI_Object:
def dict_from_bytes(data, offset, fields):
result = collections.OrderedDict()
for (field_name, field_type) in fields:
- # The field_type may be a dictionnary with a mapper, parser, and/or size
+ # The field_type may be a dictionary with a mapper, parser, and/or size
if type(field_type) is dict:
if 'size' in field_type:
field_type = field_type['size']
@@ -1464,7 +1491,7 @@ class HCI_Object:
def dict_to_bytes(object, fields):
result = bytearray()
for (field_name, field_type) in fields:
- # The field_type may be a dictionnary with a mapper, parser, serializer, and/or size
+ # The field_type may be a dictionary with a mapper, parser, serializer, and/or size
serializer = None
if type(field_type) is dict:
if 'serializer' in field_type:
@@ -1523,9 +1550,9 @@ class HCI_Object:
return bytes(result)
- @staticmethod
- def from_bytes(data, offset, fields):
- return HCI_Object(fields, **HCI_Object.dict_from_bytes(data, offset, fields))
+ @classmethod
+ def from_bytes(cls, data, offset, fields):
+ return cls(fields, **cls.dict_from_bytes(data, offset, fields))
def to_bytes(self):
return HCI_Object.dict_to_bytes(self.__dict__, self.fields)
@@ -1881,6 +1908,22 @@ class HCI_Disconnect_Command(HCI_Command):
'''
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
+ fields=[
+ ('bd_addr', Address.parse_address)
+ ],
+ return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('bd_addr', Address.parse_address)
+ ]
+)
+class HCI_Create_Connection_Cancel_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.1.7 Create Connection Cancel Command
+ '''
+
+
# -----------------------------------------------------------------------------
@HCI_Command.command([
('bd_addr', Address.parse_address),
@@ -2297,7 +2340,7 @@ class HCI_Read_Local_Name_Command(HCI_Command):
# -----------------------------------------------------------------------------
@HCI_Command.command([
- ('conn_accept_timeout', 2)
+ ('connection_accept_timeout', 2)
])
class HCI_Write_Connection_Accept_Timeout_Command(HCI_Command):
'''
@@ -2690,6 +2733,23 @@ class HCI_Read_Local_Supported_Codecs_Command(HCI_Command):
'''
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
+ fields=[
+ ('handle', 2)
+ ],
+ return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('handle', 2),
+ ('rssi', -1)
+ ]
+)
+class HCI_Read_RSSI_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.5.4 Read RSSI Command
+ '''
+
+
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
@@ -2766,18 +2826,18 @@ class HCI_LE_Set_Advertising_Parameters_Command(HCI_Command):
See Bluetooth spec @ 7.8.5 LE Set Advertising Parameters Command
'''
- ADV_IND = 0x00
- ADV_DIRECT_IND = 0x01
- ADV_SCAN_IND = 0x02
- ADV_NONCONN_IND = 0x03
- ADV_DIRECT_IND = 0x04
+ ADV_IND = 0x00
+ ADV_DIRECT_IND = 0x01
+ ADV_SCAN_IND = 0x02
+ ADV_NONCONN_IND = 0x03
+ ADV_DIRECT_IND_LOW_DUTY = 0x04
ADVERTISING_TYPE_NAMES = {
- ADV_IND: 'ADV_IND',
- ADV_DIRECT_IND: 'ADV_DIRECT_IND',
- ADV_SCAN_IND: 'ADV_SCAN_IND',
- ADV_NONCONN_IND: 'ADV_NONCONN_IND',
- ADV_DIRECT_IND: 'ADV_DIRECT_IND'
+ ADV_IND: 'ADV_IND',
+ ADV_DIRECT_IND: 'ADV_DIRECT_IND',
+ ADV_SCAN_IND: 'ADV_SCAN_IND',
+ ADV_NONCONN_IND: 'ADV_NONCONN_IND',
+ ADV_DIRECT_IND_LOW_DUTY: 'ADV_DIRECT_IND_LOW_DUTY'
}
@classmethod
@@ -2869,12 +2929,12 @@ class HCI_LE_Set_Scan_Enable_Command(HCI_Command):
('peer_address_type', Address.ADDRESS_TYPE_SPEC),
('peer_address', Address.parse_address_preceded_by_type),
('own_address_type', Address.ADDRESS_TYPE_SPEC),
- ('conn_interval_min', 2),
- ('conn_interval_max', 2),
- ('conn_latency', 2),
+ ('connection_interval_min', 2),
+ ('connection_interval_max', 2),
+ ('max_latency', 2),
('supervision_timeout', 2),
- ('minimum_ce_length', 2),
- ('maximum_ce_length', 2)
+ ('min_ce_length', 2),
+ ('max_ce_length', 2)
])
class HCI_LE_Create_Connection_Command(HCI_Command):
'''
@@ -2930,13 +2990,13 @@ class HCI_LE_Remove_Device_From_Filter_Accept_List_Command(HCI_Command):
# -----------------------------------------------------------------------------
@HCI_Command.command([
- ('connection_handle', 2),
- ('conn_interval_min', 2),
- ('conn_interval_max', 2),
- ('conn_latency', 2),
- ('supervision_timeout', 2),
- ('minimum_ce_length', 2),
- ('maximum_ce_length', 2)
+ ('connection_handle', 2),
+ ('connection_interval_min', 2),
+ ('connection_interval_max', 2),
+ ('max_latency', 2),
+ ('supervision_timeout', 2),
+ ('min_ce_length', 2),
+ ('max_ce_length', 2)
])
class HCI_LE_Connection_Update_Command(HCI_Command):
'''
@@ -3002,10 +3062,10 @@ class HCI_LE_Read_Supported_States_Command(HCI_Command):
('connection_handle', 2),
('interval_min', 2),
('interval_max', 2),
- ('latency', 2),
+ ('max_latency', 2),
('timeout', 2),
- ('minimum_ce_length', 2),
- ('maximum_ce_length', 2)
+ ('min_ce_length', 2),
+ ('max_ce_length', 2)
])
class HCI_LE_Remote_Connection_Parameter_Request_Reply_Command(HCI_Command):
'''
@@ -3024,6 +3084,36 @@ class HCI_LE_Remote_Connection_Parameter_Request_Negative_Reply_Command(HCI_Comm
'''
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
+ fields=[
+ ('connection_handle', 2),
+ ('tx_octets', 2),
+ ('tx_time', 2),
+ ],
+ return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('connection_handle', 2)
+ ]
+)
+class HCI_LE_Set_Data_Length_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.8.33 LE Set Data Length Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('suggested_max_tx_octets', 2),
+ ('suggested_max_tx_time', 2),
+])
+class HCI_LE_Read_Suggested_Default_Data_Length_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.8.34 LE Read Suggested Default Data Length Command
+ '''
+
+
# -----------------------------------------------------------------------------
@HCI_Command.command([
('suggested_max_tx_octets', 2),
@@ -3056,18 +3146,6 @@ class HCI_LE_Clear_Resolving_List_Command(HCI_Command):
'''
-# -----------------------------------------------------------------------------
-@HCI_Command.command([
- ('all_phys', 1),
- ('tx_phys', 1),
- ('rx_phys', 1)
-])
-class HCI_LE_Set_Default_PHY_Command(HCI_Command):
- '''
- See Bluetooth spec @ 7.8.48 LE Set Default PHY Command
- '''
-
-
# -----------------------------------------------------------------------------
@HCI_Command.command([
('address_resolution_enable', 1)
@@ -3088,6 +3166,313 @@ class HCI_LE_Set_Resolvable_Private_Address_Timeout_Command(HCI_Command):
'''
+# -----------------------------------------------------------------------------
+@HCI_Command.command(return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('supported_max_tx_octets', 2),
+ ('supported_max_tx_time', 2),
+ ('supported_max_rx_octets', 2),
+ ('supported_max_rx_time', 2)
+])
+class HCI_LE_Read_Maximum_Data_Length_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.8.46 LE Read Maximum Data Length Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
+ fields=[
+ ('connection_handle', 2)
+ ],
+ return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('connection_handle', 2),
+ ('tx_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}),
+ ('rx_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name})
+ ])
+class HCI_LE_Read_PHY_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.8.47 LE Read PHY Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command([
+ ('all_phys', {'size': 1, 'mapper': lambda x: bit_flags_to_strings(x, HCI_LE_Set_Default_PHY_Command.ANY_PHY_BIT_NAMES)}),
+ ('tx_phys', {'size': 1, 'mapper': lambda x: bit_flags_to_strings(x, HCI_LE_PHY_BIT_NAMES)}),
+ ('rx_phys', {'size': 1, 'mapper': lambda x: bit_flags_to_strings(x, HCI_LE_PHY_BIT_NAMES)})
+])
+class HCI_LE_Set_Default_PHY_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.8.48 LE Set Default PHY Command
+ '''
+ ANY_TX_PHY_BIT = 0
+ ANY_RX_PHY_BIT = 1
+
+ ANY_PHY_BIT_NAMES = ['Any TX', 'Any RX']
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command([
+ ('connection_handle', 2),
+ ('all_phys', {'size': 1, 'mapper': lambda x: bit_flags_to_strings(x, HCI_LE_Set_PHY_Command.ANY_PHY_BIT_NAMES)}),
+ ('tx_phys', {'size': 1, 'mapper': lambda x: bit_flags_to_strings(x, HCI_LE_PHY_BIT_NAMES)}),
+ ('rx_phys', {'size': 1, 'mapper': lambda x: bit_flags_to_strings(x, HCI_LE_PHY_BIT_NAMES)}),
+ ('phy_options', 2)
+])
+class HCI_LE_Set_PHY_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.8.49 LE Set PHY Command
+ '''
+ ANY_TX_PHY_BIT = 0
+ ANY_RX_PHY_BIT = 1
+
+ ANY_PHY_BIT_NAMES = ['Any TX', 'Any RX']
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command([
+ ('advertising_handle', 1),
+ ('random_address', lambda data, offset: Address.parse_address_with_type(data, offset, Address.RANDOM_DEVICE_ADDRESS))
+])
+class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.8.52 LE Set Advertising Set Random Address Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
+ fields=[
+ ('advertising_handle', 1),
+ ('advertising_event_properties', {'size': 2, 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.advertising_properties_string(x)}),
+ ('primary_advertising_interval_min', 3),
+ ('primary_advertising_interval_max', 3),
+ ('primary_advertising_channel_map', {'size': 1, 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.channel_map_string(x)}),
+ ('own_address_type', Address.ADDRESS_TYPE_SPEC),
+ ('peer_address_type', Address.ADDRESS_TYPE_SPEC),
+ ('peer_address', Address.parse_address_preceded_by_type),
+ ('advertising_filter_policy', 1),
+ ('advertising_tx_power', 1),
+ ('primary_advertising_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}),
+ ('secondary_advertising_max_skip', 1),
+ ('secondary_advertising_phy', {'size': 1, 'mapper': HCI_Constant.le_phy_name}),
+ ('advertising_sid', 1),
+ ('scan_request_notification_enable', 1)
+ ],
+ return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('selected_tx__power', 1)
+ ]
+)
+class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command
+ '''
+
+ CONNECTABLE_ADVERTISING = 0
+ SCANNABLE_ADVERTISING = 1
+ DIRECTED_ADVERTISING = 2
+ HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 3
+ USE_LEGACY_ADVERTISING_PDUS = 4
+ ANONYMOUS_ADVERTISING = 5
+ INCLUDE_TX_POWER = 6
+
+ ADVERTISING_PROPERTIES_NAMES = (
+ 'CONNECTABLE_ADVERTISING',
+ 'SCANNABLE_ADVERTISING',
+ 'DIRECTED_ADVERTISING',
+ 'HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING',
+ 'USE_LEGACY_ADVERTISING_PDUS',
+ 'ANONYMOUS_ADVERTISING',
+ 'INCLUDE_TX_POWER'
+ )
+
+ CHANNEL_37 = 0
+ CHANNEL_38 = 1
+ CHANNEL_39 = 2
+
+ CHANNEL_NAMES = ('37', '38', '39')
+
+ @classmethod
+ def advertising_properties_string(cls, properties):
+ return f'[{",".join(bit_flags_to_strings(properties, cls.ADVERTISING_PROPERTIES_NAMES))}]'
+
+ @classmethod
+ def channel_map_string(cls, channel_map):
+ return f'[{",".join(bit_flags_to_strings(channel_map, cls.CHANNEL_NAMES))}]'
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command([
+ ('advertising_handle', 1),
+ ('operation', {'size': 1, 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(x)}),
+ ('fragment_preference', 1),
+ ('advertising_data', {
+ 'parser': HCI_Object.parse_length_prefixed_bytes,
+ 'serializer': functools.partial(HCI_Object.serialize_length_prefixed_bytes)
+ })
+])
+class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.8.54 LE Set Extended Advertising Data Command
+ '''
+
+ INTERMEDIATE_FRAGMENT = 0x00
+ FIRST_FRAGMENT = 0x01
+ LAST_FRAGMENT = 0x02
+ COMPLETE_DATA = 0x03
+ UNCHANGED_DATA = 0x04
+
+ OPERATION_NAMES = {
+ INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
+ FIRST_FRAGMENT: 'FIRST_FRAGMENT',
+ LAST_FRAGMENT: 'LAST_FRAGMENT',
+ COMPLETE_DATA: 'COMPLETE_DATA',
+ UNCHANGED_DATA: 'UNCHANGED_DATA'
+ }
+
+ @classmethod
+ def operation_name(cls, operation):
+ return name_or_number(cls.OPERATION_NAMES, operation)
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command([
+ ('advertising_handle', 1),
+ ('operation', {'size': 1, 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(x)}),
+ ('fragment_preference', 1),
+ ('scan_response_data', {
+ 'parser': HCI_Object.parse_length_prefixed_bytes,
+ 'serializer': functools.partial(HCI_Object.serialize_length_prefixed_bytes)
+ })
+])
+class HCI_LE_Set_Extended_Scan_Response_Data_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.8.55 LE Set Extended Scan Response Data Command
+ '''
+
+ INTERMEDIATE_FRAGMENT = 0x00
+ FIRST_FRAGMENT = 0x01
+ LAST_FRAGMENT = 0x02
+ COMPLETE_DATA = 0x03
+
+ OPERATION_NAMES = {
+ INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
+ FIRST_FRAGMENT: 'FIRST_FRAGMENT',
+ LAST_FRAGMENT: 'LAST_FRAGMENT',
+ COMPLETE_DATA: 'COMPLETE_DATA'
+ }
+
+ @classmethod
+ def operation_name(cls, operation):
+ return name_or_number(cls.OPERATION_NAMES, operation)
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(fields=None)
+class HCI_LE_Set_Extended_Advertising_Enable_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.8.56 LE Set Extended Advertising Enable Command
+ '''
+
+ @classmethod
+ def from_parameters(cls, parameters):
+ enable = parameters[0]
+ num_sets = parameters[1]
+ advertising_handles = []
+ durations = []
+ max_extended_advertising_events = []
+ offset = 2
+ for _ in range(num_sets):
+ advertising_handles.append(parameters[offset])
+ durations.append(struct.unpack_from('> 5) & 3])
+
+ if event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.LEGACY_ADVERTISING_PDU_USED):
+ legacy_pdu_type = HCI_LE_Extended_Advertising_Report_Event.LEGACY_PDU_TYPE_MAP.get(event_type & 0x0F)
+ if legacy_pdu_type is not None:
+ legacy_info_string = f'({HCI_LE_Advertising_Report_Event.event_type_name(legacy_pdu_type)})'
+ else:
+ legacy_info_string = ''
+ else:
+ legacy_info_string = ''
+
+ return f'0x{event_type:04X} [{",".join(event_type_flags)}]{legacy_info_string}'
+
+ @classmethod
+ def from_parameters(cls, parameters):
+ num_reports = parameters[1]
+ reports = []
+ offset = 2
+ for _ in range(num_reports):
+ report = cls.Report.from_parameters(parameters, offset)
+ offset += 24 + len(report.data)
+ reports.append(report)
+
+ return cls(reports)
+
+ def __init__(self, reports):
+ self.reports = reports[:]
+
+ # Serialize the fields
+ parameters = bytes([HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT, len(reports)]) + b''.join([bytes(report) for report in reports])
+
+ super().__init__(self.subevent_code, parameters)
+
+ def __str__(self):
+ reports = '\n'.join([report.to_string(' ') for report in self.reports])
+ return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}'
+
+
+HCI_Event.meta_event_classes[HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT] = HCI_LE_Extended_Advertising_Report_Event
+
+
# -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event([
('connection_handle', 2),
@@ -3686,7 +4346,7 @@ class HCI_Encryption_Change_Event(HCI_Event):
E0_OR_AES_CCM = 0x01
AES_CCM = 0x02
- ENCYRPTION_ENABLED_NAMES = {
+ ENCRYPTION_ENABLED_NAMES = {
OFF: 'OFF',
E0_OR_AES_CCM: 'E0_OR_AES_CCM',
AES_CCM: 'AES_CCM'
@@ -3694,7 +4354,7 @@ class HCI_Encryption_Change_Event(HCI_Event):
@staticmethod
def encryption_enabled_name(encryption_enabled):
- return name_or_number(HCI_Encryption_Change_Event.ENCYRPTION_ENABLED_NAMES, encryption_enabled)
+ return name_or_number(HCI_Encryption_Change_Event.ENCRYPTION_ENABLED_NAMES, encryption_enabled)
# -----------------------------------------------------------------------------
@@ -3954,7 +4614,7 @@ class HCI_Page_Scan_Repetition_Mode_Change_Event(HCI_Event):
# -----------------------------------------------------------------------------
@HCI_Event.registered
-class HCI_Inquiry_Result_With_Rssi_Event(HCI_Event):
+class HCI_Inquiry_Result_With_RSSI_Event(HCI_Event):
'''
See Bluetooth spec @ 7.7.33 Inquiry Result with RSSI Event
'''
@@ -3974,11 +4634,11 @@ class HCI_Inquiry_Result_With_Rssi_Event(HCI_Event):
responses = []
offset = 1
for _ in range(num_responses):
- response = HCI_Object.from_bytes(parameters, offset, HCI_Inquiry_Result_With_Rssi_Event.RESPONSE_FIELDS)
+ response = HCI_Object.from_bytes(parameters, offset, HCI_Inquiry_Result_With_RSSI_Event.RESPONSE_FIELDS)
offset += 14
responses.append(response)
- return HCI_Inquiry_Result_With_Rssi_Event(responses)
+ return HCI_Inquiry_Result_With_RSSI_Event(responses)
def __init__(self, responses):
self.responses = responses[:]
@@ -4041,7 +4701,7 @@ class HCI_Synchronous_Connection_Complete_Event(HCI_Event):
U_LAW_LOG_AIR_MODE: 'u-law log',
A_LAW_LOG_AIR_MORE: 'A-law log',
CVSD_AIR_MODE: 'CVSD',
- TRANSPARENT_DATA_AIR_MODE: 'Transparend Data'
+ TRANSPARENT_DATA_AIR_MODE: 'Transparent Data'
}
@staticmethod
diff --git a/bumble/host.py b/bumble/host.py
index cc692d0b..35efad49 100644
--- a/bumble/host.py
+++ b/bumble/host.py
@@ -76,7 +76,7 @@ class Host(EventEmitter):
self.hc_total_num_acl_data_packets = HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS
self.acl_packet_queue = collections.deque()
self.acl_packets_in_flight = 0
- self.local_version = HCI_VERSION_BLUETOOTH_CORE_4_0
+ self.local_version = None
self.local_supported_commands = bytes(64)
self.local_le_features = 0
self.command_semaphore = asyncio.Semaphore(1)
@@ -91,32 +91,23 @@ class Host(EventEmitter):
self.set_packet_sink(controller_sink)
async def reset(self):
- await self.send_command(HCI_Reset_Command())
+ await self.send_command(HCI_Reset_Command(), check_result=True)
self.ready = True
- response = await self.send_command(HCI_Read_Local_Supported_Commands_Command())
- if response.return_parameters.status == HCI_SUCCESS:
- self.local_supported_commands = response.return_parameters.supported_commands
- else:
- logger.warn(f'HCI_Read_Local_Supported_Commands_Command failed: {response.return_parameters.status}')
+ response = await self.send_command(HCI_Read_Local_Supported_Commands_Command(), check_result=True)
+ self.local_supported_commands = response.return_parameters.supported_commands
if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
- response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command())
- if response.return_parameters.status == HCI_SUCCESS:
- self.local_le_features = struct.unpack(' CONTROLLER", "blue")}: {command}')
# Wait until we can send (only one pending command at a time)
@@ -186,11 +171,22 @@ class Host(EventEmitter):
try:
self.send_hci_packet(command)
response = await self.pending_response
- # TODO: check error values
+
+ # Check the return parameters if required
+ if check_result:
+ if type(response.return_parameters) is int:
+ status = response.return_parameters
+ else:
+ status = response.return_parameters.status
+
+ if status != HCI_SUCCESS:
+ logger.warning(f'{command.name} failed ({HCI_Constant.error_name(status)})')
+ raise HCI_Error(status)
+
return response
except Exception as error:
logger.warning(f'{color("!!! Exception while sending HCI packet:", "red")} {error}')
- # raise error
+ raise error
finally:
self.pending_command = None
self.pending_response = None
@@ -370,8 +366,8 @@ class Host(EventEmitter):
# Notify the client
connection_parameters = ConnectionParameters(
- event.conn_interval,
- event.conn_latency,
+ event.connection_interval,
+ event.peripheral_latency,
event.supervision_timeout
)
self.emit(
@@ -387,7 +383,7 @@ class Host(EventEmitter):
logger.debug(f'### CONNECTION FAILED: {event.status}')
# Notify the listeners
- self.emit('connection_failure', event.status)
+ self.emit('connection_failure', event.connection_handle, event.status)
def on_hci_le_enhanced_connection_complete_event(self, event):
# Just use the same implementation as for the non-enhanced event for now
@@ -435,7 +431,7 @@ class Host(EventEmitter):
logger.debug(f'### DISCONNECTION FAILED: {event.status}')
# Notify the listeners
- self.emit('disconnection_failure', event.status)
+ self.emit('disconnection_failure', event.connection_handle, event.status)
def on_hci_le_connection_update_complete_event(self, event):
if (connection := self.connections.get(event.connection_handle)) is None:
@@ -445,8 +441,8 @@ class Host(EventEmitter):
# Notify the client
if event.status == HCI_SUCCESS:
connection_parameters = ConnectionParameters(
- event.conn_interval,
- event.conn_latency,
+ event.connection_interval,
+ event.peripheral_latency,
event.supervision_timeout
)
self.emit('connection_parameters_update', connection.handle, connection_parameters)
@@ -467,13 +463,10 @@ class Host(EventEmitter):
def on_hci_le_advertising_report_event(self, event):
for report in event.reports:
- self.emit(
- 'advertising_report',
- report.address,
- report.data,
- report.rssi,
- report.event_type
- )
+ self.emit('advertising_report', report)
+
+ def on_hci_le_extended_advertising_report_event(self, event):
+ self.on_hci_le_advertising_report_event(event)
def on_hci_le_remote_connection_parameter_request_event(self, event):
if event.connection_handle not in self.connections:
@@ -489,8 +482,8 @@ class Host(EventEmitter):
interval_max = event.interval_max,
latency = event.latency,
timeout = event.timeout,
- minimum_ce_length = 0,
- maximum_ce_length = 0
+ min_ce_length = 0,
+ max_ce_length = 0
)
)
diff --git a/bumble/l2cap.py b/bumble/l2cap.py
index e39dff19..927454e8 100644
--- a/bumble/l2cap.py
+++ b/bumble/l2cap.py
@@ -462,10 +462,10 @@ class L2CAP_Information_Response(L2CAP_Control_Frame):
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass([
- ('interval_min', 2),
- ('interval_max', 2),
- ('slave_latency', 2),
- ('timeout_multiplier', 2)
+ ('interval_min', 2),
+ ('interval_max', 2),
+ ('latency', 2),
+ ('timeout', 2)
])
class L2CAP_Connection_Parameter_Update_Request(L2CAP_Control_Frame):
'''
@@ -857,10 +857,10 @@ class ChannelManager:
identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256
self.identifiers[connection.handle] = identifier
return identifier
-
+
def register_fixed_channel(self, cid, handler):
self.fixed_channels[cid] = handler
-
+
def deregister_fixed_channel(self, cid):
if cid in self.fixed_channels:
del self.fixed_channels[cid]
@@ -1057,13 +1057,13 @@ class ChannelManager:
)
)
self.host.send_command_sync(HCI_LE_Connection_Update_Command(
- connection_handle = connection.handle,
- conn_interval_min = request.interval_min,
- conn_interval_max = request.interval_max,
- conn_latency = request.slave_latency,
- supervision_timeout = request.timeout_multiplier,
- minimum_ce_length = 0,
- maximum_ce_length = 0
+ connection_handle = connection.handle,
+ connection_interval_min = request.interval_min,
+ connection_interval_max = request.interval_max,
+ max_latency = request.latency,
+ supervision_timeout = request.timeout,
+ min_ce_length = 0,
+ max_ce_length = 0
))
else:
self.send_control_frame(
diff --git a/docs/mkdocs/src/transports/android_emulator.md b/docs/mkdocs/src/transports/android_emulator.md
index d9033942..e43c82c8 100644
--- a/docs/mkdocs/src/transports/android_emulator.md
+++ b/docs/mkdocs/src/transports/android_emulator.md
@@ -5,8 +5,9 @@ The Android emulator transport either connects, as a host, to a "Root Canal" vir
("host" mode), or attaches a virtual controller to the Android Bluetooth host stack ("controller" mode).
## Moniker
-The moniker syntax for an Android Emulator transport is: `android-emulator:[mode=][mode=]`.
-Both the `mode=` and `mode=` parameters are optional (so the moniker `android-emulator` by itself is a valid moniker, which will create a transport in `host` mode, connected to `localhost` on the default gRPC port for the emulator)
+The moniker syntax for an Android Emulator transport is: `android-emulator:[mode=][:]`, where
+the `mode` parameter can specify running as a host or a controller, and `:` can specify a host name (or IP address) and TCP port number on which to reach the gRPC server for the emulator.
+Both the `mode=` and `:` parameters are optional (so the moniker `android-emulator` by itself is a valid moniker, which will create a transport in `host` mode, connected to `localhost` on the default gRPC port for the emulator).
!!! example Example
`android-emulator`
diff --git a/examples/run_advertiser.py b/examples/run_advertiser.py
index 52013562..e54bc37e 100644
--- a/examples/run_advertiser.py
+++ b/examples/run_advertiser.py
@@ -29,18 +29,31 @@ from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
- if len(sys.argv) != 3:
- print('Usage: run_advertiser.py ')
- print('example: run_advertiser.py device1.json link-relay:ws://localhost:8888/test')
+ if len(sys.argv) < 3:
+ print('Usage: run_advertiser.py [type] [address]')
+ print('example: run_advertiser.py device1.json usb:0')
return
+ if len(sys.argv) >= 4:
+ advertising_type = AdvertisingType(int(sys.argv[3]))
+ else:
+ advertising_type = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE
+
+ if advertising_type.is_directed:
+ if len(sys.argv) < 5:
+ print(' required for directed advertising')
+ return
+ target = Address(sys.argv[4])
+ else:
+ target = None
+
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
print('<<< connected')
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
await device.power_on()
- await device.start_advertising()
+ await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
diff --git a/examples/run_controller_with_scanner.py b/examples/run_controller_with_scanner.py
index 88bc1f8c..18ba2743 100644
--- a/examples/run_controller_with_scanner.py
+++ b/examples/run_controller_with_scanner.py
@@ -29,15 +29,15 @@ from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
class ScannerListener(Device.Listener):
- def on_advertisement(self, address, ad_data, rssi, connectable):
- address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type]
- address_color = 'yellow' if connectable else 'red'
+ def on_advertisement(self, advertisement):
+ address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type]
+ address_color = 'yellow' if advertisement.is_connectable else 'red'
if address_type_string.startswith('P'):
type_color = 'green'
else:
type_color = 'cyan'
- print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]: RSSI={rssi}, {ad_data}')
+ print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]: RSSI={advertisement.rssi}, {advertisement.data}')
# -----------------------------------------------------------------------------
diff --git a/examples/run_scanner.py b/examples/run_scanner.py
index feed88f4..719e58ed 100644
--- a/examples/run_scanner.py
+++ b/examples/run_scanner.py
@@ -40,24 +40,24 @@ async def main():
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
@device.on('advertisement')
- def _(address, ad_data, rssi, connectable):
- address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type]
- address_color = 'yellow' if connectable else 'red'
+ def _(advertisement):
+ address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[advertisement.address.address_type]
+ address_color = 'yellow' if advertisement.is_connectable else 'red'
address_qualifier = ''
if address_type_string.startswith('P'):
type_color = 'cyan'
else:
- if address.is_static:
+ if advertisement.address.is_static:
type_color = 'green'
address_qualifier = '(static)'
- elif address.is_resolvable:
+ elif advertisement.address.is_resolvable:
type_color = 'magenta'
address_qualifier = '(resolvable)'
else:
type_color = 'white'
separator = '\n '
- print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{rssi}{separator}{ad_data.to_string(separator)}')
+ print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{advertisement.rssi}{separator}{advertisement.data.to_string(separator)}')
await device.power_on()
await device.start_scanning(filter_duplicates=filter_duplicates)
diff --git a/tests/hci_test.py b/tests/hci_test.py
index 63318476..370b4889 100644
--- a/tests/hci_test.py
+++ b/tests/hci_test.py
@@ -27,8 +27,8 @@ def basic_check(x):
parsed_str = str(parsed)
print(x_str)
parsed_bytes = parsed.to_bytes()
- assert(x_str == parsed_str)
- assert(packet == parsed_bytes)
+ assert x_str == parsed_str
+ assert packet == parsed_bytes
# -----------------------------------------------------------------------------
@@ -49,10 +49,10 @@ def test_HCI_LE_Connection_Complete_Event():
role = 1,
peer_address_type = 1,
peer_address = address,
- conn_interval = 3,
- conn_latency = 4,
+ connection_interval = 3,
+ peripheral_latency = 4,
supervision_timeout = 5,
- master_clock_accuracy = 6
+ central_clock_accuracy = 6
)
basic_check(event)
@@ -60,8 +60,8 @@ def test_HCI_LE_Connection_Complete_Event():
# -----------------------------------------------------------------------------
def test_HCI_LE_Advertising_Report_Event():
address = Address('00:11:22:33:44:55')
- report = HCI_Object(
- HCI_LE_Advertising_Report_Event.REPORT_FIELDS,
+ report = HCI_LE_Advertising_Report_Event.Report(
+ HCI_LE_Advertising_Report_Event.Report.FIELDS,
event_type = HCI_LE_Advertising_Report_Event.ADV_IND,
address_type = Address.PUBLIC_DEVICE_ADDRESS,
address = address,
@@ -87,8 +87,8 @@ def test_HCI_LE_Connection_Update_Complete_Event():
event = HCI_LE_Connection_Update_Complete_Event(
status = HCI_SUCCESS,
connection_handle = 0x007,
- conn_interval = 10,
- conn_latency = 3,
+ connection_interval = 10,
+ peripheral_latency = 3,
supervision_timeout = 5
)
basic_check(event)
@@ -133,7 +133,7 @@ def test_HCI_Command_Complete_Event():
)
basic_check(event)
event = HCI_Packet.from_bytes(event.to_bytes())
- assert(event.return_parameters == 7)
+ assert event.return_parameters == 7
# With a simple status as an integer status
event = HCI_Command_Complete_Event(
@@ -142,7 +142,7 @@ def test_HCI_Command_Complete_Event():
return_parameters = 9
)
basic_check(event)
- assert(event.return_parameters == 9)
+ assert event.return_parameters == 9
# -----------------------------------------------------------------------------
@@ -283,12 +283,32 @@ def test_HCI_LE_Create_Connection_Command():
peer_address_type = 1,
peer_address = Address('00:11:22:33:44:55'),
own_address_type = 2,
- conn_interval_min = 7,
- conn_interval_max = 8,
- conn_latency = 9,
+ connection_interval_min = 7,
+ connection_interval_max = 8,
+ max_latency = 9,
supervision_timeout = 10,
- minimum_ce_length = 11,
- maximum_ce_length = 12
+ min_ce_length = 11,
+ max_ce_length = 12
+ )
+ basic_check(command)
+
+
+# -----------------------------------------------------------------------------
+def test_HCI_LE_Extended_Create_Connection_Command():
+ command = HCI_LE_Extended_Create_Connection_Command(
+ initiator_filter_policy = 0,
+ own_address_type = 0,
+ peer_address_type = 1,
+ peer_address = Address('00:11:22:33:44:55'),
+ initiating_phys = 3,
+ scan_intervals = (10, 11),
+ scan_windows = (12, 13),
+ connection_interval_mins = (14, 15),
+ connection_interval_maxs = (16, 17),
+ max_latencies = (18, 19),
+ supervision_timeouts = (20, 21),
+ min_ce_lengths = (100, 101),
+ max_ce_lengths = (102, 103)
)
basic_check(command)
@@ -314,13 +334,13 @@ def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command():
# -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Update_Command():
command = HCI_LE_Connection_Update_Command(
- connection_handle = 0x0002,
- conn_interval_min = 10,
- conn_interval_max = 20,
- conn_latency = 7,
- supervision_timeout = 3,
- minimum_ce_length = 100,
- maximum_ce_length = 200
+ connection_handle = 0x0002,
+ connection_interval_min = 10,
+ connection_interval_max = 20,
+ max_latency = 7,
+ supervision_timeout = 3,
+ min_ce_length = 100,
+ max_ce_length = 200
)
basic_check(command)
@@ -348,7 +368,7 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
command = HCI_LE_Set_Extended_Scan_Parameters_Command(
own_address_type=Address.RANDOM_DEVICE_ADDRESS,
scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY,
- scanning_phys=(1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_1M_PHY | 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_CODED_PHY | 1 << 4),
+ scanning_phys=(1 << HCI_LE_1M_PHY_BIT | 1 << HCI_LE_CODED_PHY_BIT | 1 << 4),
scan_types=[
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
@@ -363,20 +383,20 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
# -----------------------------------------------------------------------------
def test_address():
a = Address('C4:F2:17:1A:1D:BB')
- assert(not a.is_public)
- assert(a.is_random)
- assert(a.address_type == Address.RANDOM_DEVICE_ADDRESS)
- assert(not a.is_resolvable)
- assert(not a.is_resolved)
- assert(a.is_static)
+ assert not a.is_public
+ assert a.is_random
+ assert a.address_type == Address.RANDOM_DEVICE_ADDRESS
+ assert not a.is_resolvable
+ assert not a.is_resolved
+ assert a.is_static
# -----------------------------------------------------------------------------
def test_custom():
data = bytes([0x77, 0x02, 0x01, 0x03])
packet = HCI_CustomPacket(data)
- assert(packet.hci_packet_type == 0x77)
- assert(packet.payload == data)
+ assert packet.hci_packet_type == 0x77
+ assert packet.payload == data
# -----------------------------------------------------------------------------
@@ -408,6 +428,7 @@ def run_test_commands():
test_HCI_LE_Set_Scan_Parameters_Command()
test_HCI_LE_Set_Scan_Enable_Command()
test_HCI_LE_Create_Connection_Command()
+ test_HCI_LE_Extended_Create_Connection_Command()
test_HCI_LE_Add_Device_To_Filter_Accept_List_Command()
test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command()
test_HCI_LE_Connection_Update_Command()
diff --git a/tests/import_test.py b/tests/import_test.py
index d6eafbda..c016c3e4 100644
--- a/tests/import_test.py
+++ b/tests/import_test.py
@@ -62,6 +62,57 @@ def test_import():
assert utils
+# -----------------------------------------------------------------------------
+def test_app_imports():
+ from bumble.apps.console import main
+ assert main
+
+ from bumble.apps.controller_info import main
+ assert main
+
+ from bumble.apps.controllers import main
+ assert main
+
+ from bumble.apps.gatt_dump import main
+ assert main
+
+ from bumble.apps.gg_bridge import main
+ assert main
+
+ from bumble.apps.hci_bridge import main
+ assert main
+
+ from bumble.apps.pair import main
+ assert main
+
+ from bumble.apps.scan import main
+ assert main
+
+ from bumble.apps.show import main
+ assert main
+
+ from bumble.apps.unbond import main
+ assert main
+
+ from bumble.apps.usb_probe import main
+ assert main
+
+
+# -----------------------------------------------------------------------------
+def test_profiles_imports():
+ from bumble.profiles import (
+ battery_service,
+ device_information_service,
+ heart_rate_service
+ )
+
+ assert battery_service
+ assert device_information_service
+ assert heart_rate_service
+
+
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_import()
+ test_app_imports()
+ test_profiles_imports()
diff --git a/web/scanner.py b/web/scanner.py
index 9ab9f471..e734dbf2 100644
--- a/web/scanner.py
+++ b/web/scanner.py
@@ -21,9 +21,9 @@ from bumble.transport import PacketParser
# -----------------------------------------------------------------------------
class ScannerListener(Device.Listener):
- def on_advertisement(self, address, ad_data, rssi, connectable):
- address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type]
- print(f'>>> {address} [{address_type_string}]: RSSI={rssi}, {ad_data}')
+ def on_advertisement(self, advertisement):
+ address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type]
+ print(f'>>> {advertisement.address} [{address_type_string}]: RSSI={advertisement.rssi}, {advertisement.ad_data}')
class HciSource: