Compare commits

...

8 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
ce25cdc223 fix tests 2022-10-17 14:32:20 -07:00
Gilles Boccon-Gibod
9c429ec75a add phy options 2022-10-17 09:47:09 -07:00
Gilles Boccon-Gibod
d5eebc2101 add AdvertisingType 2022-10-15 21:43:08 -07:00
Gilles Boccon-Gibod
d10dda7e10 wip 2022-10-15 14:44:46 -07:00
Gilles Boccon-Gibod
c316e8805f wip 2022-10-15 14:44:46 -07:00
Gilles Boccon-Gibod
de7e74652d more HCI commands 2022-10-15 14:44:46 -07:00
Gilles Boccon-Gibod
31edd58b3d add extended report class 2022-10-15 14:44:46 -07:00
Gilles Boccon-Gibod
7c4b042026 wip 2022-10-15 14:44:46 -07:00
16 changed files with 1972 additions and 523 deletions

View File

@@ -28,11 +28,16 @@ import click
from collections import OrderedDict
import colors
from bumble.core import UUID, AdvertisingData
from bumble.device import Device, Connection, Peer
from bumble.core import UUID, AdvertisingData, TimeoutError, BT_LE_TRANSPORT
from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic
from bumble.hci import (
HCI_LE_1M_PHY,
HCI_LE_2M_PHY,
HCI_LE_CODED_PHY,
)
from prompt_toolkit import Application
from prompt_toolkit.history import FileHistory
@@ -43,6 +48,7 @@ from prompt_toolkit.styles import Style
from prompt_toolkit.filters import Condition
from prompt_toolkit.widgets import TextArea, Frame
from prompt_toolkit.widgets.toolbars import FormattedTextToolbar
from prompt_toolkit.data_structures import Point
from prompt_toolkit.layout import (
Layout,
HSplit,
@@ -51,17 +57,20 @@ from prompt_toolkit.layout import (
Float,
FormattedTextControl,
FloatContainer,
ConditionalContainer
ConditionalContainer,
Dimension
)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
DEFAULT_PROMPT_HEIGHT = 20
DEFAULT_RSSI_BAR_WIDTH = 20
DISPLAY_MIN_RSSI = -100
DISPLAY_MAX_RSSI = -30
BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
DEFAULT_RSSI_BAR_WIDTH = 20
DEFAULT_CONNECTION_TIMEOUT = 30.0
DISPLAY_MIN_RSSI = -100
DISPLAY_MAX_RSSI = -30
RSSI_MONITOR_INTERVAL = 5.0 # Seconds
# -----------------------------------------------------------------------------
# Globals
@@ -69,16 +78,57 @@ DISPLAY_MAX_RSSI = -30
App = None
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def le_phy_name(phy_id):
return {
HCI_LE_1M_PHY: '1M',
HCI_LE_2M_PHY: '2M',
HCI_LE_CODED_PHY: 'CODED'
}.get(phy_id, HCI_Constant.le_phy_name(phy_id))
def rssi_bar(rssi):
blocks = ['', '', '', '', '', '', '', '']
bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
bar_width = min(max(bar_width, 0), 1)
bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
bar_blocks = ('' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
return f'{rssi:4} {bar_blocks}'
def parse_phys(phys):
if phys.lower() == '*':
return None
else:
phy_list = []
elements = phys.lower().split(',')
for element in elements:
if element == '1m':
phy_list.append(HCI_LE_1M_PHY)
elif element == '2m':
phy_list.append(HCI_LE_2M_PHY)
elif element == 'coded':
phy_list.append(HCI_LE_CODED_PHY)
else:
raise ValueError('invalid PHY name')
return phy_list
# -----------------------------------------------------------------------------
# Console App
# -----------------------------------------------------------------------------
class ConsoleApp:
def __init__(self):
self.known_addresses = set()
self.known_addresses = set()
self.known_attributes = []
self.device = None
self.connected_peer = None
self.top_tab = 'scan'
self.device = None
self.connected_peer = None
self.top_tab = 'scan'
self.monitor_rssi = False
self.connection_rssi = None
style = Style.from_dict({
'output-field': 'bg:#000044 #ffffff',
@@ -106,6 +156,10 @@ class ConsoleApp:
'on': None,
'off': None
},
'rssi': {
'on': None,
'off': None
},
'show': {
'scan': None,
'services': None,
@@ -120,10 +174,17 @@ class ConsoleApp:
'services': None,
'attributes': None
},
'request-mtu': None,
'read': LiveCompleter(self.known_attributes),
'write': LiveCompleter(self.known_attributes),
'subscribe': LiveCompleter(self.known_attributes),
'unsubscribe': LiveCompleter(self.known_attributes),
'set-phy': {
'1m': None,
'2m': None,
'coded': None
},
'set-default-phy': None,
'quit': None,
'exit': None
})
@@ -139,14 +200,16 @@ class ConsoleApp:
self.input_field.accept_handler = self.accept_input
self.output_height = 7
self.output_height = Dimension(min=7, max=7, weight=1)
self.output_lines = []
self.output = FormattedTextControl()
self.output = FormattedTextControl(get_cursor_position=lambda: Point(0, max(0, len(self.output_lines) - 1)))
self.output_max_lines = 20
self.scan_results_text = FormattedTextControl()
self.services_text = FormattedTextControl()
self.attributes_text = FormattedTextControl()
self.log_text = FormattedTextControl()
self.log_height = 20
self.log_text = FormattedTextControl(get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1)))
self.log_height = Dimension(min=7, weight=4)
self.log_max_lines = 100
self.log_lines = []
container = HSplit([
@@ -163,11 +226,10 @@ class ConsoleApp:
filter=Condition(lambda: self.top_tab == 'attributes')
),
ConditionalContainer(
Frame(Window(self.log_text), title='Log'),
Frame(Window(self.log_text, height=self.log_height), title='Log'),
filter=Condition(lambda: self.top_tab == 'log')
),
Frame(Window(self.output), height=self.output_height),
# HorizontalLine(),
Frame(Window(self.output, height=self.output_height)),
FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'),
self.input_field
])
@@ -199,6 +261,8 @@ class ConsoleApp:
)
async def run_async(self, device_config, transport):
rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop())
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
if device_config:
self.device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
@@ -210,6 +274,8 @@ class ConsoleApp:
# Run the UI
await self.ui.run_async()
rssi_monitoring_task.cancel()
def add_known_address(self, address):
self.known_addresses.add(address)
@@ -224,22 +290,33 @@ class ConsoleApp:
connection_state = 'NONE'
encryption_state = ''
att_mtu = ''
rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi)
if self.device:
if self.device.is_connecting:
connection_state = 'CONNECTING'
elif self.connected_peer:
connection = self.connected_peer.connection
connection_parameters = f'{connection.parameters.connection_interval}/{connection.parameters.connection_latency}/{connection.parameters.supervision_timeout}'
connection_state = f'{connection.peer_address} {connection_parameters} {connection.data_length}'
connection_parameters = f'{connection.parameters.connection_interval}/{connection.parameters.peripheral_latency}/{connection.parameters.supervision_timeout}'
if connection.transport == BT_LE_TRANSPORT:
phy_state = f' RX={le_phy_name(connection.phy.rx_phy)}/TX={le_phy_name(connection.phy.tx_phy)}'
else:
phy_state = ''
connection_state = f'{connection.peer_address} {connection_parameters} {connection.data_length}{phy_state}'
encryption_state = 'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED'
att_mtu = f'ATT_MTU: {connection.att_mtu}'
return [
('ansigreen', f' SCAN: {scanning} '),
('', ' '),
('ansiblue', f' CONNECTION: {connection_state} '),
('', ' '),
('ansimagenta', f' {encryption_state} ')
('ansimagenta', f' {encryption_state} '),
('', ' '),
('ansicyan', f' {att_mtu} '),
('', ' '),
('ansiyellow', f' {rssi} ')
]
def show_error(self, title, details = None):
@@ -286,7 +363,7 @@ class ConsoleApp:
def append_to_output(self, line, invalidate=True):
if type(line) is str:
line = [('', line)]
self.output_lines = self.output_lines[-(self.output_height - 3):]
self.output_lines = self.output_lines[-self.output_max_lines:]
self.output_lines.append(line)
formatted_text = []
for line in self.output_lines:
@@ -298,7 +375,7 @@ class ConsoleApp:
def append_to_log(self, lines, invalidate=True):
self.log_lines.extend(lines.split('\n'))
self.log_lines = self.log_lines[-(self.log_height - 3):]
self.log_lines = self.log_lines[-self.log_max_lines:]
self.log_text.text = ANSI('\n'.join(self.log_lines))
if invalidate:
self.ui.invalidate()
@@ -351,6 +428,12 @@ class ConsoleApp:
if characteristic.handle == attribute_handle:
return characteristic
async def rssi_monitor_loop(self):
while True:
if self.monitor_rssi and self.connected_peer:
self.connection_rssi = await self.connected_peer.connection.get_rssi()
await asyncio.sleep(RSSI_MONITOR_INTERVAL)
async def command(self, command):
try:
(keyword, *params) = command.strip().split(' ')
@@ -379,39 +462,73 @@ class ConsoleApp:
else:
self.show_error('unsupported arguments for scan command')
async def do_rssi(self, params):
if len(params) == 0:
# Toggle monitoring
self.monitor_rssi = not self.monitor_rssi
elif params[0] == 'on':
self.monitor_rssi = True
elif params[0] == 'off':
self.monitor_rssi = False
else:
self.show_error('unsupported arguments for rssi command')
async def do_connect(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected connect <address>')
if len(params) != 1 and len(params) != 2:
self.show_error('invalid syntax', 'expected connect <address> [phys]')
return
if len(params) == 1:
phys = None
else:
phys = parse_phys(params[1])
if phys is None:
connection_parameters_preferences = None
else:
connection_parameters_preferences = {
phy: ConnectionParametersPreferences()
for phy in phys
}
self.append_to_output('connecting...')
await self.device.connect(params[0])
self.top_tab = 'services'
try:
await self.device.connect(
params[0],
connection_parameters_preferences=connection_parameters_preferences,
timeout=DEFAULT_CONNECTION_TIMEOUT
)
self.top_tab = 'services'
except TimeoutError:
self.show_error('connection timed out')
async def do_disconnect(self, params):
if not self.connected_peer:
self.show_error('not connected')
return
if self.device.connecting:
await self.device.cancel_connection()
else:
if not self.connected_peer:
self.show_error('not connected')
return
await self.connected_peer.connection.disconnect()
await self.connected_peer.connection.disconnect()
async def do_update_parameters(self, params):
if len(params) != 1 or len(params[0].split('/')) != 3:
self.show_error('invalid syntax', 'expected update-parameters <interval-min>-<interval-max>/<latency>/<supervision>')
self.show_error('invalid syntax', 'expected update-parameters <interval-min>-<interval-max>/<max-latency>/<supervision>')
return
if not self.connected_peer:
self.show_error('not connected')
return
connection_intervals, connection_latency, supervision_timeout = params[0].split('/')
connection_intervals, max_latency, supervision_timeout = params[0].split('/')
connection_interval_min, connection_interval_max = [int(x) for x in connection_intervals.split('-')]
connection_latency = int(connection_latency)
max_latency = int(max_latency)
supervision_timeout = int(supervision_timeout)
await self.connected_peer.connection.update_parameters(
connection_interval_min,
connection_interval_max,
connection_latency,
max_latency,
supervision_timeout
)
@@ -442,6 +559,25 @@ class ConsoleApp:
self.top_tab = params[0]
self.ui.invalidate()
async def do_get_phy(self, params):
if not self.connected_peer:
self.show_error('not connected')
return
phy = await self.connected_peer.connection.get_phy()
self.append_to_output(f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, TX={HCI_Constant.le_phy_name(phy[1])}')
async def do_request_mtu(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected request-mtu <mtu>')
return
if not self.connected_peer:
self.show_error('not connected')
return
await self.connected_peer.request_mtu(int(params[0]))
async def do_discover(self, params):
if not params:
self.show_error('invalid syntax', 'expected discover services|attributes')
@@ -454,14 +590,14 @@ class ConsoleApp:
await self.discover_attributes()
async def do_read(self, params):
if not self.connected_peer:
self.show_error('not connected')
return
if len(params) != 1:
self.show_error('invalid syntax', 'expected read <attribute>')
return
if not self.connected_peer:
self.show_error('not connected')
return
characteristic = self.find_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
@@ -530,6 +666,42 @@ class ConsoleApp:
await characteristic.unsubscribe()
async def do_set_phy(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected set-phy <tx_rx_phys>|<tx_phys>/<rx_phys>')
return
if not self.connected_peer:
self.show_error('not connected')
return
if '/' in params[0]:
tx_phys, rx_phys = params[0].split('/')
else:
tx_phys = params[0]
rx_phys = tx_phys
await self.connected_peer.connection.set_phy(
tx_phys=parse_phys(tx_phys),
rx_phys=parse_phys(rx_phys)
)
async def do_set_default_phy(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected set-default-phy <tx_rx_phys>|<tx_phys>/<rx_phys>')
return
if '/' in params[0]:
tx_phys, rx_phys = params[0].split('/')
else:
tx_phys = params[0]
rx_phys = tx_phys
await self.device.set_default_phy(
tx_phys=parse_phys(tx_phys),
rx_phys=parse_phys(rx_phys)
)
async def do_exit(self, params):
self.ui.exit()
@@ -548,12 +720,14 @@ class DeviceListener(Device.Listener, Connection.Listener):
@AsyncRunner.run_in_task()
async def on_connection(self, connection):
self.app.connected_peer = Peer(connection)
self.app.connection_rssi = None
self.app.append_to_output(f'connected to {self.app.connected_peer}')
connection.listener = self
def on_disconnection(self, reason):
self.app.append_to_output(f'disconnected from {self.app.connected_peer}, reason: {HCI_Constant.error_name(reason)}')
self.app.connected_peer = None
self.app.connection_rssi = None
def on_connection_parameters_update(self):
self.app.append_to_output(f'connection parameters update: {self.app.connected_peer.connection.parameters}')
@@ -570,16 +744,16 @@ class DeviceListener(Device.Listener, Connection.Listener):
def on_connection_data_length_change(self):
self.app.append_to_output(f'connection data length change: {self.app.connected_peer.connection.data_length}')
def on_advertisement(self, address, ad_data, rssi, connectable):
entry_key = f'{address}/{address.address_type}'
def on_advertisement(self, advertisement):
entry_key = f'{advertisement.address}/{advertisement.address.address_type}'
entry = self.scan_results.get(entry_key)
if entry:
entry.ad_data = ad_data
entry.rssi = rssi
entry.connectable = connectable
entry.ad_data = advertisement.data
entry.rssi = advertisement.rssi
entry.connectable = advertisement.is_connectable
else:
self.app.add_known_address(str(address))
self.scan_results[entry_key] = ScanResult(address, address.address_type, ad_data, rssi, connectable)
self.app.add_known_address(str(advertisement.address))
self.scan_results[entry_key] = ScanResult(advertisement.address, advertisement.address.address_type, advertisement.data, advertisement.rssi, advertisement.is_connectable)
self.app.show_scan_results(self.scan_results)
@@ -616,12 +790,7 @@ class ScanResult:
name = ''
# RSSI bar
blocks = ['', '', '', '', '', '', '', '']
bar_width = (self.rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
bar_width = min(max(bar_width, 0), 1)
bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
bar_blocks = ('' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
bar_string = f'{self.rssi} {bar_blocks}'
bar_string = rssi_bar(self.rssi)
bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string))
return f'{address_color(str(self.address))} [{type_color(address_type_string)}] {bar_string} {bar_padding} {name}'

View File

@@ -25,15 +25,21 @@ from bumble.company_ids import COMPANY_IDENTIFIERS
from bumble.core import name_or_number
from bumble.hci import (
map_null_terminated_utf8_string,
HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_SUCCESS,
HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_VERSION_NAMES,
LMP_VERSION_NAMES,
HCI_Command,
HCI_Read_BD_ADDR_Command,
HCI_READ_BD_ADDR_COMMAND,
HCI_Read_BD_ADDR_Command,
HCI_READ_LOCAL_NAME_COMMAND,
HCI_Read_Local_Name_Command,
HCI_READ_LOCAL_NAME_COMMAND
HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
HCI_LE_Read_Maximum_Data_Length_Command,
HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND,
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command,
HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND,
HCI_LE_Read_Maximum_Advertising_Data_Length_Command
)
from bumble.host import Host
from bumble.transport import open_transport_or_link
@@ -57,6 +63,39 @@ async def get_classic_info(host):
# -----------------------------------------------------------------------------
async def get_le_info(host):
print()
if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
response = await host.send_command(HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('LE Number Of Supported Advertising Sets:', 'yellow'),
response.return_parameters.num_supported_advertising_sets,
'\n'
)
if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
response = await host.send_command(HCI_LE_Read_Maximum_Advertising_Data_Length_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('LE Maximum Advertising Data Length:', 'yellow'),
response.return_parameters.max_advertising_data_length,
'\n'
)
if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
response = await host.send_command(HCI_LE_Read_Maximum_Data_Length_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('Maximum Data Length:', 'yellow'),
(
f'tx:{response.return_parameters.supported_max_tx_octets}/'
f'{response.return_parameters.supported_max_tx_time}, '
f'rx:{response.return_parameters.supported_max_rx_octets}/'
f'{response.return_parameters.supported_max_rx_time}'
),
'\n'
)
print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features:
print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature))

View File

@@ -25,8 +25,8 @@ from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.keys import JsonKeyStore
from bumble.smp import AddressResolver
from bumble.hci import HCI_LE_Advertising_Report_Event
from bumble.core import AdvertisingData
from bumble.device import Advertisement
from bumble.hci import HCI_Constant, HCI_LE_1M_PHY, HCI_LE_CODED_PHY
# -----------------------------------------------------------------------------
@@ -48,16 +48,19 @@ class AdvertisementPrinter:
self.min_rssi = min_rssi
self.resolver = resolver
def print_advertisement(self, address, address_color, ad_data, rssi):
if self.min_rssi is not None and rssi < self.min_rssi:
def print_advertisement(self, advertisement):
address = advertisement.address
address_color = 'yellow' if advertisement.is_connectable else 'red'
if self.min_rssi is not None and advertisement.rssi < self.min_rssi:
return
address_qualifier = ''
resolution_qualifier = ''
if self.resolver and address.is_resolvable:
resolved = self.resolver.resolve(address)
if self.resolver and advertisement.address.is_resolvable:
resolved = self.resolver.resolve(advertisement.address)
if resolved is not None:
resolution_qualifier = f'(resolved from {address})'
resolution_qualifier = f'(resolved from {advertisement.address})'
address = resolved
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type]
@@ -74,18 +77,30 @@ class AdvertisementPrinter:
type_color = 'blue'
address_qualifier = '(non-resolvable)'
rssi_bar = make_rssi_bar(rssi)
separator = '\n '
print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}RSSI:{rssi:4} {rssi_bar}{separator}{ad_data.to_string(separator)}\n')
rssi_bar = make_rssi_bar(advertisement.rssi)
if not advertisement.is_legacy:
phy_info = (
f'PHY: {HCI_Constant.le_phy_name(advertisement.primary_phy)}/'
f'{HCI_Constant.le_phy_name(advertisement.secondary_phy)} '
f'{separator}'
)
else:
phy_info = ''
def on_advertisement(self, address, ad_data, rssi, connectable):
address_color = 'yellow' if connectable else 'red'
self.print_advertisement(address, address_color, ad_data, rssi)
print(
f'>>> {color(address, address_color)} '
f'[{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}'
f'{phy_info}'
f'RSSI:{advertisement.rssi:4} {rssi_bar}{separator}'
f'{advertisement.data.to_string(separator)}\n')
def on_advertising_report(self, address, ad_data, rssi, event_type):
print(f'{color("EVENT", "green")}: {HCI_LE_Advertising_Report_Event.event_type_name(event_type)}')
ad_data = AdvertisingData.from_bytes(ad_data)
self.print_advertisement(address, 'yellow', ad_data, rssi)
def on_advertisement(self, advertisement):
self.print_advertisement(advertisement)
def on_advertising_report(self, report):
print(f'{color("EVENT", "green")}: {report.event_type_string()}')
self.print_advertisement(Advertisement.from_advertising_report(report))
# -----------------------------------------------------------------------------
@@ -94,6 +109,7 @@ async def scan(
passive,
scan_interval,
scan_window,
phy,
filter_duplicates,
raw,
keystore_file,
@@ -126,11 +142,18 @@ async def scan(
device.on('advertisement', printer.on_advertisement)
await device.power_on()
if phy is None:
scanning_phys = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY]
else:
scanning_phys = [{'1m': HCI_LE_1M_PHY, 'coded': HCI_LE_CODED_PHY}[phy]]
await device.start_scanning(
active=(not passive),
scan_interval=scan_interval,
scan_window=scan_window,
filter_duplicates=filter_duplicates
filter_duplicates=filter_duplicates,
scanning_phys=scanning_phys
)
await hci_source.wait_for_termination()
@@ -142,14 +165,15 @@ async def scan(
@click.option('--passive', is_flag=True, default=False, help='Perform passive scanning')
@click.option('--scan-interval', type=int, default=60, help='Scan interval')
@click.option('--scan-window', type=int, default=60, help='Scan window')
@click.option('--phy', type=click.Choice(['1m', 'coded']), help='Only scan on the specified PHY')
@click.option('--filter-duplicates', type=bool, default=True, help='Filter duplicates at the controller level')
@click.option('--raw', is_flag=True, default=False, help='Listen for raw advertising reports instead of processed ones')
@click.option('--keystore-file', help='Keystore file to use when resolving addresses')
@click.option('--device-config', help='Device config file for the scanning device')
@click.argument('transport')
def main(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport):
def main(min_rssi, passive, scan_interval, scan_window, phy, filter_duplicates, raw, keystore_file, device_config, transport):
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
asyncio.run(scan(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport))
asyncio.run(scan(min_rssi, passive, scan_interval, scan_window, phy, filter_duplicates, raw, keystore_file, device_config, transport))
# -----------------------------------------------------------------------------

View File

@@ -76,7 +76,7 @@ class Controller:
self.supported_commands = bytes.fromhex('2000800000c000000000e40000002822000000000000040000f7ffff7f00000030f0f9ff01008004000000000000000000000000000000000000000000000000')
self.le_features = bytes.fromhex('ff49010000000000')
self.le_states = bytes.fromhex('ffff3fffff030000')
self.avertising_channel_tx_power = 0
self.advertising_channel_tx_power = 0
self.filter_accept_list_size = 8
self.resolving_list_size = 8
self.supported_max_tx_octets = 27
@@ -259,15 +259,15 @@ class Controller:
# Then say that the connection has completed
self.send_hci_packet(HCI_LE_Connection_Complete_Event(
status = HCI_SUCCESS,
connection_handle = connection.handle,
role = connection.role,
peer_address_type = peer_address_type,
peer_address = peer_address,
conn_interval = 10, # FIXME
conn_latency = 0, # FIXME
supervision_timeout = 10, # FIXME
master_clock_accuracy = 7 # FIXME
status = HCI_SUCCESS,
connection_handle = connection.handle,
role = connection.role,
peer_address_type = peer_address_type,
peer_address = peer_address,
connection_interval = 10, # FIXME
peripheral_latency = 0, # FIXME
supervision_timeout = 10, # FIXME
central_clock_accuracy = 7 # FIXME
))
def on_link_central_disconnected(self, peer_address, reason):
@@ -313,15 +313,15 @@ class Controller:
# Say that the connection has completed
self.send_hci_packet(HCI_LE_Connection_Complete_Event(
status = status,
connection_handle = connection.handle if connection else 0,
role = BT_CENTRAL_ROLE,
peer_address_type = le_create_connection_command.peer_address_type,
peer_address = le_create_connection_command.peer_address,
conn_interval = le_create_connection_command.conn_interval_min,
conn_latency = le_create_connection_command.conn_latency,
supervision_timeout = le_create_connection_command.supervision_timeout,
master_clock_accuracy = 0
status = status,
connection_handle = connection.handle if connection else 0,
role = BT_CENTRAL_ROLE,
peer_address_type = le_create_connection_command.peer_address_type,
peer_address = le_create_connection_command.peer_address,
connection_interval = le_create_connection_command.connection_interval_min,
peripheral_latency = le_create_connection_command.max_latency,
supervision_timeout = le_create_connection_command.supervision_timeout,
central_clock_accuracy = 0
))
def on_link_peripheral_disconnection_complete(self, disconnection_command, status):
@@ -583,13 +583,15 @@ class Controller:
'''
See Bluetooth spec Vol 2, Part E - 7.4.1 Read Local Version Information Command
'''
return struct.pack('<BBHBHH',
HCI_SUCCESS,
self.hci_version,
self.hci_revision,
self.lmp_version,
self.manufacturer_name,
self.lmp_subversion)
return struct.pack(
'<BBHBHH',
HCI_SUCCESS,
self.hci_version,
self.hci_revision,
self.lmp_version,
self.manufacturer_name,
self.lmp_subversion
)
def on_hci_read_local_supported_commands_command(self, command):
'''
@@ -650,7 +652,7 @@ class Controller:
'''
See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Channel Tx Power Command
'''
return bytes([HCI_SUCCESS, self.avertising_channel_tx_power])
return bytes([HCI_SUCCESS, self.advertising_channel_tx_power])
def on_hci_le_set_advertising_data_command(self, command):
'''
@@ -876,12 +878,26 @@ class Controller:
'''
See Bluetooth spec Vol 2, Part E - 7.8.46 LE Read Maximum Data Length Command
'''
return struct.pack('<BHHHH',
HCI_SUCCESS,
self.supported_max_tx_octets,
self.supported_max_tx_time,
self.supported_max_rx_octets,
self.supported_max_rx_time)
return struct.pack(
'<BHHHH',
HCI_SUCCESS,
self.supported_max_tx_octets,
self.supported_max_tx_time,
self.supported_max_rx_octets,
self.supported_max_rx_time
)
def on_hci_le_read_phy_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.47 LE Read PHY command
'''
return struct.pack(
'<BHBB',
HCI_SUCCESS,
command.connection_handle,
HCI_LE_1M_PHY,
HCI_LE_1M_PHY
)
def on_hci_le_set_default_phy_command(self, command):
'''
@@ -893,3 +909,4 @@ class Controller:
'rx_phys': command.rx_phys
}
return bytes([HCI_SUCCESS])

View File

@@ -831,13 +831,17 @@ class AdvertisingData:
# Connection Parameters
# -----------------------------------------------------------------------------
class ConnectionParameters:
def __init__(self, connection_interval, connection_latency, supervision_timeout):
def __init__(self, connection_interval, peripheral_latency, supervision_timeout):
self.connection_interval = connection_interval
self.connection_latency = connection_latency
self.peripheral_latency = peripheral_latency
self.supervision_timeout = supervision_timeout
def __str__(self):
return f'ConnectionParameters(connection_interval={self.connection_interval}, connection_latency={self.connection_latency}, supervision_timeout={self.supervision_timeout}'
return (
f'ConnectionParameters(connection_interval={self.connection_interval}, '
f'peripheral_latency={self.peripheral_latency}, '
f'supervision_timeout={self.supervision_timeout}'
)
# -----------------------------------------------------------------------------

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -76,7 +76,7 @@ class Host(EventEmitter):
self.hc_total_num_acl_data_packets = HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS
self.acl_packet_queue = collections.deque()
self.acl_packets_in_flight = 0
self.local_version = HCI_VERSION_BLUETOOTH_CORE_4_0
self.local_version = None
self.local_supported_commands = bytes(64)
self.local_le_features = 0
self.command_semaphore = asyncio.Semaphore(1)
@@ -91,32 +91,23 @@ class Host(EventEmitter):
self.set_packet_sink(controller_sink)
async def reset(self):
await self.send_command(HCI_Reset_Command())
await self.send_command(HCI_Reset_Command(), check_result=True)
self.ready = True
response = await self.send_command(HCI_Read_Local_Supported_Commands_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.local_supported_commands = response.return_parameters.supported_commands
else:
logger.warn(f'HCI_Read_Local_Supported_Commands_Command failed: {response.return_parameters.status}')
response = await self.send_command(HCI_Read_Local_Supported_Commands_Command(), check_result=True)
self.local_supported_commands = response.return_parameters.supported_commands
if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.local_le_features = struct.unpack('<Q', response.return_parameters.le_features)[0]
else:
logger.warn(f'HCI_LE_Read_Supported_Features_Command failed: {response.return_parameters.status}')
response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command(), check_result=True)
self.local_le_features = struct.unpack('<Q', response.return_parameters.le_features)[0]
if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
response = await self.send_command(HCI_Read_Local_Version_Information_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.local_version = response.return_parameters
else:
logger.warn(f'HCI_Read_Local_Version_Information_Command failed: {response.return_parameters.status}')
response = await self.send_command(HCI_Read_Local_Version_Information_Command(), check_result=True)
self.local_version = response.return_parameters
await self.send_command(HCI_Set_Event_Mask_Command(event_mask = bytes.fromhex('FFFFFFFFFFFFFF3F')))
if self.local_version.hci_version <= HCI_VERSION_BLUETOOTH_CORE_4_0:
if self.local_version is not None and self.local_version.hci_version <= HCI_VERSION_BLUETOOTH_CORE_4_0:
# Some older controllers don't like event masks with bits they don't understand
le_event_mask = bytes.fromhex('1F00000000000000')
else:
@@ -124,20 +115,14 @@ class Host(EventEmitter):
await self.send_command(HCI_LE_Set_Event_Mask_Command(le_event_mask = le_event_mask))
if self.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(HCI_Read_Buffer_Size_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.hc_acl_data_packet_length = response.return_parameters.hc_acl_data_packet_length
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_acl_data_packets
else:
logger.warn(f'HCI_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
response = await self.send_command(HCI_Read_Buffer_Size_Command(), check_result=True)
self.hc_acl_data_packet_length = response.return_parameters.hc_acl_data_packet_length
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_acl_data_packets
if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command())
if response.return_parameters.status == HCI_SUCCESS:
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
else:
logger.warn(f'HCI_LE_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command(), check_result=True)
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0:
# LE and Classic share the same values
@@ -171,7 +156,7 @@ class Host(EventEmitter):
def send_hci_packet(self, packet):
self.hci_sink.on_packet(packet.to_bytes())
async def send_command(self, command):
async def send_command(self, command, check_result=False):
logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}')
# Wait until we can send (only one pending command at a time)
@@ -186,11 +171,22 @@ class Host(EventEmitter):
try:
self.send_hci_packet(command)
response = await self.pending_response
# TODO: check error values
# Check the return parameters if required
if check_result:
if type(response.return_parameters) is int:
status = response.return_parameters
else:
status = response.return_parameters.status
if status != HCI_SUCCESS:
logger.warning(f'{command.name} failed ({HCI_Constant.error_name(status)})')
raise HCI_Error(status)
return response
except Exception as error:
logger.warning(f'{color("!!! Exception while sending HCI packet:", "red")} {error}')
# raise error
raise error
finally:
self.pending_command = None
self.pending_response = None
@@ -370,8 +366,8 @@ class Host(EventEmitter):
# Notify the client
connection_parameters = ConnectionParameters(
event.conn_interval,
event.conn_latency,
event.connection_interval,
event.peripheral_latency,
event.supervision_timeout
)
self.emit(
@@ -387,7 +383,7 @@ class Host(EventEmitter):
logger.debug(f'### CONNECTION FAILED: {event.status}')
# Notify the listeners
self.emit('connection_failure', event.status)
self.emit('connection_failure', event.connection_handle, event.status)
def on_hci_le_enhanced_connection_complete_event(self, event):
# Just use the same implementation as for the non-enhanced event for now
@@ -435,7 +431,7 @@ class Host(EventEmitter):
logger.debug(f'### DISCONNECTION FAILED: {event.status}')
# Notify the listeners
self.emit('disconnection_failure', event.status)
self.emit('disconnection_failure', event.connection_handle, event.status)
def on_hci_le_connection_update_complete_event(self, event):
if (connection := self.connections.get(event.connection_handle)) is None:
@@ -445,8 +441,8 @@ class Host(EventEmitter):
# Notify the client
if event.status == HCI_SUCCESS:
connection_parameters = ConnectionParameters(
event.conn_interval,
event.conn_latency,
event.connection_interval,
event.peripheral_latency,
event.supervision_timeout
)
self.emit('connection_parameters_update', connection.handle, connection_parameters)
@@ -467,13 +463,10 @@ class Host(EventEmitter):
def on_hci_le_advertising_report_event(self, event):
for report in event.reports:
self.emit(
'advertising_report',
report.address,
report.data,
report.rssi,
report.event_type
)
self.emit('advertising_report', report)
def on_hci_le_extended_advertising_report_event(self, event):
self.on_hci_le_advertising_report_event(event)
def on_hci_le_remote_connection_parameter_request_event(self, event):
if event.connection_handle not in self.connections:
@@ -489,8 +482,8 @@ class Host(EventEmitter):
interval_max = event.interval_max,
latency = event.latency,
timeout = event.timeout,
minimum_ce_length = 0,
maximum_ce_length = 0
min_ce_length = 0,
max_ce_length = 0
)
)

View File

@@ -462,10 +462,10 @@ class L2CAP_Information_Response(L2CAP_Control_Frame):
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass([
('interval_min', 2),
('interval_max', 2),
('slave_latency', 2),
('timeout_multiplier', 2)
('interval_min', 2),
('interval_max', 2),
('latency', 2),
('timeout', 2)
])
class L2CAP_Connection_Parameter_Update_Request(L2CAP_Control_Frame):
'''
@@ -857,10 +857,10 @@ class ChannelManager:
identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256
self.identifiers[connection.handle] = identifier
return identifier
def register_fixed_channel(self, cid, handler):
self.fixed_channels[cid] = handler
def deregister_fixed_channel(self, cid):
if cid in self.fixed_channels:
del self.fixed_channels[cid]
@@ -1057,13 +1057,13 @@ class ChannelManager:
)
)
self.host.send_command_sync(HCI_LE_Connection_Update_Command(
connection_handle = connection.handle,
conn_interval_min = request.interval_min,
conn_interval_max = request.interval_max,
conn_latency = request.slave_latency,
supervision_timeout = request.timeout_multiplier,
minimum_ce_length = 0,
maximum_ce_length = 0
connection_handle = connection.handle,
connection_interval_min = request.interval_min,
connection_interval_max = request.interval_max,
max_latency = request.latency,
supervision_timeout = request.timeout,
min_ce_length = 0,
max_ce_length = 0
))
else:
self.send_control_frame(

View File

@@ -5,8 +5,9 @@ The Android emulator transport either connects, as a host, to a "Root Canal" vir
("host" mode), or attaches a virtual controller to the Android Bluetooth host stack ("controller" mode).
## Moniker
The moniker syntax for an Android Emulator transport is: `android-emulator:[mode=<host|controller>][mode=<host|controller>]`.
Both the `mode=<host|controller>` and `mode=<host|controller>` parameters are optional (so the moniker `android-emulator` by itself is a valid moniker, which will create a transport in `host` mode, connected to `localhost` on the default gRPC port for the emulator)
The moniker syntax for an Android Emulator transport is: `android-emulator:[mode=<host|controller>][<hostname>:<port>]`, where
the `mode` parameter can specify running as a host or a controller, and `<hostname>:<port>` can specify a host name (or IP address) and TCP port number on which to reach the gRPC server for the emulator.
Both the `mode=<host|controller>` and `<hostname>:<port>` parameters are optional (so the moniker `android-emulator` by itself is a valid moniker, which will create a transport in `host` mode, connected to `localhost` on the default gRPC port for the emulator).
!!! example Example
`android-emulator`

View File

@@ -29,18 +29,31 @@ from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: run_advertiser.py <config-file> <transport-spec>')
print('example: run_advertiser.py device1.json link-relay:ws://localhost:8888/test')
if len(sys.argv) < 3:
print('Usage: run_advertiser.py <config-file> <transport-spec> [type] [address]')
print('example: run_advertiser.py device1.json usb:0')
return
if len(sys.argv) >= 4:
advertising_type = AdvertisingType(int(sys.argv[3]))
else:
advertising_type = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE
if advertising_type.is_directed:
if len(sys.argv) < 5:
print('<address> required for directed advertising')
return
target = Address(sys.argv[4])
else:
target = None
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
print('<<< connected')
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
await device.power_on()
await device.start_advertising()
await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -29,15 +29,15 @@ from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
class ScannerListener(Device.Listener):
def on_advertisement(self, address, ad_data, rssi, connectable):
address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type]
address_color = 'yellow' if connectable else 'red'
def on_advertisement(self, advertisement):
address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type]
address_color = 'yellow' if advertisement.is_connectable else 'red'
if address_type_string.startswith('P'):
type_color = 'green'
else:
type_color = 'cyan'
print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]: RSSI={rssi}, {ad_data}')
print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]: RSSI={advertisement.rssi}, {advertisement.data}')
# -----------------------------------------------------------------------------

View File

@@ -40,24 +40,24 @@ async def main():
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
@device.on('advertisement')
def _(address, ad_data, rssi, connectable):
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type]
address_color = 'yellow' if connectable else 'red'
def _(advertisement):
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[advertisement.address.address_type]
address_color = 'yellow' if advertisement.is_connectable else 'red'
address_qualifier = ''
if address_type_string.startswith('P'):
type_color = 'cyan'
else:
if address.is_static:
if advertisement.address.is_static:
type_color = 'green'
address_qualifier = '(static)'
elif address.is_resolvable:
elif advertisement.address.is_resolvable:
type_color = 'magenta'
address_qualifier = '(resolvable)'
else:
type_color = 'white'
separator = '\n '
print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{rssi}{separator}{ad_data.to_string(separator)}')
print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{advertisement.rssi}{separator}{advertisement.data.to_string(separator)}')
await device.power_on()
await device.start_scanning(filter_duplicates=filter_duplicates)

View File

@@ -27,8 +27,8 @@ def basic_check(x):
parsed_str = str(parsed)
print(x_str)
parsed_bytes = parsed.to_bytes()
assert(x_str == parsed_str)
assert(packet == parsed_bytes)
assert x_str == parsed_str
assert packet == parsed_bytes
# -----------------------------------------------------------------------------
@@ -49,10 +49,10 @@ def test_HCI_LE_Connection_Complete_Event():
role = 1,
peer_address_type = 1,
peer_address = address,
conn_interval = 3,
conn_latency = 4,
connection_interval = 3,
peripheral_latency = 4,
supervision_timeout = 5,
master_clock_accuracy = 6
central_clock_accuracy = 6
)
basic_check(event)
@@ -60,8 +60,8 @@ def test_HCI_LE_Connection_Complete_Event():
# -----------------------------------------------------------------------------
def test_HCI_LE_Advertising_Report_Event():
address = Address('00:11:22:33:44:55')
report = HCI_Object(
HCI_LE_Advertising_Report_Event.REPORT_FIELDS,
report = HCI_LE_Advertising_Report_Event.Report(
HCI_LE_Advertising_Report_Event.Report.FIELDS,
event_type = HCI_LE_Advertising_Report_Event.ADV_IND,
address_type = Address.PUBLIC_DEVICE_ADDRESS,
address = address,
@@ -87,8 +87,8 @@ def test_HCI_LE_Connection_Update_Complete_Event():
event = HCI_LE_Connection_Update_Complete_Event(
status = HCI_SUCCESS,
connection_handle = 0x007,
conn_interval = 10,
conn_latency = 3,
connection_interval = 10,
peripheral_latency = 3,
supervision_timeout = 5
)
basic_check(event)
@@ -133,7 +133,7 @@ def test_HCI_Command_Complete_Event():
)
basic_check(event)
event = HCI_Packet.from_bytes(event.to_bytes())
assert(event.return_parameters == 7)
assert event.return_parameters == 7
# With a simple status as an integer status
event = HCI_Command_Complete_Event(
@@ -142,7 +142,7 @@ def test_HCI_Command_Complete_Event():
return_parameters = 9
)
basic_check(event)
assert(event.return_parameters == 9)
assert event.return_parameters == 9
# -----------------------------------------------------------------------------
@@ -283,12 +283,32 @@ def test_HCI_LE_Create_Connection_Command():
peer_address_type = 1,
peer_address = Address('00:11:22:33:44:55'),
own_address_type = 2,
conn_interval_min = 7,
conn_interval_max = 8,
conn_latency = 9,
connection_interval_min = 7,
connection_interval_max = 8,
max_latency = 9,
supervision_timeout = 10,
minimum_ce_length = 11,
maximum_ce_length = 12
min_ce_length = 11,
max_ce_length = 12
)
basic_check(command)
# -----------------------------------------------------------------------------
def test_HCI_LE_Extended_Create_Connection_Command():
command = HCI_LE_Extended_Create_Connection_Command(
initiator_filter_policy = 0,
own_address_type = 0,
peer_address_type = 1,
peer_address = Address('00:11:22:33:44:55'),
initiating_phys = 3,
scan_intervals = (10, 11),
scan_windows = (12, 13),
connection_interval_mins = (14, 15),
connection_interval_maxs = (16, 17),
max_latencies = (18, 19),
supervision_timeouts = (20, 21),
min_ce_lengths = (100, 101),
max_ce_lengths = (102, 103)
)
basic_check(command)
@@ -314,13 +334,13 @@ def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command():
# -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Update_Command():
command = HCI_LE_Connection_Update_Command(
connection_handle = 0x0002,
conn_interval_min = 10,
conn_interval_max = 20,
conn_latency = 7,
supervision_timeout = 3,
minimum_ce_length = 100,
maximum_ce_length = 200
connection_handle = 0x0002,
connection_interval_min = 10,
connection_interval_max = 20,
max_latency = 7,
supervision_timeout = 3,
min_ce_length = 100,
max_ce_length = 200
)
basic_check(command)
@@ -348,7 +368,7 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
command = HCI_LE_Set_Extended_Scan_Parameters_Command(
own_address_type=Address.RANDOM_DEVICE_ADDRESS,
scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY,
scanning_phys=(1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_1M_PHY | 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_CODED_PHY | 1 << 4),
scanning_phys=(1 << HCI_LE_1M_PHY_BIT | 1 << HCI_LE_CODED_PHY_BIT | 1 << 4),
scan_types=[
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
@@ -363,20 +383,20 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
# -----------------------------------------------------------------------------
def test_address():
a = Address('C4:F2:17:1A:1D:BB')
assert(not a.is_public)
assert(a.is_random)
assert(a.address_type == Address.RANDOM_DEVICE_ADDRESS)
assert(not a.is_resolvable)
assert(not a.is_resolved)
assert(a.is_static)
assert not a.is_public
assert a.is_random
assert a.address_type == Address.RANDOM_DEVICE_ADDRESS
assert not a.is_resolvable
assert not a.is_resolved
assert a.is_static
# -----------------------------------------------------------------------------
def test_custom():
data = bytes([0x77, 0x02, 0x01, 0x03])
packet = HCI_CustomPacket(data)
assert(packet.hci_packet_type == 0x77)
assert(packet.payload == data)
assert packet.hci_packet_type == 0x77
assert packet.payload == data
# -----------------------------------------------------------------------------
@@ -408,6 +428,7 @@ def run_test_commands():
test_HCI_LE_Set_Scan_Parameters_Command()
test_HCI_LE_Set_Scan_Enable_Command()
test_HCI_LE_Create_Connection_Command()
test_HCI_LE_Extended_Create_Connection_Command()
test_HCI_LE_Add_Device_To_Filter_Accept_List_Command()
test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command()
test_HCI_LE_Connection_Update_Command()

View File

@@ -62,6 +62,57 @@ def test_import():
assert utils
# -----------------------------------------------------------------------------
def test_app_imports():
from bumble.apps.console import main
assert main
from bumble.apps.controller_info import main
assert main
from bumble.apps.controllers import main
assert main
from bumble.apps.gatt_dump import main
assert main
from bumble.apps.gg_bridge import main
assert main
from bumble.apps.hci_bridge import main
assert main
from bumble.apps.pair import main
assert main
from bumble.apps.scan import main
assert main
from bumble.apps.show import main
assert main
from bumble.apps.unbond import main
assert main
from bumble.apps.usb_probe import main
assert main
# -----------------------------------------------------------------------------
def test_profiles_imports():
from bumble.profiles import (
battery_service,
device_information_service,
heart_rate_service
)
assert battery_service
assert device_information_service
assert heart_rate_service
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_import()
test_app_imports()
test_profiles_imports()

View File

@@ -21,9 +21,9 @@ from bumble.transport import PacketParser
# -----------------------------------------------------------------------------
class ScannerListener(Device.Listener):
def on_advertisement(self, address, ad_data, rssi, connectable):
address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type]
print(f'>>> {address} [{address_type_string}]: RSSI={rssi}, {ad_data}')
def on_advertisement(self, advertisement):
address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type]
print(f'>>> {advertisement.address} [{address_type_string}]: RSSI={advertisement.rssi}, {advertisement.ad_data}')
class HciSource: