Compare commits

...

49 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
0f219eff12 address PR comments 2022-11-09 13:18:30 -08:00
Gilles Boccon-Gibod
4a1345cf95 only force the type if the address is passed as a string 2022-11-08 19:10:13 -08:00
Gilles Boccon-Gibod
8a1cdef152 fix classic connection event filtering 2022-11-08 17:33:29 -08:00
Lucas Abel
cea1905ffb Merge pull request #59 from google/uael/device-cleanup
le: pass `own_address_type` to BLE `Device.connect`
2022-11-08 11:50:40 -08:00
Abel Lucas
af8e0d4dc7 le: pass own_address_type to BLE Device.connect 2022-11-08 18:22:54 +00:00
Gilles Boccon-Gibod
875195aebb Merge pull request #58 from AlanRosenthal/main
Add definition of `Client Characteristic Configuration bit`
2022-11-08 09:34:22 -08:00
Gilles Boccon-Gibod
5aee37aeab Merge pull request #34 from google/gbg/l2cap-bridge
Add L2CAP CoC support
2022-11-07 16:57:17 -08:00
Gilles Boccon-Gibod
edcb7d05d6 fix merge conflict 2022-11-07 16:51:40 -08:00
Gilles Boccon-Gibod
ce9004f0ac Add L2CAP CoC support (squashed)
[85542e0] fix test
[3748781] add ASAH sink example
[e782e29] add app
[83daa30] wip
[7f138a0] add test
[f732108] allow different address syntax
[9d0bbf8] rename deprecated methods
[eb303d5] add LE CoC support
2022-11-07 16:45:37 -08:00
Alan Rosenthal
d4228e3b5b Add definition of Client Characteristic Configuration bit 2022-11-07 19:43:22 -05:00
Lucas Abel
be8f8ac68f Merge pull request #55 from google/uael/device-improvements
Device improvements
2022-11-07 15:22:41 -08:00
Abel Lucas
ca16410a6d device: add option to check for the address type when using find_connection_by_bd_addr 2022-11-07 22:17:01 +00:00
Abel Lucas
b95888eb39 le: permit legacy scanning even when extended is supported 2022-11-07 22:15:54 +00:00
Abel Lucas
56ed46adfa classic: add BR/EDR accept connection logic 2022-11-04 17:26:59 +00:00
Abel Lucas
7044102e05 classic: upgrade Device.cancel_connection logic to support canceling ongoing BR/EDR connections 2022-11-04 17:26:59 +00:00
Abel Lucas
ca8f284888 le: add own_address_type parameter to Device.start_advertising 2022-11-04 17:26:59 +00:00
Abel Lucas
e9e14f5183 le: make the device connecting state relative to LE only
We may need to add a distinct BR/EDR connecting state in the future.
2022-11-04 17:26:59 +00:00
Abel Lucas
b961affd3d device: update Device.connect documentation to match BR/EDR behavior 2022-11-04 17:26:59 +00:00
Abel Lucas
51ddb36c91 device: add auto_restart mechanism to .start_discovery (default to True) 2022-11-04 17:26:59 +00:00
Abel Lucas
78534b659a device: enhance .request_remote_name to also accept an Address as argument 2022-11-04 17:26:59 +00:00
Abel Lucas
ce9472bf42 core: change AdvertisingData.get default raw behavior to False 2022-11-04 17:26:59 +00:00
Abel Lucas
fc331b7aea core: improve Advertisement.ad_data_to_object with support for more data types 2022-11-04 17:26:59 +00:00
Abel Lucas
8119bc210c host: pass remote_host_supported_features event to upper layer
The `HCI_Remote_Name_Request` command may trigger this HCI event.
Instead of warn for not being handled, pass it to upper layer.
2022-11-02 20:23:14 +00:00
Abel Lucas
65deefdc64 host: allow bytes return paramaters when checking command result 2022-11-02 20:23:14 +00:00
Michael Mogenson
2920c3b0d1 Merge pull request #53 from mogenson/mogenson/show-device-tab
Add a show device tab
2022-10-24 09:15:43 -04:00
Michael Mogenson
f5cd825dbc Merge pull request #51 from mogenson/mogenson/console-py-rand-addr
Use random address in console.py if device config is not provided
2022-10-24 09:15:10 -04:00
Gilles Boccon-Gibod
cf4c43c4ff Merge pull request #48 from google/uael/classic-parallel-connect
classic: update `Device.connect` to allow parallels connection creation
2022-10-23 20:52:08 -07:00
Gilles Boccon-Gibod
da2f596e52 Merge pull request #50 from google/uael/command-timeout
device: raise a `CommandTimeoutError` error on command timeout
2022-10-23 20:49:06 -07:00
Gilles Boccon-Gibod
c8aa0b4ef6 Merge pull request #54 from google/gbg/fix-regression-001
use the correct constants as previously renamed
2022-10-23 20:43:43 -07:00
Gilles Boccon-Gibod
75ac276c8b use the correct constants as previously renamed 2022-10-21 17:12:26 -07:00
Michael Mogenson
dd4023ff56 Add a show device tab
Show configuration data about the Bumble device. Make this the default
tab on startup.
2022-10-21 16:00:03 -04:00
Michael Mogenson
dde8c5e1c2 Use random address in console.py if device config is not provided
If a device configuration is not provided on startup, generate a random
BT address instead of using a default static value of
"F0:F1:F2:F3:F4:F5". This is helpful to avoid colisions when there are
two instances of console.py running nearby.

Testing:
Started console.py and began advertising a few times. Scanned from a
second instance of console.py and observed that the advertising address
changed with each restart.
2022-10-21 15:32:58 -04:00
Michael Mogenson
8ed1f4b50d Merge pull request #52 from mogenson/mogenson/console-py-clear-scan-results
add 'scan clear' command to console.py
2022-10-21 14:34:08 -04:00
Michael Mogenson
92de7dff4f add 'scan clear' command to console.py
Add command to clear scan results and known addresses. Useful for
determining if a peripheral has stopped advertising.

Also, check if a scan is in progress before connecting. If it is, stop
scanning. Some BT controllers will fail to connect while scanning.

Testing:
Can clear scan results before, during, and after scan. Can clear scan
results while disconnected and connected.
2022-10-21 13:58:21 -04:00
Abel Lucas
16b4f18c92 tests: add parallel device connection test 2022-10-21 15:49:03 +00:00
Gilles Boccon-Gibod
46f4b82d29 Merge pull request #46 from AlanRosenthal/main
Add runtime switch for filtering by address.
2022-10-20 19:20:28 -07:00
Abel Lucas
4e2f66f709 device: raise a CommandTimeoutError error on command timeout 2022-10-20 22:11:07 +00:00
Alan Rosenthal
3d79d7def5 Add runtime switch for filtering by address.
* scan on [filter pattern]
* filter address <filter pattern>
2022-10-20 14:47:14 -04:00
Abel Lucas
915405a9bd examples: update run_classic_connect example to take multiple addresses instead of one 2022-10-20 14:53:39 +00:00
Abel Lucas
45dd849d9f classic: update ConnectionError to take transport and peer address 2022-10-20 14:53:03 +00:00
Abel Lucas
7208fd6642 classic: update Device.connect to allow parallels connection creation
According to the specification nothing prevent the Host from creating
multiple connections at the same time. This commit add this mechanisme
by matching the `connection` and `connection_failure` events against the
peer address.
2022-10-19 17:44:44 +00:00
Gilles Boccon-Gibod
eb8556ccf6 gbg/extended scanning (#47)
Squashed:
* add extended report class
* more HCI commands
* add AdvertisingType
* add phy options
* fix tests
2022-10-19 10:06:00 -07:00
Octavian Purdila
4d96b821bc Merge pull request #44 from google/tavip/fix-address-resolution
Fix address resolution handling
2022-10-12 10:09:33 -07:00
Gilles Boccon-Gibod
78b36d2049 Merge pull request #45 from google/gbg/add-missing-app
add controller-info CLI app to setup
2022-10-11 22:21:08 -07:00
Gilles Boccon-Gibod
3e0cad1456 add controller-info CLI app to setup 2022-10-11 22:15:23 -07:00
Octavian Purdila
b4de38cdc3 Fix address resolution handling
In one of the refactors the command address_resolution field was
changed to address_reslution_enable but the controller code was not
updated.
2022-10-11 22:53:42 +00:00
Gilles Boccon-Gibod
68d9fbc159 Merge pull request #42 from google/gbg/improve-linux-doc
Refactor and improve the doc for Bumble on Linux
2022-10-11 14:35:14 -07:00
Gilles Boccon-Gibod
a916b7a21a Merge pull request #43 from google/gbg/proxy-write-with-response
support with_response on adapters
2022-10-11 07:41:28 -07:00
Gilles Boccon-Gibod
7fa2eb7658 support with_response on adapters 2022-10-10 12:11:51 -07:00
37 changed files with 4533 additions and 785 deletions

View File

@@ -20,19 +20,26 @@
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import asyncio import asyncio
from bumble.hci import HCI_Constant
import os
import os.path
import logging import logging
import click import os
import random
import re
from collections import OrderedDict from collections import OrderedDict
import click
import colors import colors
from bumble.core import UUID, AdvertisingData from bumble.core import UUID, AdvertisingData, TimeoutError, BT_LE_TRANSPORT
from bumble.device import Device, Connection, Peer from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
from bumble.utils import AsyncRunner from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic from bumble.gatt import Characteristic
from bumble.hci import (
HCI_Constant,
HCI_LE_1M_PHY,
HCI_LE_2M_PHY,
HCI_LE_CODED_PHY,
)
from prompt_toolkit import Application from prompt_toolkit import Application
from prompt_toolkit.history import FileHistory from prompt_toolkit.history import FileHistory
@@ -43,6 +50,7 @@ from prompt_toolkit.styles import Style
from prompt_toolkit.filters import Condition from prompt_toolkit.filters import Condition
from prompt_toolkit.widgets import TextArea, Frame from prompt_toolkit.widgets import TextArea, Frame
from prompt_toolkit.widgets.toolbars import FormattedTextToolbar from prompt_toolkit.widgets.toolbars import FormattedTextToolbar
from prompt_toolkit.data_structures import Point
from prompt_toolkit.layout import ( from prompt_toolkit.layout import (
Layout, Layout,
HSplit, HSplit,
@@ -51,17 +59,20 @@ from prompt_toolkit.layout import (
Float, Float,
FormattedTextControl, FormattedTextControl,
FloatContainer, FloatContainer,
ConditionalContainer ConditionalContainer,
Dimension
) )
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Constants # Constants
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
BUMBLE_USER_DIR = os.path.expanduser('~/.bumble') BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
DEFAULT_PROMPT_HEIGHT = 20 DEFAULT_RSSI_BAR_WIDTH = 20
DEFAULT_RSSI_BAR_WIDTH = 20 DEFAULT_CONNECTION_TIMEOUT = 30.0
DISPLAY_MIN_RSSI = -100 DISPLAY_MIN_RSSI = -100
DISPLAY_MAX_RSSI = -30 DISPLAY_MAX_RSSI = -30
RSSI_MONITOR_INTERVAL = 5.0 # Seconds
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Globals # Globals
@@ -69,16 +80,57 @@ DISPLAY_MAX_RSSI = -30
App = None App = None
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def le_phy_name(phy_id):
return {
HCI_LE_1M_PHY: '1M',
HCI_LE_2M_PHY: '2M',
HCI_LE_CODED_PHY: 'CODED'
}.get(phy_id, HCI_Constant.le_phy_name(phy_id))
def rssi_bar(rssi):
blocks = ['', '', '', '', '', '', '', '']
bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
bar_width = min(max(bar_width, 0), 1)
bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
bar_blocks = ('' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
return f'{rssi:4} {bar_blocks}'
def parse_phys(phys):
if phys.lower() == '*':
return None
else:
phy_list = []
elements = phys.lower().split(',')
for element in elements:
if element == '1m':
phy_list.append(HCI_LE_1M_PHY)
elif element == '2m':
phy_list.append(HCI_LE_2M_PHY)
elif element == 'coded':
phy_list.append(HCI_LE_CODED_PHY)
else:
raise ValueError('invalid PHY name')
return phy_list
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Console App # Console App
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class ConsoleApp: class ConsoleApp:
def __init__(self): def __init__(self):
self.known_addresses = set() self.known_addresses = set()
self.known_attributes = [] self.known_attributes = []
self.device = None self.device = None
self.connected_peer = None self.connected_peer = None
self.top_tab = 'scan' self.top_tab = 'device'
self.monitor_rssi = False
self.connection_rssi = None
style = Style.from_dict({ style = Style.from_dict({
'output-field': 'bg:#000044 #ffffff', 'output-field': 'bg:#000044 #ffffff',
@@ -100,17 +152,26 @@ class ConsoleApp:
return NestedCompleter.from_nested_dict({ return NestedCompleter.from_nested_dict({
'scan': { 'scan': {
'on': None, 'on': None,
'off': None 'off': None,
'clear': None
}, },
'advertise': { 'advertise': {
'on': None, 'on': None,
'off': None 'off': None
}, },
'rssi': {
'on': None,
'off': None
},
'show': { 'show': {
'scan': None, 'scan': None,
'services': None, 'services': None,
'attributes': None, 'attributes': None,
'log': None 'log': None,
'device': None
},
'filter': {
'address': None,
}, },
'connect': LiveCompleter(self.known_addresses), 'connect': LiveCompleter(self.known_addresses),
'update-parameters': None, 'update-parameters': None,
@@ -120,10 +181,17 @@ class ConsoleApp:
'services': None, 'services': None,
'attributes': None 'attributes': None
}, },
'request-mtu': None,
'read': LiveCompleter(self.known_attributes), 'read': LiveCompleter(self.known_attributes),
'write': LiveCompleter(self.known_attributes), 'write': LiveCompleter(self.known_attributes),
'subscribe': LiveCompleter(self.known_attributes), 'subscribe': LiveCompleter(self.known_attributes),
'unsubscribe': LiveCompleter(self.known_attributes), 'unsubscribe': LiveCompleter(self.known_attributes),
'set-phy': {
'1m': None,
'2m': None,
'coded': None
},
'set-default-phy': None,
'quit': None, 'quit': None,
'exit': None 'exit': None
}) })
@@ -139,14 +207,17 @@ class ConsoleApp:
self.input_field.accept_handler = self.accept_input self.input_field.accept_handler = self.accept_input
self.output_height = 7 self.output_height = Dimension(min=7, max=7, weight=1)
self.output_lines = [] self.output_lines = []
self.output = FormattedTextControl() self.output = FormattedTextControl(get_cursor_position=lambda: Point(0, max(0, len(self.output_lines) - 1)))
self.output_max_lines = 20
self.scan_results_text = FormattedTextControl() self.scan_results_text = FormattedTextControl()
self.services_text = FormattedTextControl() self.services_text = FormattedTextControl()
self.attributes_text = FormattedTextControl() self.attributes_text = FormattedTextControl()
self.log_text = FormattedTextControl() self.device_text = FormattedTextControl()
self.log_height = 20 self.log_text = FormattedTextControl(get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1)))
self.log_height = Dimension(min=7, weight=4)
self.log_max_lines = 100
self.log_lines = [] self.log_lines = []
container = HSplit([ container = HSplit([
@@ -163,11 +234,14 @@ class ConsoleApp:
filter=Condition(lambda: self.top_tab == 'attributes') filter=Condition(lambda: self.top_tab == 'attributes')
), ),
ConditionalContainer( ConditionalContainer(
Frame(Window(self.log_text), title='Log'), Frame(Window(self.log_text, height=self.log_height), title='Log'),
filter=Condition(lambda: self.top_tab == 'log') filter=Condition(lambda: self.top_tab == 'log')
), ),
Frame(Window(self.output), height=self.output_height), ConditionalContainer(
# HorizontalLine(), Frame(Window(self.device_text), title='Device'),
filter=Condition(lambda: self.top_tab == 'device')
),
Frame(Window(self.output, height=self.output_height)),
FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'), FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'),
self.input_field self.input_field
]) ])
@@ -199,17 +273,26 @@ class ConsoleApp:
) )
async def run_async(self, device_config, transport): async def run_async(self, device_config, transport):
rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop())
async with await open_transport_or_link(transport) as (hci_source, hci_sink): async with await open_transport_or_link(transport) as (hci_source, hci_sink):
if device_config: if device_config:
self.device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink) self.device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
else: else:
self.device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) random_address = f"{random.randint(192,255):02X}" # address is static random
for c in random.sample(range(255), 5):
random_address += f":{c:02X}"
self.append_to_log(f"Setting random address: {random_address}")
self.device = Device.with_hci('Bumble', random_address, hci_source, hci_sink)
self.device.listener = DeviceListener(self) self.device.listener = DeviceListener(self)
await self.device.power_on() await self.device.power_on()
self.show_device(self.device)
# Run the UI # Run the UI
await self.ui.run_async() await self.ui.run_async()
rssi_monitoring_task.cancel()
def add_known_address(self, address): def add_known_address(self, address):
self.known_addresses.add(address) self.known_addresses.add(address)
@@ -224,22 +307,33 @@ class ConsoleApp:
connection_state = 'NONE' connection_state = 'NONE'
encryption_state = '' encryption_state = ''
att_mtu = ''
rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi)
if self.device: if self.device:
if self.device.is_connecting: if self.device.is_le_connecting:
connection_state = 'CONNECTING' connection_state = 'CONNECTING'
elif self.connected_peer: elif self.connected_peer:
connection = self.connected_peer.connection connection = self.connected_peer.connection
connection_parameters = f'{connection.parameters.connection_interval}/{connection.parameters.connection_latency}/{connection.parameters.supervision_timeout}' connection_parameters = f'{connection.parameters.connection_interval}/{connection.parameters.peripheral_latency}/{connection.parameters.supervision_timeout}'
connection_state = f'{connection.peer_address} {connection_parameters} {connection.data_length}' if connection.transport == BT_LE_TRANSPORT:
phy_state = f' RX={le_phy_name(connection.phy.rx_phy)}/TX={le_phy_name(connection.phy.tx_phy)}'
else:
phy_state = ''
connection_state = f'{connection.peer_address} {connection_parameters} {connection.data_length}{phy_state}'
encryption_state = 'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED' encryption_state = 'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED'
att_mtu = f'ATT_MTU: {connection.att_mtu}'
return [ return [
('ansigreen', f' SCAN: {scanning} '), ('ansigreen', f' SCAN: {scanning} '),
('', ' '), ('', ' '),
('ansiblue', f' CONNECTION: {connection_state} '), ('ansiblue', f' CONNECTION: {connection_state} '),
('', ' '), ('', ' '),
('ansimagenta', f' {encryption_state} ') ('ansimagenta', f' {encryption_state} '),
('', ' '),
('ansicyan', f' {att_mtu} '),
('', ' '),
('ansiyellow', f' {rssi} ')
] ]
def show_error(self, title, details = None): def show_error(self, title, details = None):
@@ -274,7 +368,7 @@ class ConsoleApp:
self.services_text.text = lines self.services_text.text = lines
self.ui.invalidate() self.ui.invalidate()
async def show_attributes(self, attributes): def show_attributes(self, attributes):
lines = [] lines = []
for attribute in attributes: for attribute in attributes:
@@ -283,10 +377,48 @@ class ConsoleApp:
self.attributes_text.text = lines self.attributes_text.text = lines
self.ui.invalidate() self.ui.invalidate()
def show_device(self, device):
lines = []
lines.append(('ansicyan', 'Name: '))
lines.append(('', f'{device.name}\n'))
lines.append(('ansicyan', 'Public Address: '))
lines.append(('', f'{device.public_address}\n'))
lines.append(('ansicyan', 'Random Address: '))
lines.append(('', f'{device.random_address}\n'))
lines.append(('ansicyan', 'LE Enabled: '))
lines.append(('', f'{device.le_enabled}\n'))
lines.append(('ansicyan', 'Classic Enabled: '))
lines.append(('', f'{device.classic_enabled}\n'))
lines.append(('ansicyan', 'Classic SC Enabled: '))
lines.append(('', f'{device.classic_sc_enabled}\n'))
lines.append(('ansicyan', 'Classic SSP Enabled: '))
lines.append(('', f'{device.classic_ssp_enabled}\n'))
lines.append(('ansicyan', 'Classic Class: '))
lines.append(('', f'{device.class_of_device}\n'))
lines.append(('ansicyan', 'Discoverable: '))
lines.append(('', f'{device.discoverable}\n'))
lines.append(('ansicyan', 'Connectable: '))
lines.append(('', f'{device.connectable}\n'))
lines.append(('ansicyan', 'Advertising Data: '))
lines.append(('', f'{device.advertising_data}\n'))
lines.append(('ansicyan', 'Scan Response Data: '))
lines.append(('', f'{device.scan_response_data}\n'))
advertising_interval = (
device.advertising_interval_min
if device.advertising_interval_min == device.advertising_interval_max
else f"{device.advertising_interval_min} to {device.advertising_interval_max}"
)
lines.append(('ansicyan', 'Advertising Interval: '))
lines.append(('', f'{advertising_interval}\n'))
self.device_text.text = lines
self.ui.invalidate()
def append_to_output(self, line, invalidate=True): def append_to_output(self, line, invalidate=True):
if type(line) is str: if type(line) is str:
line = [('', line)] line = [('', line)]
self.output_lines = self.output_lines[-(self.output_height - 3):] self.output_lines = self.output_lines[-self.output_max_lines:]
self.output_lines.append(line) self.output_lines.append(line)
formatted_text = [] formatted_text = []
for line in self.output_lines: for line in self.output_lines:
@@ -298,7 +430,7 @@ class ConsoleApp:
def append_to_log(self, lines, invalidate=True): def append_to_log(self, lines, invalidate=True):
self.log_lines.extend(lines.split('\n')) self.log_lines.extend(lines.split('\n'))
self.log_lines = self.log_lines[-(self.log_height - 3):] self.log_lines = self.log_lines[-self.log_max_lines:]
self.log_text.text = ANSI('\n'.join(self.log_lines)) self.log_text.text = ANSI('\n'.join(self.log_lines))
if invalidate: if invalidate:
self.ui.invalidate() self.ui.invalidate()
@@ -331,7 +463,7 @@ class ConsoleApp:
attributes = await self.connected_peer.discover_attributes() attributes = await self.connected_peer.discover_attributes()
self.append_to_output(f'discovered {len(attributes)} attributes...') self.append_to_output(f'discovered {len(attributes)} attributes...')
await self.show_attributes(attributes) self.show_attributes(attributes)
def find_characteristic(self, param): def find_characteristic(self, param):
parts = param.split('.') parts = param.split('.')
@@ -351,6 +483,12 @@ class ConsoleApp:
if characteristic.handle == attribute_handle: if characteristic.handle == attribute_handle:
return characteristic return characteristic
async def rssi_monitor_loop(self):
while True:
if self.monitor_rssi and self.connected_peer:
self.connection_rssi = await self.connected_peer.connection.get_rssi()
await asyncio.sleep(RSSI_MONITOR_INTERVAL)
async def command(self, command): async def command(self, command):
try: try:
(keyword, *params) = command.strip().split(' ') (keyword, *params) = command.strip().split(' ')
@@ -372,46 +510,96 @@ class ConsoleApp:
else: else:
await self.device.start_scanning() await self.device.start_scanning()
elif params[0] == 'on': elif params[0] == 'on':
if len(params) == 2:
if not params[1].startswith("filter="):
self.show_error('invalid syntax', 'expected address filter=key1:value1,key2:value,... available filters: address')
# regex: (word):(any char except ,)
matches = re.findall(r"(\w+):([^,]+)", params[1])
for match in matches:
if match[0] == "address":
self.device.listener.address_filter = match[1]
await self.device.start_scanning() await self.device.start_scanning()
self.top_tab = 'scan' self.top_tab = 'scan'
elif params[0] == 'off': elif params[0] == 'off':
await self.device.stop_scanning() await self.device.stop_scanning()
elif params[0] == 'clear':
self.device.listener.scan_results.clear()
self.known_addresses.clear()
self.show_scan_results(self.device.listener.scan_results)
else: else:
self.show_error('unsupported arguments for scan command') self.show_error('unsupported arguments for scan command')
async def do_rssi(self, params):
if len(params) == 0:
# Toggle monitoring
self.monitor_rssi = not self.monitor_rssi
elif params[0] == 'on':
self.monitor_rssi = True
elif params[0] == 'off':
self.monitor_rssi = False
else:
self.show_error('unsupported arguments for rssi command')
async def do_connect(self, params): async def do_connect(self, params):
if len(params) != 1: if len(params) != 1 and len(params) != 2:
self.show_error('invalid syntax', 'expected connect <address>') self.show_error('invalid syntax', 'expected connect <address> [phys]')
return return
if len(params) == 1:
phys = None
else:
phys = parse_phys(params[1])
if phys is None:
connection_parameters_preferences = None
else:
connection_parameters_preferences = {
phy: ConnectionParametersPreferences()
for phy in phys
}
if self.device.is_scanning:
await self.device.stop_scanning()
self.append_to_output('connecting...') self.append_to_output('connecting...')
await self.device.connect(params[0])
self.top_tab = 'services' try:
await self.device.connect(
params[0],
connection_parameters_preferences=connection_parameters_preferences,
timeout=DEFAULT_CONNECTION_TIMEOUT
)
self.top_tab = 'services'
except TimeoutError:
self.show_error('connection timed out')
async def do_disconnect(self, params): async def do_disconnect(self, params):
if not self.connected_peer: if self.device.is_le_connecting:
self.show_error('not connected') await self.device.cancel_connection()
return else:
if not self.connected_peer:
self.show_error('not connected')
return
await self.connected_peer.connection.disconnect() await self.connected_peer.connection.disconnect()
async def do_update_parameters(self, params): async def do_update_parameters(self, params):
if len(params) != 1 or len(params[0].split('/')) != 3: if len(params) != 1 or len(params[0].split('/')) != 3:
self.show_error('invalid syntax', 'expected update-parameters <interval-min>-<interval-max>/<latency>/<supervision>') self.show_error('invalid syntax', 'expected update-parameters <interval-min>-<interval-max>/<max-latency>/<supervision>')
return return
if not self.connected_peer: if not self.connected_peer:
self.show_error('not connected') self.show_error('not connected')
return return
connection_intervals, connection_latency, supervision_timeout = params[0].split('/') connection_intervals, max_latency, supervision_timeout = params[0].split('/')
connection_interval_min, connection_interval_max = [int(x) for x in connection_intervals.split('-')] connection_interval_min, connection_interval_max = [int(x) for x in connection_intervals.split('-')]
connection_latency = int(connection_latency) max_latency = int(max_latency)
supervision_timeout = int(supervision_timeout) supervision_timeout = int(supervision_timeout)
await self.connected_peer.connection.update_parameters( await self.connected_peer.connection.update_parameters(
connection_interval_min, connection_interval_min,
connection_interval_max, connection_interval_max,
connection_latency, max_latency,
supervision_timeout supervision_timeout
) )
@@ -438,10 +626,29 @@ class ConsoleApp:
async def do_show(self, params): async def do_show(self, params):
if params: if params:
if params[0] in {'scan', 'services', 'attributes', 'log'}: if params[0] in {'scan', 'services', 'attributes', 'log', 'device'}:
self.top_tab = params[0] self.top_tab = params[0]
self.ui.invalidate() self.ui.invalidate()
async def do_get_phy(self, params):
if not self.connected_peer:
self.show_error('not connected')
return
phy = await self.connected_peer.connection.get_phy()
self.append_to_output(f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, TX={HCI_Constant.le_phy_name(phy[1])}')
async def do_request_mtu(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected request-mtu <mtu>')
return
if not self.connected_peer:
self.show_error('not connected')
return
await self.connected_peer.request_mtu(int(params[0]))
async def do_discover(self, params): async def do_discover(self, params):
if not params: if not params:
self.show_error('invalid syntax', 'expected discover services|attributes') self.show_error('invalid syntax', 'expected discover services|attributes')
@@ -454,14 +661,14 @@ class ConsoleApp:
await self.discover_attributes() await self.discover_attributes()
async def do_read(self, params): async def do_read(self, params):
if not self.connected_peer:
self.show_error('not connected')
return
if len(params) != 1: if len(params) != 1:
self.show_error('invalid syntax', 'expected read <attribute>') self.show_error('invalid syntax', 'expected read <attribute>')
return return
if not self.connected_peer:
self.show_error('not connected')
return
characteristic = self.find_characteristic(params[0]) characteristic = self.find_characteristic(params[0])
if characteristic is None: if characteristic is None:
self.show_error('no such characteristic') self.show_error('no such characteristic')
@@ -530,12 +737,54 @@ class ConsoleApp:
await characteristic.unsubscribe() await characteristic.unsubscribe()
async def do_set_phy(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected set-phy <tx_rx_phys>|<tx_phys>/<rx_phys>')
return
if not self.connected_peer:
self.show_error('not connected')
return
if '/' in params[0]:
tx_phys, rx_phys = params[0].split('/')
else:
tx_phys = params[0]
rx_phys = tx_phys
await self.connected_peer.connection.set_phy(
tx_phys=parse_phys(tx_phys),
rx_phys=parse_phys(rx_phys)
)
async def do_set_default_phy(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected set-default-phy <tx_rx_phys>|<tx_phys>/<rx_phys>')
return
if '/' in params[0]:
tx_phys, rx_phys = params[0].split('/')
else:
tx_phys = params[0]
rx_phys = tx_phys
await self.device.set_default_phy(
tx_phys=parse_phys(tx_phys),
rx_phys=parse_phys(rx_phys)
)
async def do_exit(self, params): async def do_exit(self, params):
self.ui.exit() self.ui.exit()
async def do_quit(self, params): async def do_quit(self, params):
self.ui.exit() self.ui.exit()
async def do_filter(self, params):
if params[0] == "address":
if len(params) != 2:
self.show_error('invalid syntax', 'expected filter address <pattern>')
return
self.device.listener.address_filter = params[1]
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Device and Connection Listener # Device and Connection Listener
@@ -544,16 +793,38 @@ class DeviceListener(Device.Listener, Connection.Listener):
def __init__(self, app): def __init__(self, app):
self.app = app self.app = app
self.scan_results = OrderedDict() self.scan_results = OrderedDict()
self.address_filter = None
@property
def address_filter(self):
return self._address_filter
@address_filter.setter
def address_filter(self, filter_addr):
if filter_addr is None:
self._address_filter = re.compile(r".*")
else:
self._address_filter = re.compile(filter_addr)
self.scan_results = OrderedDict(filter(lambda x: self.filter_address_match(x), self.scan_results))
self.app.show_scan_results(self.scan_results)
def filter_address_match(self, address):
"""
Returns true if an address matches the filter
"""
return bool(self.address_filter.match(address))
@AsyncRunner.run_in_task() @AsyncRunner.run_in_task()
async def on_connection(self, connection): async def on_connection(self, connection):
self.app.connected_peer = Peer(connection) self.app.connected_peer = Peer(connection)
self.app.connection_rssi = None
self.app.append_to_output(f'connected to {self.app.connected_peer}') self.app.append_to_output(f'connected to {self.app.connected_peer}')
connection.listener = self connection.listener = self
def on_disconnection(self, reason): def on_disconnection(self, reason):
self.app.append_to_output(f'disconnected from {self.app.connected_peer}, reason: {HCI_Constant.error_name(reason)}') self.app.append_to_output(f'disconnected from {self.app.connected_peer}, reason: {HCI_Constant.error_name(reason)}')
self.app.connected_peer = None self.app.connected_peer = None
self.app.connection_rssi = None
def on_connection_parameters_update(self): def on_connection_parameters_update(self):
self.app.append_to_output(f'connection parameters update: {self.app.connected_peer.connection.parameters}') self.app.append_to_output(f'connection parameters update: {self.app.connected_peer.connection.parameters}')
@@ -570,16 +841,19 @@ class DeviceListener(Device.Listener, Connection.Listener):
def on_connection_data_length_change(self): def on_connection_data_length_change(self):
self.app.append_to_output(f'connection data length change: {self.app.connected_peer.connection.data_length}') self.app.append_to_output(f'connection data length change: {self.app.connected_peer.connection.data_length}')
def on_advertisement(self, address, ad_data, rssi, connectable): def on_advertisement(self, advertisement):
entry_key = f'{address}/{address.address_type}' if not self.filter_address_match(str(advertisement.address)):
return
entry_key = f'{advertisement.address}/{advertisement.address.address_type}'
entry = self.scan_results.get(entry_key) entry = self.scan_results.get(entry_key)
if entry: if entry:
entry.ad_data = ad_data entry.ad_data = advertisement.data
entry.rssi = rssi entry.rssi = advertisement.rssi
entry.connectable = connectable entry.connectable = advertisement.is_connectable
else: else:
self.app.add_known_address(str(address)) self.app.add_known_address(str(advertisement.address))
self.scan_results[entry_key] = ScanResult(address, address.address_type, ad_data, rssi, connectable) self.scan_results[entry_key] = ScanResult(advertisement.address, advertisement.address.address_type, advertisement.data, advertisement.rssi, advertisement.is_connectable)
self.app.show_scan_results(self.scan_results) self.app.show_scan_results(self.scan_results)
@@ -603,9 +877,9 @@ class ScanResult:
else: else:
type_color = colors.cyan type_color = colors.cyan
name = self.ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME) name = self.ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True)
if name is None: if name is None:
name = self.ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME) name = self.ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True)
if name: if name:
# Convert to string # Convert to string
try: try:
@@ -616,12 +890,7 @@ class ScanResult:
name = '' name = ''
# RSSI bar # RSSI bar
blocks = ['', '', '', '', '', '', '', ''] bar_string = rssi_bar(self.rssi)
bar_width = (self.rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
bar_width = min(max(bar_width, 0), 1)
bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
bar_blocks = ('' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
bar_string = f'{self.rssi} {bar_blocks}'
bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string)) bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string))
return f'{address_color(str(self.address))} [{type_color(address_type_string)}] {bar_string} {bar_padding} {name}' return f'{address_color(str(self.address))} [{type_color(address_type_string)}] {bar_string} {bar_padding} {name}'
@@ -633,6 +902,7 @@ class LogHandler(logging.Handler):
def __init__(self, app): def __init__(self, app):
super().__init__() super().__init__()
self.app = app self.app = app
self.setFormatter("[%(asctime)s][%(pathname)s:%(lineno)d][%(levelname)s] %(message)s")
def emit(self, record): def emit(self, record):
message = self.format(record) message = self.format(record)
@@ -657,6 +927,7 @@ def main(device_config, transport):
# logging.basicConfig(level = 'FATAL') # logging.basicConfig(level = 'FATAL')
# logging.basicConfig(level = 'DEBUG') # logging.basicConfig(level = 'DEBUG')
root_logger = logging.getLogger() root_logger = logging.getLogger()
root_logger.addHandler(LogHandler(app)) root_logger.addHandler(LogHandler(app))
root_logger.setLevel(logging.DEBUG) root_logger.setLevel(logging.DEBUG)

View File

@@ -25,15 +25,21 @@ from bumble.company_ids import COMPANY_IDENTIFIERS
from bumble.core import name_or_number from bumble.core import name_or_number
from bumble.hci import ( from bumble.hci import (
map_null_terminated_utf8_string, map_null_terminated_utf8_string,
HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_SUCCESS, HCI_SUCCESS,
HCI_LE_SUPPORTED_FEATURES_NAMES,
HCI_VERSION_NAMES, HCI_VERSION_NAMES,
LMP_VERSION_NAMES, LMP_VERSION_NAMES,
HCI_Command, HCI_Command,
HCI_Read_BD_ADDR_Command,
HCI_READ_BD_ADDR_COMMAND, HCI_READ_BD_ADDR_COMMAND,
HCI_Read_BD_ADDR_Command,
HCI_READ_LOCAL_NAME_COMMAND,
HCI_Read_Local_Name_Command, HCI_Read_Local_Name_Command,
HCI_READ_LOCAL_NAME_COMMAND HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
HCI_LE_Read_Maximum_Data_Length_Command,
HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND,
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command,
HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND,
HCI_LE_Read_Maximum_Advertising_Data_Length_Command
) )
from bumble.host import Host from bumble.host import Host
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
@@ -57,6 +63,39 @@ async def get_classic_info(host):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def get_le_info(host): async def get_le_info(host):
print() print()
if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
response = await host.send_command(HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('LE Number Of Supported Advertising Sets:', 'yellow'),
response.return_parameters.num_supported_advertising_sets,
'\n'
)
if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
response = await host.send_command(HCI_LE_Read_Maximum_Advertising_Data_Length_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('LE Maximum Advertising Data Length:', 'yellow'),
response.return_parameters.max_advertising_data_length,
'\n'
)
if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
response = await host.send_command(HCI_LE_Read_Maximum_Data_Length_Command())
if response.return_parameters.status == HCI_SUCCESS:
print(
color('Maximum Data Length:', 'yellow'),
(
f'tx:{response.return_parameters.supported_max_tx_octets}/'
f'{response.return_parameters.supported_max_tx_time}, '
f'rx:{response.return_parameters.supported_max_rx_octets}/'
f'{response.return_parameters.supported_max_rx_time}'
),
'\n'
)
print(color('LE Features:', 'yellow')) print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features: for feature in host.supported_le_features:
print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature)) print(' ', name_or_number(HCI_LE_SUPPORTED_FEATURES_NAMES, feature))

View File

@@ -17,13 +17,14 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import asyncio import asyncio
import os import os
import struct
import logging import logging
import click import click
from colors import color from colors import color
from bumble.device import Device, Peer from bumble.device import Device, Peer
from bumble.core import AdvertisingData from bumble.core import AdvertisingData
from bumble.gatt import Service, Characteristic from bumble.gatt import Service, Characteristic, CharacteristicValue
from bumble.utils import AsyncRunner from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.hci import HCI_Constant from bumble.hci import HCI_Constant
@@ -41,13 +42,59 @@ GG_PREFERRED_MTU = 256
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class GattlinkHubBridge(Device.Listener): class GattlinkL2capEndpoint:
def __init__(self): def __init__(self):
self.peer = None self.l2cap_channel = None
self.rx_socket = None self.l2cap_packet = b''
self.tx_socket = None self.l2cap_packet_size = 0
self.rx_characteristic = None
self.tx_characteristic = None # Called when an L2CAP SDU has been received
def on_coc_sdu(self, sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
while len(sdu):
if self.l2cap_packet_size == 0:
# Expect a new packet
self.l2cap_packet_size = sdu[0] + 1
sdu = sdu[1:]
else:
bytes_needed = self.l2cap_packet_size - len(self.l2cap_packet)
chunk = min(bytes_needed, len(sdu))
self.l2cap_packet += sdu[:chunk]
sdu = sdu[chunk:]
if len(self.l2cap_packet) == self.l2cap_packet_size:
self.on_l2cap_packet(self.l2cap_packet)
self.l2cap_packet = b''
self.l2cap_packet_size = 0
# -----------------------------------------------------------------------------
class GattlinkHubBridge(GattlinkL2capEndpoint, Device.Listener):
def __init__(self, device, peer_address):
super().__init__()
self.device = device
self.peer_address = peer_address
self.peer = None
self.tx_socket = None
self.rx_characteristic = None
self.tx_characteristic = None
self.l2cap_psm_characteristic = None
device.listener = self
async def start(self):
# Connect to the peer
print(f'=== Connecting to {self.peer_address}...')
await self.device.connect(self.peer_address)
async def connect_l2cap(self, psm):
print(color(f'### Connecting with L2CAP on PSM = {psm}', 'yellow'))
try:
self.l2cap_channel = await self.peer.connection.open_l2cap_channel(psm)
print(color('*** Connected', 'yellow'), self.l2cap_channel)
self.l2cap_channel.sink = self.on_coc_sdu
except Exception as error:
print(color(f'!!! Connection failed: {error}', 'red'))
@AsyncRunner.run_in_task() @AsyncRunner.run_in_task()
async def on_connection(self, connection): async def on_connection(self, connection):
@@ -80,15 +127,24 @@ class GattlinkHubBridge(Device.Listener):
self.rx_characteristic = characteristic self.rx_characteristic = characteristic
elif characteristic.uuid == GG_GATTLINK_TX_CHARACTERISTIC_UUID: elif characteristic.uuid == GG_GATTLINK_TX_CHARACTERISTIC_UUID:
self.tx_characteristic = characteristic self.tx_characteristic = characteristic
elif characteristic.uuid == GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID:
self.l2cap_psm_characteristic = characteristic
print('RX:', self.rx_characteristic) print('RX:', self.rx_characteristic)
print('TX:', self.tx_characteristic) print('TX:', self.tx_characteristic)
print('PSM:', self.l2cap_psm_characteristic)
# Subscribe to TX if self.l2cap_psm_characteristic:
if self.tx_characteristic: # Subscribe to and then read the PSM value
await self.peer.subscribe(self.l2cap_psm_characteristic, self.on_l2cap_psm_received)
psm_bytes = await self.peer.read_value(self.l2cap_psm_characteristic)
psm = struct.unpack('<H', psm_bytes)[0]
await self.connect_l2cap(psm)
elif self.tx_characteristic:
# Subscribe to TX
await self.peer.subscribe(self.tx_characteristic, self.on_tx_received) await self.peer.subscribe(self.tx_characteristic, self.on_tx_received)
print(color('=== Subscribed to Gattlink TX', 'yellow')) print(color('=== Subscribed to Gattlink TX', 'yellow'))
else: else:
print(color('!!! Gattlink TX not found', 'red')) print(color('!!! No Gattlink TX or PSM found', 'red'))
def on_connection_failure(self, error): def on_connection_failure(self, error):
print(color(f'!!! Connection failed: {error}')) print(color(f'!!! Connection failed: {error}'))
@@ -99,31 +155,23 @@ class GattlinkHubBridge(Device.Listener):
self.rx_characteristic = None self.rx_characteristic = None
self.peer = None self.peer = None
# Called when an L2CAP packet has been received
def on_l2cap_packet(self, packet):
print(color(f'<<< [L2CAP PACKET]: {len(packet)} bytes', 'cyan'))
print(color('>>> [UDP]', 'magenta'))
self.tx_socket.sendto(packet)
# Called by the GATT client when a notification is received # Called by the GATT client when a notification is received
def on_tx_received(self, value): def on_tx_received(self, value):
print(color('>>> TX:', 'magenta'), value.hex()) print(color(f'<<< [GATT TX]: {len(value)} bytes', 'cyan'))
if self.tx_socket: if self.tx_socket:
print(color('>>> [UDP]', 'magenta'))
self.tx_socket.sendto(value) self.tx_socket.sendto(value)
# Called by asyncio when the UDP socket is created # Called by asyncio when the UDP socket is created
def connection_made(self, transport): def on_l2cap_psm_received(self, value):
pass psm = struct.unpack('<H', value)[0]
asyncio.create_task(self.connect_l2cap(psm))
# Called by asyncio when a UDP datagram is received
def datagram_received(self, data, address):
print(color('<<< RX:', 'magenta'), data.hex())
# TODO: use a queue instead of creating a task everytime
if self.peer and self.rx_characteristic:
asyncio.create_task(self.peer.write_value(self.rx_characteristic, data))
# -----------------------------------------------------------------------------
class GattlinkNodeBridge(Device.Listener):
def __init__(self):
self.peer = None
self.rx_socket = None
self.tx_socket = None
# Called by asyncio when the UDP socket is created # Called by asyncio when the UDP socket is created
def connection_made(self, transport): def connection_made(self, transport):
@@ -131,21 +179,130 @@ class GattlinkNodeBridge(Device.Listener):
# Called by asyncio when a UDP datagram is received # Called by asyncio when a UDP datagram is received
def datagram_received(self, data, address): def datagram_received(self, data, address):
print(color('<<< RX:', 'magenta'), data.hex()) print(color(f'<<< [UDP]: {len(data)} bytes', 'green'))
# TODO: use a queue instead of creating a task everytime if self.l2cap_channel:
if self.peer and self.rx_characteristic: print(color('>>> [L2CAP]', 'yellow'))
self.l2cap_channel.write(bytes([len(data) - 1]) + data)
elif self.peer and self.rx_characteristic:
print(color('>>> [GATT RX]', 'yellow'))
asyncio.create_task(self.peer.write_value(self.rx_characteristic, data)) asyncio.create_task(self.peer.write_value(self.rx_characteristic, data))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def run(hci_transport, device_address, send_host, send_port, receive_host, receive_port): class GattlinkNodeBridge(GattlinkL2capEndpoint, Device.Listener):
def __init__(self, device):
super().__init__()
self.device = device
self.peer = None
self.tx_socket = None
self.tx_subscriber = None
self.rx_characteristic = None
# Register as a listener
device.listener = self
# Listen for incoming L2CAP CoC connections
psm = 0xFB
device.register_l2cap_channel_server(0xFB, self.on_coc)
print(f'### Listening for CoC connection on PSM {psm}')
# Setup the Gattlink service
self.rx_characteristic = Characteristic(
GG_GATTLINK_RX_CHARACTERISTIC_UUID,
Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=self.on_rx_write)
)
self.tx_characteristic = Characteristic(
GG_GATTLINK_TX_CHARACTERISTIC_UUID,
Characteristic.NOTIFY,
Characteristic.READABLE
)
self.tx_characteristic.on('subscription', self.on_tx_subscription)
self.psm_characteristic = Characteristic(
GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.READABLE,
bytes([psm, 0])
)
gattlink_service = Service(
GG_GATTLINK_SERVICE_UUID,
[
self.rx_characteristic,
self.tx_characteristic,
self.psm_characteristic
]
)
device.add_services([gattlink_service])
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble GG', 'utf-8')),
(AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
bytes(reversed(bytes.fromhex('ABBAFF00E56A484CB8328B17CF6CBFE8'))))
])
)
async def start(self):
await self.device.start_advertising()
# Called by asyncio when the UDP socket is created
def connection_made(self, transport):
self.transport = transport
# Called by asyncio when a UDP datagram is received
def datagram_received(self, data, address):
print(color(f'<<< [UDP]: {len(data)} bytes', 'green'))
if self.l2cap_channel:
print(color('>>> [L2CAP]', 'yellow'))
self.l2cap_channel.write(bytes([len(data) - 1]) + data)
elif self.tx_subscriber:
print(color('>>> [GATT TX]', 'yellow'))
self.tx_characteristic.value = data
asyncio.create_task(self.device.notify_subscribers(self.tx_characteristic))
# Called when a write to the RX characteristic has been received
def on_rx_write(self, connection, data):
print(color(f'<<< [GATT RX]: {len(data)} bytes', 'cyan'))
print(color('>>> [UDP]', 'magenta'))
self.tx_socket.sendto(data)
# Called when the subscription to the TX characteristic has changed
def on_tx_subscription(self, peer, enabled):
print(f'### [GATT TX] subscription from {peer}: {"enabled" if enabled else "disabled"}')
if enabled:
self.tx_subscriber = peer
else:
self.tx_subscriber = None
# Called when an L2CAP packet is received
def on_l2cap_packet(self, packet):
print(color(f'<<< [L2CAP PACKET]: {len(packet)} bytes', 'cyan'))
print(color('>>> [UDP]', 'magenta'))
self.tx_socket.sendto(packet)
# Called when a new connection is established
def on_coc(self, channel):
print('*** CoC Connection', channel)
self.l2cap_channel = channel
channel.sink = self.on_coc_sdu
# -----------------------------------------------------------------------------
async def run(hci_transport, device_address, role_or_peer_address, send_host, send_port, receive_host, receive_port):
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink): async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
print('<<< connected') print('<<< connected')
# Instantiate a bridge object # Instantiate a bridge object
bridge = GattlinkNodeBridge() device = Device.with_hci('Bumble GG', device_address, hci_source, hci_sink)
# Instantiate a bridge object
if role_or_peer_address == 'node':
bridge = GattlinkNodeBridge(device)
else:
bridge = GattlinkHubBridge(device, role_or_peer_address)
# Create a UDP to RX bridge (receive from UDP, send to RX) # Create a UDP to RX bridge (receive from UDP, send to RX)
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
@@ -160,35 +317,8 @@ async def run(hci_transport, device_address, send_host, send_port, receive_host,
remote_addr=(send_host, send_port) remote_addr=(send_host, send_port)
) )
# Create a device to manage the host, with a custom listener
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device.listener = bridge
await device.power_on() await device.power_on()
await bridge.start()
# Connect to the peer
# print(f'=== Connecting to {device_address}...')
# await device.connect(device_address)
# TODO move to class
gattlink_service = Service(
GG_GATTLINK_SERVICE_UUID,
[
Characteristic(
GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID,
Characteristic.READ,
Characteristic.READABLE,
bytes([193, 0])
)
]
)
device.add_services([gattlink_service])
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble GG', 'utf-8')),
(AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS, bytes(reversed(bytes.fromhex('ABBAFF00E56A484CB8328B17CF6CBFE8'))))
])
)
await device.start_advertising()
# Wait until the source terminates # Wait until the source terminates
await hci_source.wait_for_termination() await hci_source.wait_for_termination()
@@ -197,15 +327,16 @@ async def run(hci_transport, device_address, send_host, send_port, receive_host,
@click.command() @click.command()
@click.argument('hci_transport') @click.argument('hci_transport')
@click.argument('device_address') @click.argument('device_address')
@click.argument('role_or_peer_address')
@click.option('-sh', '--send-host', type=str, default='127.0.0.1', help='UDP host to send to') @click.option('-sh', '--send-host', type=str, default='127.0.0.1', help='UDP host to send to')
@click.option('-sp', '--send-port', type=int, default=9001, help='UDP port to send to') @click.option('-sp', '--send-port', type=int, default=9001, help='UDP port to send to')
@click.option('-rh', '--receive-host', type=str, default='127.0.0.1', help='UDP host to receive on') @click.option('-rh', '--receive-host', type=str, default='127.0.0.1', help='UDP host to receive on')
@click.option('-rp', '--receive-port', type=int, default=9000, help='UDP port to receive on') @click.option('-rp', '--receive-port', type=int, default=9000, help='UDP port to receive on')
def main(hci_transport, device_address, send_host, send_port, receive_host, receive_port): def main(hci_transport, device_address, role_or_peer_address, send_host, send_port, receive_host, receive_port):
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) asyncio.run(run(hci_transport, device_address, role_or_peer_address, send_host, send_port, receive_host, receive_port))
asyncio.run(run(hci_transport, device_address, send_host, send_port, receive_host, receive_port))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
if __name__ == '__main__': if __name__ == '__main__':
main() main()

331
apps/l2cap_bridge.py Normal file
View File

@@ -0,0 +1,331 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import click
import logging
import os
from colors import color
from bumble.transport import open_transport_or_link
from bumble.device import Device
from bumble.utils import FlowControlAsyncPipe
from bumble.hci import HCI_Constant
# -----------------------------------------------------------------------------
class ServerBridge:
"""
L2CAP CoC server bridge: waits for a peer to connect an L2CAP CoC channel
on a specified PSM. When the connection is made, the bridge connects a TCP
socket to a remote host and bridges the data in both directions, with flow
control.
When the L2CAP CoC channel is closed, the bridge disconnects the TCP socket
and waits for a new L2CAP CoC channel to be connected.
When the TCP connection is closed by the TCP server, XXXX
"""
def __init__(
self,
psm,
max_credits,
mtu,
mps,
tcp_host,
tcp_port
):
self.psm = psm
self.max_credits = max_credits
self.mtu = mtu
self.mps = mps
self.tcp_host = tcp_host
self.tcp_port = tcp_port
async def start(self, device):
# Listen for incoming L2CAP CoC connections
device.register_l2cap_channel_server(
psm = self.psm,
server = self.on_coc,
max_credits = self.max_credits,
mtu = self.mtu,
mps = self.mps
)
print(color(f'### Listening for CoC connection on PSM {self.psm}', 'yellow'))
def on_ble_connection(connection):
def on_ble_disconnection(reason):
print(color('@@@ Bluetooth disconnection:', 'red'), HCI_Constant.error_name(reason))
print(color('@@@ Bluetooth connection:', 'green'), connection)
connection.on('disconnection', on_ble_disconnection)
device.on('connection', on_ble_connection)
await device.start_advertising(auto_restart=True)
# Called when a new L2CAP connection is established
def on_coc(self, l2cap_channel):
print(color('*** L2CAP channel:', 'cyan'), l2cap_channel)
class Pipe:
def __init__(self, bridge, l2cap_channel):
self.bridge = bridge
self.tcp_transport = None
self.l2cap_channel = l2cap_channel
l2cap_channel.on('close', self.on_l2cap_close)
l2cap_channel.sink = self.on_coc_sdu
async def connect_to_tcp(self):
# Connect to the TCP server
print(color(f'### Connecting to TCP {self.bridge.tcp_host}:{self.bridge.tcp_port}...', 'yellow'))
class TcpClientProtocol(asyncio.Protocol):
def __init__(self, pipe):
self.pipe = pipe
def connection_lost(self, error):
print(color(f'!!! TCP connection lost: {error}', 'red'))
if self.pipe.l2cap_channel is not None:
asyncio.create_task(self.pipe.l2cap_channel.disconnect())
def data_received(self, data):
print(f'<<< Received on TCP: {len(data)}')
self.pipe.l2cap_channel.write(data)
try:
self.tcp_transport, _ = await asyncio.get_running_loop().create_connection(
lambda: TcpClientProtocol(self),
host=self.bridge.tcp_host,
port=self.bridge.tcp_port,
)
print(color('### Connected', 'green'))
except Exception as error:
print(color(f'!!! Connection failed: {error}', 'red'))
await self.l2cap_channel.disconnect()
def on_l2cap_close(self):
self.l2cap_channel = None
if self.tcp_transport is not None:
self.tcp_transport.close()
def on_coc_sdu(self, sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
if self.tcp_transport is None:
print(color('!!! TCP socket not open, dropping', 'red'))
return
self.tcp_transport.write(sdu)
pipe = Pipe(self, l2cap_channel)
asyncio.create_task(pipe.connect_to_tcp())
# -----------------------------------------------------------------------------
class ClientBridge:
"""
L2CAP CoC client bridge: connects to a BLE device, then waits for an inbound
TCP connection on a specified port number. When a TCP client connects, an
L2CAP CoC channel connection to the BLE device is established, and the data
is bridged in both directions, with flow control.
When the TCP connection is closed by the client, the L2CAP CoC channel is
disconnected, but the connection to the BLE device remains, ready for a new
TCP client to connect.
When the L2CAP CoC channel is closed, XXXX
"""
READ_CHUNK_SIZE = 4096
def __init__(
self,
psm,
max_credits,
mtu,
mps,
address,
tcp_host,
tcp_port
):
self.psm = psm
self.max_credits = max_credits
self.mtu = mtu
self.mps = mps
self.address = address
self.tcp_host = tcp_host
self.tcp_port = tcp_port
async def start(self, device):
print(color(f'### Connecting to {self.address}...', 'yellow'))
connection = await device.connect(self.address)
print(color('### Connected', 'green'))
# Called when the BLE connection is disconnected
def on_ble_disconnection(reason):
print(color('@@@ Bluetooth disconnection:', 'red'), HCI_Constant.error_name(reason))
connection.on('disconnection', on_ble_disconnection)
# Called when a TCP connection is established
async def on_tcp_connection(reader, writer):
peername = writer.get_extra_info('peername')
print(color(f'<<< TCP connection from {peername}', 'magenta'))
def on_coc_sdu(sdu):
print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
l2cap_to_tcp_pipe.write(sdu)
def on_l2cap_close():
print(color('*** L2CAP channel closed', 'red'))
l2cap_to_tcp_pipe.stop()
writer.close()
# Connect a new L2CAP channel
print(color(f'>>> Opening L2CAP channel on PSM = {self.psm}', 'yellow'))
try:
l2cap_channel = await connection.open_l2cap_channel(
psm = self.psm,
max_credits = self.max_credits,
mtu = self.mtu,
mps = self.mps
)
print(color('*** L2CAP channel:', 'cyan'), l2cap_channel)
except Exception as error:
print(color(f'!!! Connection failed: {error}', 'red'))
writer.close()
return
l2cap_channel.sink = on_coc_sdu
l2cap_channel.on('close', on_l2cap_close)
# Start a flow control pipe from L2CAP to TCP
l2cap_to_tcp_pipe = FlowControlAsyncPipe(
l2cap_channel.pause_reading,
l2cap_channel.resume_reading,
writer.write,
writer.drain
)
l2cap_to_tcp_pipe.start()
# Pipe data from TCP to L2CAP
while True:
try:
data = await reader.read(self.READ_CHUNK_SIZE)
if len(data) == 0:
print(color('!!! End of stream', 'red'))
await l2cap_channel.disconnect()
return
print(color(f'<<< [TCP DATA]: {len(data)} bytes', 'blue'))
l2cap_channel.write(data)
await l2cap_channel.drain()
except Exception as error:
print(f'!!! Exception: {error}')
break
writer.close()
print(color('~~~ Bye bye', 'magenta'))
await asyncio.start_server(
on_tcp_connection,
host=self.tcp_host if self.tcp_host != '_' else None,
port=self.tcp_port
)
print(color(f'### Listening for TCP connections on port {self.tcp_port}', 'magenta'))
# -----------------------------------------------------------------------------
async def run(device_config, hci_transport, bridge):
print('<<< connecting to HCI...')
async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
print('<<< connected')
device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
# Let's go
await device.power_on()
await bridge.start(device)
# Wait until the transport terminates
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
@click.group()
@click.pass_context
@click.option('--device-config', help='Device configuration file', required=True)
@click.option('--hci-transport', help='HCI transport', required=True)
@click.option('--psm', help='PSM for L2CAP CoC', type=int, default=1234)
@click.option('--l2cap-coc-max-credits', help='Maximum L2CAP CoC Credits', type=click.IntRange(1, 65535), default=128)
@click.option('--l2cap-coc-mtu', help='L2CAP CoC MTU', type=click.IntRange(23, 65535), default=1022)
@click.option('--l2cap-coc-mps', help='L2CAP CoC MPS', type=click.IntRange(23, 65533), default=1024)
def cli(context, device_config, hci_transport, psm, l2cap_coc_max_credits, l2cap_coc_mtu, l2cap_coc_mps):
context.ensure_object(dict)
context.obj['device_config'] = device_config
context.obj['hci_transport'] = hci_transport
context.obj['psm'] = psm
context.obj['max_credits'] = l2cap_coc_max_credits
context.obj['mtu'] = l2cap_coc_mtu
context.obj['mps'] = l2cap_coc_mps
# -----------------------------------------------------------------------------
@cli.command()
@click.pass_context
@click.option('--tcp-host', help='TCP host', default='localhost')
@click.option('--tcp-port', help='TCP port', default=9544)
def server(context, tcp_host, tcp_port):
bridge = ServerBridge(
context.obj['psm'],
context.obj['max_credits'],
context.obj['mtu'],
context.obj['mps'],
tcp_host,
tcp_port)
asyncio.run(run(
context.obj['device_config'],
context.obj['hci_transport'],
bridge
))
# -----------------------------------------------------------------------------
@cli.command()
@click.pass_context
@click.argument('bluetooth-address')
@click.option('--tcp-host', help='TCP host', default='_')
@click.option('--tcp-port', help='TCP port', default=9543)
def client(context, bluetooth_address, tcp_host, tcp_port):
bridge = ClientBridge(
context.obj['psm'],
context.obj['max_credits'],
context.obj['mtu'],
context.obj['mps'],
bluetooth_address,
tcp_host,
tcp_port
)
asyncio.run(run(
context.obj['device_config'],
context.obj['hci_transport'],
bridge
))
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
if __name__ == '__main__':
cli(obj={})

View File

@@ -25,8 +25,8 @@ from bumble.device import Device
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.keys import JsonKeyStore from bumble.keys import JsonKeyStore
from bumble.smp import AddressResolver from bumble.smp import AddressResolver
from bumble.hci import HCI_LE_Advertising_Report_Event from bumble.device import Advertisement
from bumble.core import AdvertisingData from bumble.hci import HCI_Constant, HCI_LE_1M_PHY, HCI_LE_CODED_PHY
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -48,16 +48,19 @@ class AdvertisementPrinter:
self.min_rssi = min_rssi self.min_rssi = min_rssi
self.resolver = resolver self.resolver = resolver
def print_advertisement(self, address, address_color, ad_data, rssi): def print_advertisement(self, advertisement):
if self.min_rssi is not None and rssi < self.min_rssi: address = advertisement.address
address_color = 'yellow' if advertisement.is_connectable else 'red'
if self.min_rssi is not None and advertisement.rssi < self.min_rssi:
return return
address_qualifier = '' address_qualifier = ''
resolution_qualifier = '' resolution_qualifier = ''
if self.resolver and address.is_resolvable: if self.resolver and advertisement.address.is_resolvable:
resolved = self.resolver.resolve(address) resolved = self.resolver.resolve(advertisement.address)
if resolved is not None: if resolved is not None:
resolution_qualifier = f'(resolved from {address})' resolution_qualifier = f'(resolved from {advertisement.address})'
address = resolved address = resolved
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type] address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type]
@@ -74,18 +77,30 @@ class AdvertisementPrinter:
type_color = 'blue' type_color = 'blue'
address_qualifier = '(non-resolvable)' address_qualifier = '(non-resolvable)'
rssi_bar = make_rssi_bar(rssi)
separator = '\n ' separator = '\n '
print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}RSSI:{rssi:4} {rssi_bar}{separator}{ad_data.to_string(separator)}\n') rssi_bar = make_rssi_bar(advertisement.rssi)
if not advertisement.is_legacy:
phy_info = (
f'PHY: {HCI_Constant.le_phy_name(advertisement.primary_phy)}/'
f'{HCI_Constant.le_phy_name(advertisement.secondary_phy)} '
f'{separator}'
)
else:
phy_info = ''
def on_advertisement(self, address, ad_data, rssi, connectable): print(
address_color = 'yellow' if connectable else 'red' f'>>> {color(address, address_color)} '
self.print_advertisement(address, address_color, ad_data, rssi) f'[{color(address_type_string, type_color)}]{address_qualifier}{resolution_qualifier}:{separator}'
f'{phy_info}'
f'RSSI:{advertisement.rssi:4} {rssi_bar}{separator}'
f'{advertisement.data.to_string(separator)}\n')
def on_advertising_report(self, address, ad_data, rssi, event_type): def on_advertisement(self, advertisement):
print(f'{color("EVENT", "green")}: {HCI_LE_Advertising_Report_Event.event_type_name(event_type)}') self.print_advertisement(advertisement)
ad_data = AdvertisingData.from_bytes(ad_data)
self.print_advertisement(address, 'yellow', ad_data, rssi) def on_advertising_report(self, report):
print(f'{color("EVENT", "green")}: {report.event_type_string()}')
self.print_advertisement(Advertisement.from_advertising_report(report))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -94,6 +109,7 @@ async def scan(
passive, passive,
scan_interval, scan_interval,
scan_window, scan_window,
phy,
filter_duplicates, filter_duplicates,
raw, raw,
keystore_file, keystore_file,
@@ -126,11 +142,18 @@ async def scan(
device.on('advertisement', printer.on_advertisement) device.on('advertisement', printer.on_advertisement)
await device.power_on() await device.power_on()
if phy is None:
scanning_phys = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY]
else:
scanning_phys = [{'1m': HCI_LE_1M_PHY, 'coded': HCI_LE_CODED_PHY}[phy]]
await device.start_scanning( await device.start_scanning(
active=(not passive), active=(not passive),
scan_interval=scan_interval, scan_interval=scan_interval,
scan_window=scan_window, scan_window=scan_window,
filter_duplicates=filter_duplicates filter_duplicates=filter_duplicates,
scanning_phys=scanning_phys
) )
await hci_source.wait_for_termination() await hci_source.wait_for_termination()
@@ -142,14 +165,15 @@ async def scan(
@click.option('--passive', is_flag=True, default=False, help='Perform passive scanning') @click.option('--passive', is_flag=True, default=False, help='Perform passive scanning')
@click.option('--scan-interval', type=int, default=60, help='Scan interval') @click.option('--scan-interval', type=int, default=60, help='Scan interval')
@click.option('--scan-window', type=int, default=60, help='Scan window') @click.option('--scan-window', type=int, default=60, help='Scan window')
@click.option('--phy', type=click.Choice(['1m', 'coded']), help='Only scan on the specified PHY')
@click.option('--filter-duplicates', type=bool, default=True, help='Filter duplicates at the controller level') @click.option('--filter-duplicates', type=bool, default=True, help='Filter duplicates at the controller level')
@click.option('--raw', is_flag=True, default=False, help='Listen for raw advertising reports instead of processed ones') @click.option('--raw', is_flag=True, default=False, help='Listen for raw advertising reports instead of processed ones')
@click.option('--keystore-file', help='Keystore file to use when resolving addresses') @click.option('--keystore-file', help='Keystore file to use when resolving addresses')
@click.option('--device-config', help='Device config file for the scanning device') @click.option('--device-config', help='Device config file for the scanning device')
@click.argument('transport') @click.argument('transport')
def main(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport): def main(min_rssi, passive, scan_interval, scan_window, phy, filter_duplicates, raw, keystore_file, device_config, transport):
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper()) logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
asyncio.run(scan(min_rssi, passive, scan_interval, scan_window, filter_duplicates, raw, keystore_file, device_config, transport)) asyncio.run(scan(min_rssi, passive, scan_interval, scan_window, phy, filter_duplicates, raw, keystore_file, device_config, transport))
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -351,7 +351,7 @@ class MediaPacketPump:
logger.debug('pump canceled') logger.debug('pump canceled')
# Pump packets # Pump packets
self.pump_task = asyncio.get_running_loop().create_task(pump_packets()) self.pump_task = asyncio.create_task(pump_packets())
async def stop(self): async def stop(self):
# Stop the pump # Stop the pump
@@ -1890,10 +1890,10 @@ class LocalSource(LocalStreamEndPoint, EventEmitter):
self.configuration = configuration self.configuration = configuration
def on_start_command(self): def on_start_command(self):
asyncio.get_running_loop().create_task(self.start()) asyncio.create_task(self.start())
def on_suspend_command(self): def on_suspend_command(self):
asyncio.get_running_loop().create_task(self.stop()) asyncio.create_task(self.stop())
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -76,7 +76,7 @@ class Controller:
self.supported_commands = bytes.fromhex('2000800000c000000000e40000002822000000000000040000f7ffff7f00000030f0f9ff01008004000000000000000000000000000000000000000000000000') self.supported_commands = bytes.fromhex('2000800000c000000000e40000002822000000000000040000f7ffff7f00000030f0f9ff01008004000000000000000000000000000000000000000000000000')
self.le_features = bytes.fromhex('ff49010000000000') self.le_features = bytes.fromhex('ff49010000000000')
self.le_states = bytes.fromhex('ffff3fffff030000') self.le_states = bytes.fromhex('ffff3fffff030000')
self.avertising_channel_tx_power = 0 self.advertising_channel_tx_power = 0
self.filter_accept_list_size = 8 self.filter_accept_list_size = 8
self.resolving_list_size = 8 self.resolving_list_size = 8
self.supported_max_tx_octets = 27 self.supported_max_tx_octets = 27
@@ -259,15 +259,15 @@ class Controller:
# Then say that the connection has completed # Then say that the connection has completed
self.send_hci_packet(HCI_LE_Connection_Complete_Event( self.send_hci_packet(HCI_LE_Connection_Complete_Event(
status = HCI_SUCCESS, status = HCI_SUCCESS,
connection_handle = connection.handle, connection_handle = connection.handle,
role = connection.role, role = connection.role,
peer_address_type = peer_address_type, peer_address_type = peer_address_type,
peer_address = peer_address, peer_address = peer_address,
conn_interval = 10, # FIXME connection_interval = 10, # FIXME
conn_latency = 0, # FIXME peripheral_latency = 0, # FIXME
supervision_timeout = 10, # FIXME supervision_timeout = 10, # FIXME
master_clock_accuracy = 7 # FIXME central_clock_accuracy = 7 # FIXME
)) ))
def on_link_central_disconnected(self, peer_address, reason): def on_link_central_disconnected(self, peer_address, reason):
@@ -313,15 +313,15 @@ class Controller:
# Say that the connection has completed # Say that the connection has completed
self.send_hci_packet(HCI_LE_Connection_Complete_Event( self.send_hci_packet(HCI_LE_Connection_Complete_Event(
status = status, status = status,
connection_handle = connection.handle if connection else 0, connection_handle = connection.handle if connection else 0,
role = BT_CENTRAL_ROLE, role = BT_CENTRAL_ROLE,
peer_address_type = le_create_connection_command.peer_address_type, peer_address_type = le_create_connection_command.peer_address_type,
peer_address = le_create_connection_command.peer_address, peer_address = le_create_connection_command.peer_address,
conn_interval = le_create_connection_command.conn_interval_min, connection_interval = le_create_connection_command.connection_interval_min,
conn_latency = le_create_connection_command.conn_latency, peripheral_latency = le_create_connection_command.max_latency,
supervision_timeout = le_create_connection_command.supervision_timeout, supervision_timeout = le_create_connection_command.supervision_timeout,
master_clock_accuracy = 0 central_clock_accuracy = 0
)) ))
def on_link_peripheral_disconnection_complete(self, disconnection_command, status): def on_link_peripheral_disconnection_complete(self, disconnection_command, status):
@@ -583,13 +583,15 @@ class Controller:
''' '''
See Bluetooth spec Vol 2, Part E - 7.4.1 Read Local Version Information Command See Bluetooth spec Vol 2, Part E - 7.4.1 Read Local Version Information Command
''' '''
return struct.pack('<BBHBHH', return struct.pack(
HCI_SUCCESS, '<BBHBHH',
self.hci_version, HCI_SUCCESS,
self.hci_revision, self.hci_version,
self.lmp_version, self.hci_revision,
self.manufacturer_name, self.lmp_version,
self.lmp_subversion) self.manufacturer_name,
self.lmp_subversion
)
def on_hci_read_local_supported_commands_command(self, command): def on_hci_read_local_supported_commands_command(self, command):
''' '''
@@ -650,7 +652,7 @@ class Controller:
''' '''
See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Channel Tx Power Command See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Channel Tx Power Command
''' '''
return bytes([HCI_SUCCESS, self.avertising_channel_tx_power]) return bytes([HCI_SUCCESS, self.advertising_channel_tx_power])
def on_hci_le_set_advertising_data_command(self, command): def on_hci_le_set_advertising_data_command(self, command):
''' '''
@@ -857,9 +859,9 @@ class Controller:
See Bluetooth spec Vol 2, Part E - 7.8.44 LE Set Address Resolution Enable Command See Bluetooth spec Vol 2, Part E - 7.8.44 LE Set Address Resolution Enable Command
''' '''
ret = HCI_SUCCESS ret = HCI_SUCCESS
if command.address_resolution == 1: if command.address_resolution_enable == 1:
self.le_address_resolution = True self.le_address_resolution = True
elif command.address_resolution == 0: elif command.address_resolution_enable == 0:
self.le_address_resolution = False self.le_address_resolution = False
else: else:
ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR
@@ -876,12 +878,26 @@ class Controller:
''' '''
See Bluetooth spec Vol 2, Part E - 7.8.46 LE Read Maximum Data Length Command See Bluetooth spec Vol 2, Part E - 7.8.46 LE Read Maximum Data Length Command
''' '''
return struct.pack('<BHHHH', return struct.pack(
HCI_SUCCESS, '<BHHHH',
self.supported_max_tx_octets, HCI_SUCCESS,
self.supported_max_tx_time, self.supported_max_tx_octets,
self.supported_max_rx_octets, self.supported_max_tx_time,
self.supported_max_rx_time) self.supported_max_rx_octets,
self.supported_max_rx_time
)
def on_hci_le_read_phy_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.47 LE Read PHY command
'''
return struct.pack(
'<BHBB',
HCI_SUCCESS,
command.connection_handle,
HCI_LE_1M_PHY,
HCI_LE_1M_PHY
)
def on_hci_le_set_default_phy_command(self, command): def on_hci_le_set_default_phy_command(self, command):
''' '''
@@ -893,3 +909,4 @@ class Controller:
'rx_phys': command.rx_phys 'rx_phys': command.rx_phys
} }
return bytes([HCI_SUCCESS]) return bytes([HCI_SUCCESS])

View File

@@ -91,6 +91,10 @@ class TimeoutError(Exception):
""" Timeout Error """ """ Timeout Error """
class CommandTimeoutError(Exception):
""" Command Timeout Error """
class InvalidStateError(Exception): class InvalidStateError(Exception):
""" Invalid State Error """ """ Invalid State Error """
@@ -100,6 +104,11 @@ class ConnectionError(BaseError):
FAILURE = 0x01 FAILURE = 0x01
CONNECTION_REFUSED = 0x02 CONNECTION_REFUSED = 0x02
def __init__(self, error_code, transport, peer_address, error_namespace='', error_name='', details=''):
super().__init__(error_code, error_namespace, error_name, details)
self.transport = transport
self.peer_address = peer_address
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# UUID # UUID
@@ -760,17 +769,20 @@ class AdvertisingData:
def ad_data_to_object(ad_type, ad_data): def ad_data_to_object(ad_type, ad_data):
if ad_type in { if ad_type in {
AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.COMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_16_BIT_SERVICE_SOLICITATION_UUIDS
}: }:
return AdvertisingData.uuid_list_to_objects(ad_data, 2) return AdvertisingData.uuid_list_to_objects(ad_data, 2)
elif ad_type in { elif ad_type in {
AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.COMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_32_BIT_SERVICE_SOLICITATION_UUIDS
}: }:
return AdvertisingData.uuid_list_to_objects(ad_data, 4) return AdvertisingData.uuid_list_to_objects(ad_data, 4)
elif ad_type in { elif ad_type in {
AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS, AdvertisingData.COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
AdvertisingData.LIST_OF_128_BIT_SERVICE_SOLICITATION_UUIDS
}: }:
return AdvertisingData.uuid_list_to_objects(ad_data, 16) return AdvertisingData.uuid_list_to_objects(ad_data, 16)
elif ad_type == AdvertisingData.SERVICE_DATA_16_BIT_UUID: elif ad_type == AdvertisingData.SERVICE_DATA_16_BIT_UUID:
@@ -781,11 +793,24 @@ class AdvertisingData:
return (UUID.from_bytes(ad_data[:16]), ad_data[16:]) return (UUID.from_bytes(ad_data[:16]), ad_data[16:])
elif ad_type in { elif ad_type in {
AdvertisingData.SHORTENED_LOCAL_NAME, AdvertisingData.SHORTENED_LOCAL_NAME,
AdvertisingData.COMPLETE_LOCAL_NAME AdvertisingData.COMPLETE_LOCAL_NAME,
AdvertisingData.URI
}: }:
return ad_data.decode("utf-8") return ad_data.decode("utf-8")
elif ad_type == AdvertisingData.TX_POWER_LEVEL: elif ad_type in {
AdvertisingData.TX_POWER_LEVEL,
AdvertisingData.FLAGS
}:
return ad_data[0] return ad_data[0]
elif ad_type in {
AdvertisingData.APPEARANCE,
AdvertisingData.ADVERTISING_INTERVAL
}:
return struct.unpack('<H', ad_data)[0]
elif ad_type == AdvertisingData.CLASS_OF_DEVICE:
return struct.unpack('<I', bytes([*ad_data, 0]))[0]
elif ad_type == AdvertisingData.PERIPHERAL_CONNECTION_INTERVAL_RANGE:
return struct.unpack('<HH', ad_data)
elif ad_type == AdvertisingData.MANUFACTURER_SPECIFIC_DATA: elif ad_type == AdvertisingData.MANUFACTURER_SPECIFIC_DATA:
return (struct.unpack_from('<H', ad_data, 0)[0], ad_data[2:]) return (struct.unpack_from('<H', ad_data, 0)[0], ad_data[2:])
else: else:
@@ -802,7 +827,7 @@ class AdvertisingData:
self.ad_structures.append((ad_type, ad_data)) self.ad_structures.append((ad_type, ad_data))
offset += length offset += length
def get(self, type_id, return_all=False, raw=True): def get(self, type_id, return_all=False, raw=False):
''' '''
Get Advertising Data Structure(s) with a given type Get Advertising Data Structure(s) with a given type
@@ -831,13 +856,17 @@ class AdvertisingData:
# Connection Parameters # Connection Parameters
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class ConnectionParameters: class ConnectionParameters:
def __init__(self, connection_interval, connection_latency, supervision_timeout): def __init__(self, connection_interval, peripheral_latency, supervision_timeout):
self.connection_interval = connection_interval self.connection_interval = connection_interval
self.connection_latency = connection_latency self.peripheral_latency = peripheral_latency
self.supervision_timeout = supervision_timeout self.supervision_timeout = supervision_timeout
def __str__(self): def __str__(self):
return f'ConnectionParameters(connection_interval={self.connection_interval}, connection_latency={self.connection_latency}, supervision_timeout={self.supervision_timeout}' return (
f'ConnectionParameters(connection_interval={self.connection_interval}, '
f'peripheral_latency={self.peripheral_latency}, '
f'supervision_timeout={self.supervision_timeout}'
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

File diff suppressed because it is too large Load Diff

View File

@@ -23,8 +23,10 @@
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import asyncio import asyncio
import enum
import types import types
import logging import logging
from pyee import EventEmitter
from colors import color from colors import color
from .core import * from .core import *
@@ -262,7 +264,7 @@ class Characteristic(Attribute):
def get_descriptor(self, descriptor_type): def get_descriptor(self, descriptor_type):
for descriptor in self.descriptors: for descriptor in self.descriptors:
if descriptor.uuid == descriptor_type: if descriptor.type == descriptor_type:
return descriptor return descriptor
def __str__(self): def __str__(self):
@@ -346,8 +348,11 @@ class CharacteristicAdapter:
async def read_decoded_value(self): async def read_decoded_value(self):
return self.decode_value(await self.wrapped_characteristic.read_value()) return self.decode_value(await self.wrapped_characteristic.read_value())
async def write_decoded_value(self, value): async def write_decoded_value(self, value, with_response=False):
return await self.wrapped_characteristic.write_value(self.encode_value(value)) return await self.wrapped_characteristic.write_value(
self.encode_value(value),
with_response
)
def encode_value(self, value): def encode_value(self, value):
return value return value
@@ -470,3 +475,12 @@ class Descriptor(Attribute):
def __str__(self): def __str__(self):
return f'Descriptor(handle=0x{self.handle:04X}, type={self.type}, value={self.read_value(None).hex()})' return f'Descriptor(handle=0x{self.handle:04X}, type={self.type}, value={self.read_value(None).hex()})'
class ClientCharacteristicConfigurationBits(enum.IntFlag):
'''
See Vol 3, Part G - 3.3.3.3 - Table 3.11 Client Characteristic Configuration bit field definition
'''
DEFAULT = 0x0000
NOTIFICATION = 0x0001
INDICATION = 0x0002

View File

@@ -37,7 +37,8 @@ from .gatt import (
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE, GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE, GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE, GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
Characteristic Characteristic,
ClientCharacteristicConfigurationBits
) )
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -184,8 +185,8 @@ class Client:
# Wait until we can send (only one pending command at a time for the connection) # Wait until we can send (only one pending command at a time for the connection)
response = None response = None
async with self.request_semaphore: async with self.request_semaphore:
assert(self.pending_request is None) assert self.pending_request is None
assert(self.pending_response is None) assert self.pending_response is None
# Create a future value to hold the eventual response # Create a future value to hold the eventual response
self.pending_response = asyncio.get_running_loop().create_future() self.pending_response = asyncio.get_running_loop().create_future()
@@ -273,7 +274,7 @@ class Client:
if response.op_code == ATT_ERROR_RESPONSE: if response.op_code == ATT_ERROR_RESPONSE:
if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR: if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR:
# Unexpected end # Unexpected end
logger.waning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}') logger.warning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}')
# TODO raise appropriate exception # TODO raise appropriate exception
return return
break break
@@ -337,7 +338,7 @@ class Client:
if response.op_code == ATT_ERROR_RESPONSE: if response.op_code == ATT_ERROR_RESPONSE:
if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR: if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR:
# Unexpected end # Unexpected end
logger.waning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}') logger.warning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}')
# TODO raise appropriate exception # TODO raise appropriate exception
return return
break break
@@ -558,13 +559,13 @@ class Client:
return return
# Set the subscription bits and select the subscriber set # Set the subscription bits and select the subscriber set
bits = 0 bits = ClientCharacteristicConfigurationBits.DEFAULT
subscriber_sets = [] subscriber_sets = []
if characteristic.properties & Characteristic.NOTIFY: if characteristic.properties & Characteristic.NOTIFY:
bits |= 0x0001 bits |= ClientCharacteristicConfigurationBits.NOTIFICATION
subscriber_sets.append(self.notification_subscribers.setdefault(characteristic.handle, set())) subscriber_sets.append(self.notification_subscribers.setdefault(characteristic.handle, set()))
if characteristic.properties & Characteristic.INDICATE: if characteristic.properties & Characteristic.INDICATE:
bits |= 0x0002 bits |= ClientCharacteristicConfigurationBits.INDICATION
subscriber_sets.append(self.indication_subscribers.setdefault(characteristic.handle, set())) subscriber_sets.append(self.indication_subscribers.setdefault(characteristic.handle, set()))
# Add subscribers to the sets # Add subscribers to the sets

View File

@@ -155,7 +155,7 @@ class Server(EventEmitter):
return cccd or bytes([0, 0]) return cccd or bytes([0, 0])
def write_cccd(self, connection, characteristic, value): def write_cccd(self, connection, characteristic, value):
logger.debug(f'Subscription update for connection={connection.handle:04X}, handle={characteristic.handle:04X}: {value.hex()}') logger.debug(f'Subscription update for connection=0x{connection.handle:04X}, handle=0x{characteristic.handle:04X}: {value.hex()}')
# Sanity check # Sanity check
if len(value) != 2: if len(value) != 2:

File diff suppressed because it is too large Load Diff

View File

@@ -76,9 +76,11 @@ class Host(EventEmitter):
self.hc_total_num_acl_data_packets = HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS self.hc_total_num_acl_data_packets = HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS
self.acl_packet_queue = collections.deque() self.acl_packet_queue = collections.deque()
self.acl_packets_in_flight = 0 self.acl_packets_in_flight = 0
self.local_version = HCI_VERSION_BLUETOOTH_CORE_4_0 self.local_version = None
self.local_supported_commands = bytes(64) self.local_supported_commands = bytes(64)
self.local_le_features = 0 self.local_le_features = 0
self.suggested_max_tx_octets = 251 # Max allowed
self.suggested_max_tx_time = 2120 # Max allowed
self.command_semaphore = asyncio.Semaphore(1) self.command_semaphore = asyncio.Semaphore(1)
self.long_term_key_provider = None self.long_term_key_provider = None
self.link_key_provider = None self.link_key_provider = None
@@ -91,32 +93,23 @@ class Host(EventEmitter):
self.set_packet_sink(controller_sink) self.set_packet_sink(controller_sink)
async def reset(self): async def reset(self):
await self.send_command(HCI_Reset_Command()) await self.send_command(HCI_Reset_Command(), check_result=True)
self.ready = True self.ready = True
response = await self.send_command(HCI_Read_Local_Supported_Commands_Command()) response = await self.send_command(HCI_Read_Local_Supported_Commands_Command(), check_result=True)
if response.return_parameters.status == HCI_SUCCESS: self.local_supported_commands = response.return_parameters.supported_commands
self.local_supported_commands = response.return_parameters.supported_commands
else:
logger.warn(f'HCI_Read_Local_Supported_Commands_Command failed: {response.return_parameters.status}')
if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND): if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command()) response = await self.send_command(HCI_LE_Read_Local_Supported_Features_Command(), check_result=True)
if response.return_parameters.status == HCI_SUCCESS: self.local_le_features = struct.unpack('<Q', response.return_parameters.le_features)[0]
self.local_le_features = struct.unpack('<Q', response.return_parameters.le_features)[0]
else:
logger.warn(f'HCI_LE_Read_Supported_Features_Command failed: {response.return_parameters.status}')
if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND): if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
response = await self.send_command(HCI_Read_Local_Version_Information_Command()) response = await self.send_command(HCI_Read_Local_Version_Information_Command(), check_result=True)
if response.return_parameters.status == HCI_SUCCESS: self.local_version = response.return_parameters
self.local_version = response.return_parameters
else:
logger.warn(f'HCI_Read_Local_Version_Information_Command failed: {response.return_parameters.status}')
await self.send_command(HCI_Set_Event_Mask_Command(event_mask = bytes.fromhex('FFFFFFFFFFFFFF3F'))) await self.send_command(HCI_Set_Event_Mask_Command(event_mask = bytes.fromhex('FFFFFFFFFFFFFF3F')))
if self.local_version.hci_version <= HCI_VERSION_BLUETOOTH_CORE_4_0: if self.local_version is not None and self.local_version.hci_version <= HCI_VERSION_BLUETOOTH_CORE_4_0:
# Some older controllers don't like event masks with bits they don't understand # Some older controllers don't like event masks with bits they don't understand
le_event_mask = bytes.fromhex('1F00000000000000') le_event_mask = bytes.fromhex('1F00000000000000')
else: else:
@@ -124,20 +117,14 @@ class Host(EventEmitter):
await self.send_command(HCI_LE_Set_Event_Mask_Command(le_event_mask = le_event_mask)) await self.send_command(HCI_LE_Set_Event_Mask_Command(le_event_mask = le_event_mask))
if self.supports_command(HCI_READ_BUFFER_SIZE_COMMAND): if self.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(HCI_Read_Buffer_Size_Command()) response = await self.send_command(HCI_Read_Buffer_Size_Command(), check_result=True)
if response.return_parameters.status == HCI_SUCCESS: self.hc_acl_data_packet_length = response.return_parameters.hc_acl_data_packet_length
self.hc_acl_data_packet_length = response.return_parameters.hc_acl_data_packet_length self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_acl_data_packets
self.hc_total_num_acl_data_packets = response.return_parameters.hc_total_num_acl_data_packets
else:
logger.warn(f'HCI_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND): if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(HCI_LE_Read_Buffer_Size_Command()) response = await self.send_command(HCI_LE_Read_Buffer_Size_Command(), check_result=True)
if response.return_parameters.status == HCI_SUCCESS: self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length
self.hc_le_acl_data_packet_length = response.return_parameters.hc_le_acl_data_packet_length self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
self.hc_total_num_le_acl_data_packets = response.return_parameters.hc_total_num_le_acl_data_packets
else:
logger.warn(f'HCI_LE_Read_Buffer_Size_Command failed: {response.return_parameters.status}')
if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0: if response.return_parameters.hc_le_acl_data_packet_length == 0 or response.return_parameters.hc_total_num_le_acl_data_packets == 0:
# LE and Classic share the same values # LE and Classic share the same values
@@ -153,6 +140,22 @@ class Host(EventEmitter):
f'hc_total_num_le_acl_data_packets={self.hc_total_num_le_acl_data_packets}' f'hc_total_num_le_acl_data_packets={self.hc_total_num_le_acl_data_packets}'
) )
if (
self.supports_command(HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND) and
self.supports_command(HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND)
):
response = await self.send_command(HCI_LE_Read_Suggested_Default_Data_Length_Command())
suggested_max_tx_octets = response.return_parameters.suggested_max_tx_octets
suggested_max_tx_time = response.return_parameters.suggested_max_tx_time
if (
suggested_max_tx_octets != self.suggested_max_tx_octets or
suggested_max_tx_time != self.suggested_max_tx_time
):
await self.send_command(HCI_LE_Write_Suggested_Default_Data_Length_Command(
suggested_max_tx_octets = self.suggested_max_tx_octets,
suggested_max_tx_time = self.suggested_max_tx_time
))
self.reset_done = True self.reset_done = True
@property @property
@@ -171,7 +174,7 @@ class Host(EventEmitter):
def send_hci_packet(self, packet): def send_hci_packet(self, packet):
self.hci_sink.on_packet(packet.to_bytes()) self.hci_sink.on_packet(packet.to_bytes())
async def send_command(self, command): async def send_command(self, command, check_result=False):
logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}') logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {command}')
# Wait until we can send (only one pending command at a time) # Wait until we can send (only one pending command at a time)
@@ -186,11 +189,25 @@ class Host(EventEmitter):
try: try:
self.send_hci_packet(command) self.send_hci_packet(command)
response = await self.pending_response response = await self.pending_response
# TODO: check error values
# Check the return parameters if required
if check_result:
if type(response.return_parameters) is int:
status = response.return_parameters
elif type(response.return_parameters) is bytes:
# return parameters first field is a one byte status code
status = response.return_parameters[0]
else:
status = response.return_parameters.status
if status != HCI_SUCCESS:
logger.warning(f'{command.name} failed ({HCI_Constant.error_name(status)})')
raise HCI_Error(status)
return response return response
except Exception as error: except Exception as error:
logger.warning(f'{color("!!! Exception while sending HCI packet:", "red")} {error}') logger.warning(f'{color("!!! Exception while sending HCI packet:", "red")} {error}')
# raise error raise error
finally: finally:
self.pending_command = None self.pending_command = None
self.pending_response = None self.pending_response = None
@@ -348,13 +365,12 @@ class Host(EventEmitter):
# Classic only # Classic only
def on_hci_connection_request_event(self, event): def on_hci_connection_request_event(self, event):
# For now, just accept everything # Notify the listeners
# TODO: delegate the decision self.emit(
self.send_command_sync( 'connection_request',
HCI_Accept_Connection_Request_Command( event.bd_addr,
bd_addr = event.bd_addr, event.class_of_device,
role = 0x01 # Remain the peripheral event.link_type,
)
) )
def on_hci_le_connection_complete_event(self, event): def on_hci_le_connection_complete_event(self, event):
@@ -370,8 +386,8 @@ class Host(EventEmitter):
# Notify the client # Notify the client
connection_parameters = ConnectionParameters( connection_parameters = ConnectionParameters(
event.conn_interval, event.connection_interval,
event.conn_latency, event.peripheral_latency,
event.supervision_timeout event.supervision_timeout
) )
self.emit( self.emit(
@@ -387,7 +403,7 @@ class Host(EventEmitter):
logger.debug(f'### CONNECTION FAILED: {event.status}') logger.debug(f'### CONNECTION FAILED: {event.status}')
# Notify the listeners # Notify the listeners
self.emit('connection_failure', event.status) self.emit('connection_failure', BT_LE_TRANSPORT, event.peer_address, event.status)
def on_hci_le_enhanced_connection_complete_event(self, event): def on_hci_le_enhanced_connection_complete_event(self, event):
# Just use the same implementation as for the non-enhanced event for now # Just use the same implementation as for the non-enhanced event for now
@@ -417,7 +433,7 @@ class Host(EventEmitter):
logger.debug(f'### BR/EDR CONNECTION FAILED: {event.status}') logger.debug(f'### BR/EDR CONNECTION FAILED: {event.status}')
# Notify the client # Notify the client
self.emit('connection_failure', event.connection_handle, event.status) self.emit('connection_failure', BT_BR_EDR_TRANSPORT, event.bd_addr, event.status)
def on_hci_disconnection_complete_event(self, event): def on_hci_disconnection_complete_event(self, event):
# Find the connection # Find the connection
@@ -435,7 +451,7 @@ class Host(EventEmitter):
logger.debug(f'### DISCONNECTION FAILED: {event.status}') logger.debug(f'### DISCONNECTION FAILED: {event.status}')
# Notify the listeners # Notify the listeners
self.emit('disconnection_failure', event.status) self.emit('disconnection_failure', event.connection_handle, event.status)
def on_hci_le_connection_update_complete_event(self, event): def on_hci_le_connection_update_complete_event(self, event):
if (connection := self.connections.get(event.connection_handle)) is None: if (connection := self.connections.get(event.connection_handle)) is None:
@@ -445,8 +461,8 @@ class Host(EventEmitter):
# Notify the client # Notify the client
if event.status == HCI_SUCCESS: if event.status == HCI_SUCCESS:
connection_parameters = ConnectionParameters( connection_parameters = ConnectionParameters(
event.conn_interval, event.connection_interval,
event.conn_latency, event.peripheral_latency,
event.supervision_timeout event.supervision_timeout
) )
self.emit('connection_parameters_update', connection.handle, connection_parameters) self.emit('connection_parameters_update', connection.handle, connection_parameters)
@@ -467,13 +483,10 @@ class Host(EventEmitter):
def on_hci_le_advertising_report_event(self, event): def on_hci_le_advertising_report_event(self, event):
for report in event.reports: for report in event.reports:
self.emit( self.emit('advertising_report', report)
'advertising_report',
report.address, def on_hci_le_extended_advertising_report_event(self, event):
report.data, self.on_hci_le_advertising_report_event(event)
report.rssi,
report.event_type
)
def on_hci_le_remote_connection_parameter_request_event(self, event): def on_hci_le_remote_connection_parameter_request_event(self, event):
if event.connection_handle not in self.connections: if event.connection_handle not in self.connections:
@@ -489,8 +502,8 @@ class Host(EventEmitter):
interval_max = event.interval_max, interval_max = event.interval_max,
latency = event.latency, latency = event.latency,
timeout = event.timeout, timeout = event.timeout,
minimum_ce_length = 0, min_ce_length = 0,
maximum_ce_length = 0 max_ce_length = 0
) )
) )
@@ -652,3 +665,6 @@ class Host(EventEmitter):
self.emit('remote_name_failure', event.bd_addr, event.status) self.emit('remote_name_failure', event.bd_addr, event.status)
else: else:
self.emit('remote_name', event.bd_addr, event.remote_name) self.emit('remote_name', event.bd_addr, event.remote_name)
def on_hci_remote_host_supported_features_notification_event(self, event):
self.emit('remote_host_supported_features', event.bd_addr, event.host_supported_features)

File diff suppressed because it is too large Load Diff

View File

@@ -21,7 +21,7 @@ import asyncio
from colors import color from colors import color
from pyee import EventEmitter from pyee import EventEmitter
from .core import InvalidStateError, ProtocolError, ConnectionError from .core import BT_BR_EDR_TRANSPORT, InvalidStateError, ProtocolError, ConnectionError
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Logging # Logging
@@ -634,7 +634,12 @@ class Multiplexer(EventEmitter):
if self.state == Multiplexer.OPENING: if self.state == Multiplexer.OPENING:
self.change_state(Multiplexer.CONNECTED) self.change_state(Multiplexer.CONNECTED)
if self.open_result: if self.open_result:
self.open_result.set_exception(ConnectionError(ConnectionError.CONNECTION_REFUSED)) self.open_result.set_exception(ConnectionError(
ConnectionError.CONNECTION_REFUSED,
self.l2cap_channel.connection.peer_address,
BT_BR_EDR_TRANSPORT,
'rfcomm'
))
else: else:
logger.warn(f'unexpected state for DM: {self}') logger.warn(f'unexpected state for DM: {self}')

View File

@@ -274,7 +274,7 @@ class PumpedPacketSource(ParserSource):
self.terminated.set_result(error) self.terminated.set_result(error)
break break
self.pump_task = asyncio.get_running_loop().create_task(pump_packets()) self.pump_task = asyncio.create_task(pump_packets())
def close(self): def close(self):
if self.pump_task: if self.pump_task:
@@ -304,7 +304,7 @@ class PumpedPacketSink:
logger.warn(f'exception while sending packet: {error}') logger.warn(f'exception while sending packet: {error}')
break break
self.pump_task = asyncio.get_running_loop().create_task(pump_packets()) self.pump_task = asyncio.create_task(pump_packets())
def close(self): def close(self):
if self.pump_task: if self.pump_task:

View File

@@ -18,6 +18,7 @@
import asyncio import asyncio
import logging import logging
import traceback import traceback
import collections
from functools import wraps from functools import wraps
from colors import color from colors import color
from pyee import EventEmitter from pyee import EventEmitter
@@ -140,3 +141,95 @@ class AsyncRunner:
return wrapper return wrapper
return decorator return decorator
# -----------------------------------------------------------------------------
class FlowControlAsyncPipe:
"""
Asyncio pipe with flow control. When writing to the pipe, the source is
paused (by calling a function passed in when the pipe is created) if the
amount of queued data exceeds a specified threshold.
"""
def __init__(self, pause_source, resume_source, write_to_sink=None, drain_sink=None, threshold=0):
self.pause_source = pause_source
self.resume_source = resume_source
self.write_to_sink = write_to_sink
self.drain_sink = drain_sink
self.threshold = threshold
self.queue = collections.deque() # Queue of packets
self.queued_bytes = 0 # Number of bytes in the queue
self.ready_to_pump = asyncio.Event()
self.paused = False
self.source_paused = False
self.pump_task = None
def start(self):
if self.pump_task is None:
self.pump_task = asyncio.create_task(self.pump())
self.check_pump()
def stop(self):
if self.pump_task is not None:
self.pump_task.cancel()
self.pump_task = None
def write(self, packet):
self.queued_bytes += len(packet)
self.queue.append(packet)
# Pause the source if we're over the threshold
if self.queued_bytes > self.threshold and not self.source_paused:
logger.debug(f'pausing source (queued={self.queued_bytes})')
self.pause_source()
self.source_paused = True
self.check_pump()
def pause(self):
if not self.paused:
self.paused = True
if not self.source_paused:
self.pause_source()
self.source_paused = True
self.check_pump()
def resume(self):
if self.paused:
self.paused = False
if self.source_paused:
self.resume_source()
self.source_paused = False
self.check_pump()
def can_pump(self):
return self.queue and not self.paused and self.write_to_sink is not None
def check_pump(self):
if self.can_pump():
self.ready_to_pump.set()
else:
self.ready_to_pump.clear()
async def pump(self):
while True:
# Wait until we can try to pump packets
await self.ready_to_pump.wait()
# Try to pump a packet
if self.can_pump():
packet = self.queue.pop()
self.write_to_sink(packet)
self.queued_bytes -= len(packet)
# Drain the sink if we can
if self.drain_sink:
await self.drain_sink()
# Check if we can accept more
if self.queued_bytes <= self.threshold and self.source_paused:
logger.debug(f'resuming source (queued={self.queued_bytes})')
self.source_paused = False
self.resume_source()
self.check_pump()

View File

@@ -5,8 +5,9 @@ The Android emulator transport either connects, as a host, to a "Root Canal" vir
("host" mode), or attaches a virtual controller to the Android Bluetooth host stack ("controller" mode). ("host" mode), or attaches a virtual controller to the Android Bluetooth host stack ("controller" mode).
## Moniker ## Moniker
The moniker syntax for an Android Emulator transport is: `android-emulator:[mode=<host|controller>][mode=<host|controller>]`. The moniker syntax for an Android Emulator transport is: `android-emulator:[mode=<host|controller>][<hostname>:<port>]`, where
Both the `mode=<host|controller>` and `mode=<host|controller>` parameters are optional (so the moniker `android-emulator` by itself is a valid moniker, which will create a transport in `host` mode, connected to `localhost` on the default gRPC port for the emulator) the `mode` parameter can specify running as a host or a controller, and `<hostname>:<port>` can specify a host name (or IP address) and TCP port number on which to reach the gRPC server for the emulator.
Both the `mode=<host|controller>` and `<hostname>:<port>` parameters are optional (so the moniker `android-emulator` by itself is a valid moniker, which will create a transport in `host` mode, connected to `localhost` on the default gRPC port for the emulator).
!!! example Example !!! example Example
`android-emulator` `android-emulator`

5
examples/asha_sink1.json Normal file
View File

@@ -0,0 +1,5 @@
{
"name": "Bumble Aid Left",
"address": "F1:F2:F3:F4:F5:F6",
"keystore": "JsonKeyStore"
}

5
examples/asha_sink2.json Normal file
View File

@@ -0,0 +1,5 @@
{
"name": "Bumble Aid Right",
"address": "F7:F8:F9:FA:FB:FC",
"keystore": "JsonKeyStore"
}

View File

@@ -34,8 +34,8 @@ from bumble.gatt import (
Characteristic, Characteristic,
CharacteristicValue, CharacteristicValue,
GATT_DEVICE_INFORMATION_SERVICE, GATT_DEVICE_INFORMATION_SERVICE,
GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE, GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
GATT_DEVICE_BATTERY_SERVICE, GATT_BATTERY_SERVICE,
GATT_BATTERY_LEVEL_CHARACTERISTIC, GATT_BATTERY_LEVEL_CHARACTERISTIC,
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
GATT_REPORT_CHARACTERISTIC, GATT_REPORT_CHARACTERISTIC,
@@ -126,8 +126,8 @@ async def keyboard_host(device, peer_address):
connection = await device.connect(peer_address) connection = await device.connect(peer_address)
await connection.pair() await connection.pair()
peer = Peer(connection) peer = Peer(connection)
await peer.discover_service(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE) await peer.discover_service(GATT_HUMAN_INTERFACE_DEVICE_SERVICE)
hid_services = peer.get_services_by_uuid(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE) hid_services = peer.get_services_by_uuid(GATT_HUMAN_INTERFACE_DEVICE_SERVICE)
if not hid_services: if not hid_services:
print(color('!!! No HID service', 'red')) print(color('!!! No HID service', 'red'))
return return
@@ -221,7 +221,7 @@ async def keyboard_device(device, command):
] ]
), ),
Service( Service(
GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE, GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
[ [
Characteristic( Characteristic(
GATT_PROTOCOL_MODE_CHARACTERISTIC, GATT_PROTOCOL_MODE_CHARACTERISTIC,
@@ -252,7 +252,7 @@ async def keyboard_device(device, command):
] ]
), ),
Service( Service(
GATT_DEVICE_BATTERY_SERVICE, GATT_BATTERY_SERVICE,
[ [
Characteristic( Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC, GATT_BATTERY_LEVEL_CHARACTERISTIC,
@@ -273,7 +273,7 @@ async def keyboard_device(device, command):
AdvertisingData([ AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Keyboard', 'utf-8')), (AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Keyboard', 'utf-8')),
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, (AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)), bytes(GATT_HUMAN_INTERFACE_DEVICE_SERVICE)),
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)), (AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)),
(AdvertisingData.FLAGS, bytes([0x05])) (AdvertisingData.FLAGS, bytes([0x05]))
]) ])

View File

@@ -29,18 +29,31 @@ from bumble.transport import open_transport_or_link
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main():
if len(sys.argv) != 3: if len(sys.argv) < 3:
print('Usage: run_advertiser.py <config-file> <transport-spec>') print('Usage: run_advertiser.py <config-file> <transport-spec> [type] [address]')
print('example: run_advertiser.py device1.json link-relay:ws://localhost:8888/test') print('example: run_advertiser.py device1.json usb:0')
return return
if len(sys.argv) >= 4:
advertising_type = AdvertisingType(int(sys.argv[3]))
else:
advertising_type = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE
if advertising_type.is_directed:
if len(sys.argv) < 5:
print('<address> required for directed advertising')
return
target = Address(sys.argv[4])
else:
target = None
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
print('<<< connected') print('<<< connected')
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
await device.power_on() await device.power_on()
await device.start_advertising() await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_source.wait_for_termination() await hci_source.wait_for_termination()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

161
examples/run_asha_sink.py Normal file
View File

@@ -0,0 +1,161 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import struct
import sys
import os
import logging
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.hci import UUID
from bumble.gatt import (
Service,
Characteristic,
CharacteristicValue
)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
ASHA_SERVICE = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid')
ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID('6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties')
ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC = UUID('f0d4de7e-4a88-476c-9d9f-1937b0996cc0', 'AudioControlPoint')
ASHA_AUDIO_STATUS_CHARACTERISTIC = UUID('38663f1a-e711-4cac-b641-326b56404837', 'AudioStatus')
ASHA_VOLUME_CHARACTERISTIC = UUID('00e4ca9e-ab14-41e4-8823-f9e70c7e91df', 'Volume')
ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID('2d410339-82b6-42aa-b34e-e2e01df8cc1a', 'LE_PSM_OUT')
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 4:
print('Usage: python run_asha_sink.py <device-config> <transport-spec> <audio-file>')
print('example: python run_asha_sink.py device1.json usb:0 audio_out.g722')
return
audio_out = open(sys.argv[3], 'wb')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
# Handler for audio control commands
def on_audio_control_point_write(connection, value):
print('--- AUDIO CONTROL POINT Write:', value.hex())
opcode = value[0]
if opcode == 1:
# Start
audio_type = ('Unknown', 'Ringtone', 'Phone Call', 'Media')[value[2]]
print(f'### START: codec={value[1]}, audio_type={audio_type}, volume={value[3]}, otherstate={value[4]}')
elif opcode == 2:
print('### STOP')
elif opcode == 3:
print(f'### STATUS: connected={value[1]}')
# Respond with a status
asyncio.create_task(device.notify_subscribers(audio_status_characteristic, force=True))
# Handler for volume control
def on_volume_write(connection, value):
print('--- VOLUME Write:', value[0])
# Register an L2CAP CoC server
def on_coc(channel):
def on_data(data):
print('<<< Voice data received:', data.hex())
audio_out.write(data)
channel.sink = on_data
psm = device.register_l2cap_channel_server(0, on_coc, 8)
print(f'### LE_PSM_OUT = {psm}')
# Add the ASHA service to the GATT server
read_only_properties_characteristic = Characteristic(
ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
bytes([
0x01, # Version
0x00, # Device Capabilities [Left, Monaural]
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, # HiSyncId
0x01, # Feature Map [LE CoC audio output streaming supported]
0x00, 0x00, # Render Delay
0x00, 0x00, # RFU
0x02, 0x00 # Codec IDs [G.722 at 16 kHz]
])
)
audio_control_point_characteristic = Characteristic(
ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_audio_control_point_write)
)
audio_status_characteristic = Characteristic(
ASHA_AUDIO_STATUS_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.READABLE,
bytes([0])
)
volume_characteristic = Characteristic(
ASHA_VOLUME_CHARACTERISTIC,
Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_volume_write)
)
le_psm_out_characteristic = Characteristic(
ASHA_LE_PSM_OUT_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
struct.pack('<H', psm)
)
device.add_service(Service(
ASHA_SERVICE,
[
read_only_properties_characteristic,
audio_control_point_characteristic,
audio_status_characteristic,
volume_characteristic,
le_psm_out_characteristic
]
))
# Set the advertising data
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(device.name, 'utf-8')),
(AdvertisingData.FLAGS, bytes([0x06])),
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, bytes(ASHA_SERVICE)),
(AdvertisingData.SERVICE_DATA_16_BIT_UUID, bytes(ASHA_SERVICE) + bytes([
0x01, # Protocol Version
0x00, # Capability
0x01, 0x02, 0x03, 0x04 # Truncated HiSyncID
]))
])
)
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -30,7 +30,7 @@ from bumble.sdp import Client as SDP_Client, SDP_PUBLIC_BROWSE_ROOT, SDP_ALL_ATT
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main():
if len(sys.argv) < 3: if len(sys.argv) < 3:
print('Usage: run_classic_connect.py <device-config> <transport-spec> <bluetooth-address>') print('Usage: run_classic_connect.py <device-config> <transport-spec> <bluetooth-addresses..>')
print('example: run_classic_connect.py classic1.json usb:04b4:f901 E1:CA:72:48:C4:E8') print('example: run_classic_connect.py classic1.json usb:04b4:f901 E1:CA:72:48:C4:E8')
return return
@@ -43,8 +43,7 @@ async def main():
device.classic_enabled = True device.classic_enabled = True
await device.power_on() await device.power_on()
# Connect to a peer async def connect(target_address):
target_address = sys.argv[3]
print(f'=== Connecting to {target_address}...') print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
print(f'=== Connected to {connection.peer_address}!') print(f'=== Connected to {connection.peer_address}!')
@@ -76,6 +75,10 @@ async def main():
await sdp_client.disconnect() await sdp_client.disconnect()
await hci_source.wait_for_termination() await hci_source.wait_for_termination()
# Connect to a peer
target_addresses = sys.argv[3:]
await asyncio.wait([asyncio.create_task(connect(target_address)) for target_address in target_addresses])
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main()) asyncio.run(main())

View File

@@ -29,15 +29,15 @@ from bumble.transport import open_transport_or_link
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class ScannerListener(Device.Listener): class ScannerListener(Device.Listener):
def on_advertisement(self, address, ad_data, rssi, connectable): def on_advertisement(self, advertisement):
address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type] address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type]
address_color = 'yellow' if connectable else 'red' address_color = 'yellow' if advertisement.is_connectable else 'red'
if address_type_string.startswith('P'): if address_type_string.startswith('P'):
type_color = 'green' type_color = 'green'
else: else:
type_color = 'cyan' type_color = 'cyan'
print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]: RSSI={rssi}, {ad_data}') print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]: RSSI={advertisement.rssi}, {advertisement.data}')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -49,11 +49,17 @@ class Listener(Device.Listener, Connection.Listener):
) )
# -----------------------------------------------------------------------------
# Alternative way to listen for subscriptions
# -----------------------------------------------------------------------------
def on_my_characteristic_subscription(peer, enabled):
print(f'### My characteristic from {peer}: {"enabled" if enabled else "disabled"}')
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main():
if len(sys.argv) < 3: if len(sys.argv) < 3:
print('Usage: run_gatt_server.py <device-config> <transport-spec>') print('Usage: run_notifier.py <device-config> <transport-spec>')
print('example: run_gatt_server.py device1.json usb:0') print('example: run_notifier.py device1.json usb:0')
return return
print('<<< connecting to HCI...') print('<<< connecting to HCI...')
@@ -83,6 +89,7 @@ async def main():
Characteristic.READABLE, Characteristic.READABLE,
bytes([0x42]) bytes([0x42])
) )
characteristic3.on('subscription', on_my_characteristic_subscription)
custom_service = Service( custom_service = Service(
'50DB505C-8AC4-4738-8448-3B1D9CC09CC5', '50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
[characteristic1, characteristic2, characteristic3] [characteristic1, characteristic2, characteristic3]

View File

@@ -98,6 +98,7 @@ async def list_rfcomm_channels(device, connection):
await sdp_client.disconnect() await sdp_client.disconnect()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class TcpServerProtocol(asyncio.Protocol): class TcpServerProtocol(asyncio.Protocol):
def __init__(self, rfcomm_session): def __init__(self, rfcomm_session):
@@ -173,7 +174,7 @@ async def main():
print('*** Encryption on') print('*** Encryption on')
# Create a client and start it # Create a client and start it
print('@@@ Starting to RFCOMM client...') print('@@@ Starting RFCOMM client...')
rfcomm_client = Client(device, connection) rfcomm_client = Client(device, connection)
rfcomm_mux = await rfcomm_client.start() rfcomm_mux = await rfcomm_client.start()
print('@@@ Started') print('@@@ Started')
@@ -192,7 +193,7 @@ async def main():
if len(sys.argv) == 6: if len(sys.argv) == 6:
# A TCP port was specified, start listening # A TCP port was specified, start listening
tcp_port = int(sys.argv[5]) tcp_port = int(sys.argv[5])
asyncio.get_running_loop().create_task(tcp_server(tcp_port, session)) asyncio.create_task(tcp_server(tcp_port, session))
await hci_source.wait_for_termination() await hci_source.wait_for_termination()

View File

@@ -40,24 +40,24 @@ async def main():
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
@device.on('advertisement') @device.on('advertisement')
def _(address, ad_data, rssi, connectable): def _(advertisement):
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[address.address_type] address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[advertisement.address.address_type]
address_color = 'yellow' if connectable else 'red' address_color = 'yellow' if advertisement.is_connectable else 'red'
address_qualifier = '' address_qualifier = ''
if address_type_string.startswith('P'): if address_type_string.startswith('P'):
type_color = 'cyan' type_color = 'cyan'
else: else:
if address.is_static: if advertisement.address.is_static:
type_color = 'green' type_color = 'green'
address_qualifier = '(static)' address_qualifier = '(static)'
elif address.is_resolvable: elif advertisement.address.is_resolvable:
type_color = 'magenta' type_color = 'magenta'
address_qualifier = '(resolvable)' address_qualifier = '(resolvable)'
else: else:
type_color = 'white' type_color = 'white'
separator = '\n ' separator = '\n '
print(f'>>> {color(address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{rssi}{separator}{ad_data.to_string(separator)}') print(f'>>> {color(advertisement.address, address_color)} [{color(address_type_string, type_color)}]{address_qualifier}:{separator}RSSI:{advertisement.rssi}{separator}{advertisement.data.to_string(separator)}')
await device.power_on() await device.power_on()
await device.start_scanning(filter_duplicates=filter_duplicates) await device.start_scanning(filter_duplicates=filter_duplicates)

View File

@@ -48,8 +48,10 @@ install_requires =
[options.entry_points] [options.entry_points]
console_scripts = console_scripts =
bumble-console = bumble.apps.console:main bumble-console = bumble.apps.console:main
bumble-controller-info = bumble.apps.controller_info:main
bumble-gatt-dump = bumble.apps.gatt_dump:main bumble-gatt-dump = bumble.apps.gatt_dump:main
bumble-hci-bridge = bumble.apps.hci_bridge:main bumble-hci-bridge = bumble.apps.hci_bridge:main
bumble-l2cap-bridge = bumble.apps.l2cap_bridge:main
bumble-pair = bumble.apps.pair:main bumble-pair = bumble.apps.pair:main
bumble-scan = bumble.apps.scan:main bumble-scan = bumble.apps.scan:main
bumble-show = bumble.apps.show:main bumble-show = bumble.apps.show:main
@@ -63,6 +65,7 @@ build =
test = test =
pytest >= 6.2 pytest >= 6.2
pytest-asyncio >= 0.17 pytest-asyncio >= 0.17
coverage >= 6.4
development = development =
invoke >= 1.4 invoke >= 1.4
nox >= 2022 nox >= 2022

View File

@@ -24,19 +24,19 @@ def test_ad_data():
ad = AdvertisingData.from_bytes(data) ad = AdvertisingData.from_bytes(data)
ad_bytes = bytes(ad) ad_bytes = bytes(ad)
assert(data == ad_bytes) assert(data == ad_bytes)
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME) is None) assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) is None)
assert(ad.get(AdvertisingData.TX_POWER_LEVEL) == bytes([123])) assert(ad.get(AdvertisingData.TX_POWER_LEVEL, raw=True) == bytes([123]))
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, return_all=True) == []) assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, return_all=True, raw=True) == [])
assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True) == [bytes([123])]) assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True, raw=True) == [bytes([123])])
data2 = bytes([2, AdvertisingData.TX_POWER_LEVEL, 234]) data2 = bytes([2, AdvertisingData.TX_POWER_LEVEL, 234])
ad.append(data2) ad.append(data2)
ad_bytes = bytes(ad) ad_bytes = bytes(ad)
assert(ad_bytes == data + data2) assert(ad_bytes == data + data2)
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME) is None) assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) is None)
assert(ad.get(AdvertisingData.TX_POWER_LEVEL) == bytes([123])) assert(ad.get(AdvertisingData.TX_POWER_LEVEL, raw=True) == bytes([123]))
assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, return_all=True) == []) assert(ad.get(AdvertisingData.COMPLETE_LOCAL_NAME, return_all=True, raw=True) == [])
assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True) == [bytes([123]), bytes([234])]) assert(ad.get(AdvertisingData.TX_POWER_LEVEL, return_all=True, raw=True) == [bytes([123]), bytes([234])])
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

188
tests/device_test.py Normal file
View File

@@ -0,0 +1,188 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
from types import LambdaType
import pytest
from bumble.core import BT_BR_EDR_TRANSPORT
from bumble.device import Connection, Device
from bumble.host import Host
from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND, HCI_COMMAND_STATUS_PENDING, HCI_CREATE_CONNECTION_COMMAND, HCI_SUCCESS,
Address, HCI_Command_Complete_Event, HCI_Command_Status_Event, HCI_Connection_Complete_Event, HCI_Connection_Request_Event, HCI_Packet
)
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
class Sink:
def __init__(self, flow):
self.flow = flow
next(self.flow)
def on_packet(self, packet):
self.flow.send(packet)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_device_connect_parallel():
d0 = Device(host=Host(None, None))
d1 = Device(host=Host(None, None))
d2 = Device(host=Host(None, None))
# enable classic
d0.classic_enabled = True
d1.classic_enabled = True
d2.classic_enabled = True
# set public addresses
d0.public_address = Address('F0:F1:F2:F3:F4:F5', address_type=Address.PUBLIC_DEVICE_ADDRESS)
d1.public_address = Address('F5:F4:F3:F2:F1:F0', address_type=Address.PUBLIC_DEVICE_ADDRESS)
d2.public_address = Address('F5:F4:F3:F3:F4:F5', address_type=Address.PUBLIC_DEVICE_ADDRESS)
def d0_flow():
packet = HCI_Packet.from_bytes((yield))
assert packet.name == 'HCI_CREATE_CONNECTION_COMMAND'
assert packet.bd_addr == d1.public_address
d0.host.on_hci_packet(HCI_Command_Status_Event(
status = HCI_COMMAND_STATUS_PENDING,
num_hci_command_packets = 1,
command_opcode = HCI_CREATE_CONNECTION_COMMAND
))
d1.host.on_hci_packet(HCI_Connection_Request_Event(
bd_addr = d0.public_address,
class_of_device = 0,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE
))
packet = HCI_Packet.from_bytes((yield))
assert packet.name == 'HCI_CREATE_CONNECTION_COMMAND'
assert packet.bd_addr == d2.public_address
d0.host.on_hci_packet(HCI_Command_Status_Event(
status = HCI_COMMAND_STATUS_PENDING,
num_hci_command_packets = 1,
command_opcode = HCI_CREATE_CONNECTION_COMMAND
))
d2.host.on_hci_packet(HCI_Connection_Request_Event(
bd_addr = d0.public_address,
class_of_device = 0,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE
))
assert (yield) == None
def d1_flow():
packet = HCI_Packet.from_bytes((yield))
assert packet.name == 'HCI_ACCEPT_CONNECTION_REQUEST_COMMAND'
d1.host.on_hci_packet(HCI_Command_Complete_Event(
num_hci_command_packets = 1,
command_opcode = HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
return_parameters = b"\x00"
))
d1.host.on_hci_packet(HCI_Connection_Complete_Event(
status = HCI_SUCCESS,
connection_handle = 0x100,
bd_addr = d0.public_address,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE,
encryption_enabled = True,
))
d0.host.on_hci_packet(HCI_Connection_Complete_Event(
status = HCI_SUCCESS,
connection_handle = 0x100,
bd_addr = d1.public_address,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE,
encryption_enabled = True,
))
assert (yield) == None
def d2_flow():
packet = HCI_Packet.from_bytes((yield))
assert packet.name == 'HCI_ACCEPT_CONNECTION_REQUEST_COMMAND'
d2.host.on_hci_packet(HCI_Command_Complete_Event(
num_hci_command_packets = 1,
command_opcode = HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
return_parameters = b"\x00"
))
d2.host.on_hci_packet(HCI_Connection_Complete_Event(
status = HCI_SUCCESS,
connection_handle = 0x101,
bd_addr = d0.public_address,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE,
encryption_enabled = True,
))
d0.host.on_hci_packet(HCI_Connection_Complete_Event(
status = HCI_SUCCESS,
connection_handle = 0x101,
bd_addr = d2.public_address,
link_type = HCI_Connection_Complete_Event.ACL_LINK_TYPE,
encryption_enabled = True,
))
assert (yield) == None
d0.host.set_packet_sink(Sink(d0_flow()))
d1.host.set_packet_sink(Sink(d1_flow()))
d2.host.set_packet_sink(Sink(d2_flow()))
[c01, c02, a10, a20, a01] = await asyncio.gather(*[
asyncio.create_task(d0.connect(d1.public_address, transport=BT_BR_EDR_TRANSPORT)),
asyncio.create_task(d0.connect(d2.public_address, transport=BT_BR_EDR_TRANSPORT)),
asyncio.create_task(d1.accept(peer_address=d0.public_address)),
asyncio.create_task(d2.accept()),
asyncio.create_task(d0.accept(peer_address=d1.public_address)),
])
assert type(c01) == Connection
assert type(c02) == Connection
assert type(a10) == Connection
assert type(a20) == Connection
assert type(a01) == Connection
assert c01.handle == a10.handle and c01.handle == 0x100
assert c02.handle == a20.handle and c02.handle == 0x101
assert a01 == c01
# -----------------------------------------------------------------------------
async def run_test_device():
await test_device_connect_parallel()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run_test_device())

View File

@@ -164,6 +164,17 @@ async def test_characteristic_encoding():
await async_barrier() await async_barrier()
assert characteristic.value == bytes([124]) assert characteristic.value == bytes([124])
v = await cp.read_value()
assert v == 124
await cp.write_value(125, with_response=True)
await async_barrier()
assert characteristic.value == bytes([125])
cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2]))
await cd.write_value(100, with_response=True)
await async_barrier()
assert characteristic.value == bytes([50])
last_change = None last_change = None
def on_change(value): def on_change(value):

View File

@@ -27,8 +27,8 @@ def basic_check(x):
parsed_str = str(parsed) parsed_str = str(parsed)
print(x_str) print(x_str)
parsed_bytes = parsed.to_bytes() parsed_bytes = parsed.to_bytes()
assert(x_str == parsed_str) assert x_str == parsed_str
assert(packet == parsed_bytes) assert packet == parsed_bytes
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -49,10 +49,10 @@ def test_HCI_LE_Connection_Complete_Event():
role = 1, role = 1,
peer_address_type = 1, peer_address_type = 1,
peer_address = address, peer_address = address,
conn_interval = 3, connection_interval = 3,
conn_latency = 4, peripheral_latency = 4,
supervision_timeout = 5, supervision_timeout = 5,
master_clock_accuracy = 6 central_clock_accuracy = 6
) )
basic_check(event) basic_check(event)
@@ -60,8 +60,8 @@ def test_HCI_LE_Connection_Complete_Event():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Advertising_Report_Event(): def test_HCI_LE_Advertising_Report_Event():
address = Address('00:11:22:33:44:55') address = Address('00:11:22:33:44:55')
report = HCI_Object( report = HCI_LE_Advertising_Report_Event.Report(
HCI_LE_Advertising_Report_Event.REPORT_FIELDS, HCI_LE_Advertising_Report_Event.Report.FIELDS,
event_type = HCI_LE_Advertising_Report_Event.ADV_IND, event_type = HCI_LE_Advertising_Report_Event.ADV_IND,
address_type = Address.PUBLIC_DEVICE_ADDRESS, address_type = Address.PUBLIC_DEVICE_ADDRESS,
address = address, address = address,
@@ -87,8 +87,8 @@ def test_HCI_LE_Connection_Update_Complete_Event():
event = HCI_LE_Connection_Update_Complete_Event( event = HCI_LE_Connection_Update_Complete_Event(
status = HCI_SUCCESS, status = HCI_SUCCESS,
connection_handle = 0x007, connection_handle = 0x007,
conn_interval = 10, connection_interval = 10,
conn_latency = 3, peripheral_latency = 3,
supervision_timeout = 5 supervision_timeout = 5
) )
basic_check(event) basic_check(event)
@@ -133,7 +133,7 @@ def test_HCI_Command_Complete_Event():
) )
basic_check(event) basic_check(event)
event = HCI_Packet.from_bytes(event.to_bytes()) event = HCI_Packet.from_bytes(event.to_bytes())
assert(event.return_parameters == 7) assert event.return_parameters == 7
# With a simple status as an integer status # With a simple status as an integer status
event = HCI_Command_Complete_Event( event = HCI_Command_Complete_Event(
@@ -142,7 +142,7 @@ def test_HCI_Command_Complete_Event():
return_parameters = 9 return_parameters = 9
) )
basic_check(event) basic_check(event)
assert(event.return_parameters == 9) assert event.return_parameters == 9
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -283,12 +283,32 @@ def test_HCI_LE_Create_Connection_Command():
peer_address_type = 1, peer_address_type = 1,
peer_address = Address('00:11:22:33:44:55'), peer_address = Address('00:11:22:33:44:55'),
own_address_type = 2, own_address_type = 2,
conn_interval_min = 7, connection_interval_min = 7,
conn_interval_max = 8, connection_interval_max = 8,
conn_latency = 9, max_latency = 9,
supervision_timeout = 10, supervision_timeout = 10,
minimum_ce_length = 11, min_ce_length = 11,
maximum_ce_length = 12 max_ce_length = 12
)
basic_check(command)
# -----------------------------------------------------------------------------
def test_HCI_LE_Extended_Create_Connection_Command():
command = HCI_LE_Extended_Create_Connection_Command(
initiator_filter_policy = 0,
own_address_type = 0,
peer_address_type = 1,
peer_address = Address('00:11:22:33:44:55'),
initiating_phys = 3,
scan_intervals = (10, 11),
scan_windows = (12, 13),
connection_interval_mins = (14, 15),
connection_interval_maxs = (16, 17),
max_latencies = (18, 19),
supervision_timeouts = (20, 21),
min_ce_lengths = (100, 101),
max_ce_lengths = (102, 103)
) )
basic_check(command) basic_check(command)
@@ -314,13 +334,13 @@ def test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Update_Command(): def test_HCI_LE_Connection_Update_Command():
command = HCI_LE_Connection_Update_Command( command = HCI_LE_Connection_Update_Command(
connection_handle = 0x0002, connection_handle = 0x0002,
conn_interval_min = 10, connection_interval_min = 10,
conn_interval_max = 20, connection_interval_max = 20,
conn_latency = 7, max_latency = 7,
supervision_timeout = 3, supervision_timeout = 3,
minimum_ce_length = 100, min_ce_length = 100,
maximum_ce_length = 200 max_ce_length = 200
) )
basic_check(command) basic_check(command)
@@ -348,7 +368,7 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
command = HCI_LE_Set_Extended_Scan_Parameters_Command( command = HCI_LE_Set_Extended_Scan_Parameters_Command(
own_address_type=Address.RANDOM_DEVICE_ADDRESS, own_address_type=Address.RANDOM_DEVICE_ADDRESS,
scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY, scanning_filter_policy=HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_FILTERED_POLICY,
scanning_phys=(1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_1M_PHY | 1 << HCI_LE_Set_Extended_Scan_Parameters_Command.LE_CODED_PHY | 1 << 4), scanning_phys=(1 << HCI_LE_1M_PHY_BIT | 1 << HCI_LE_CODED_PHY_BIT | 1 << 4),
scan_types=[ scan_types=[
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING, HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING, HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING,
@@ -363,20 +383,20 @@ def test_HCI_LE_Set_Extended_Scan_Parameters_Command():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_address(): def test_address():
a = Address('C4:F2:17:1A:1D:BB') a = Address('C4:F2:17:1A:1D:BB')
assert(not a.is_public) assert not a.is_public
assert(a.is_random) assert a.is_random
assert(a.address_type == Address.RANDOM_DEVICE_ADDRESS) assert a.address_type == Address.RANDOM_DEVICE_ADDRESS
assert(not a.is_resolvable) assert not a.is_resolvable
assert(not a.is_resolved) assert not a.is_resolved
assert(a.is_static) assert a.is_static
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def test_custom(): def test_custom():
data = bytes([0x77, 0x02, 0x01, 0x03]) data = bytes([0x77, 0x02, 0x01, 0x03])
packet = HCI_CustomPacket(data) packet = HCI_CustomPacket(data)
assert(packet.hci_packet_type == 0x77) assert packet.hci_packet_type == 0x77
assert(packet.payload == data) assert packet.payload == data
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -408,6 +428,7 @@ def run_test_commands():
test_HCI_LE_Set_Scan_Parameters_Command() test_HCI_LE_Set_Scan_Parameters_Command()
test_HCI_LE_Set_Scan_Enable_Command() test_HCI_LE_Set_Scan_Enable_Command()
test_HCI_LE_Create_Connection_Command() test_HCI_LE_Create_Connection_Command()
test_HCI_LE_Extended_Create_Connection_Command()
test_HCI_LE_Add_Device_To_Filter_Accept_List_Command() test_HCI_LE_Add_Device_To_Filter_Accept_List_Command()
test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command() test_HCI_LE_Remove_Device_From_Filter_Accept_List_Command()
test_HCI_LE_Connection_Update_Command() test_HCI_LE_Connection_Update_Command()

View File

@@ -62,6 +62,57 @@ def test_import():
assert utils assert utils
# -----------------------------------------------------------------------------
def test_app_imports():
from bumble.apps.console import main
assert main
from bumble.apps.controller_info import main
assert main
from bumble.apps.controllers import main
assert main
from bumble.apps.gatt_dump import main
assert main
from bumble.apps.gg_bridge import main
assert main
from bumble.apps.hci_bridge import main
assert main
from bumble.apps.pair import main
assert main
from bumble.apps.scan import main
assert main
from bumble.apps.show import main
assert main
from bumble.apps.unbond import main
assert main
from bumble.apps.usb_probe import main
assert main
# -----------------------------------------------------------------------------
def test_profiles_imports():
from bumble.profiles import (
battery_service,
device_information_service,
heart_rate_service
)
assert battery_service
assert device_information_service
assert heart_rate_service
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
if __name__ == '__main__': if __name__ == '__main__':
test_import() test_import()
test_app_imports()
test_profiles_imports()

284
tests/l2cap_test.py Normal file
View File

@@ -0,0 +1,284 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import random
import pytest
from bumble.controller import Controller
from bumble.link import LocalLink
from bumble.device import Device
from bumble.host import Host
from bumble.transport import AsyncPipeSink
from bumble.core import ProtocolError
from bumble.l2cap import (
L2CAP_Connection_Request
)
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
class TwoDevices:
def __init__(self):
self.connections = [None, None]
self.link = LocalLink()
self.controllers = [
Controller('C1', link = self.link),
Controller('C2', link = self.link)
]
self.devices = [
Device(
address = 'F0:F1:F2:F3:F4:F5',
host = Host(self.controllers[0], AsyncPipeSink(self.controllers[0]))
),
Device(
address = 'F5:F4:F3:F2:F1:F0',
host = Host(self.controllers[1], AsyncPipeSink(self.controllers[1]))
)
]
self.paired = [None, None]
def on_connection(self, which, connection):
self.connections[which] = connection
def on_paired(self, which, keys):
self.paired[which] = keys
# -----------------------------------------------------------------------------
async def setup_connection():
# Create two devices, each with a controller, attached to the same link
two_devices = TwoDevices()
# Attach listeners
two_devices.devices[0].on('connection', lambda connection: two_devices.on_connection(0, connection))
two_devices.devices[1].on('connection', lambda connection: two_devices.on_connection(1, connection))
# Start
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
# Connect the two devices
await two_devices.devices[0].connect(two_devices.devices[1].random_address)
# Check the post conditions
assert(two_devices.connections[0] is not None)
assert(two_devices.connections[1] is not None)
return two_devices
# -----------------------------------------------------------------------------
def test_helpers():
psm = L2CAP_Connection_Request.serialize_psm(0x01)
assert(psm == bytes([0x01, 0x00]))
psm = L2CAP_Connection_Request.serialize_psm(0x1023)
assert(psm == bytes([0x23, 0x10]))
psm = L2CAP_Connection_Request.serialize_psm(0x242311)
assert(psm == bytes([0x11, 0x23, 0x24]))
(offset, psm) = L2CAP_Connection_Request.parse_psm(bytes([0x00, 0x01, 0x00, 0x44]), 1)
assert(offset == 3)
assert(psm == 0x01)
(offset, psm) = L2CAP_Connection_Request.parse_psm(bytes([0x00, 0x23, 0x10, 0x44]), 1)
assert(offset == 3)
assert(psm == 0x1023)
(offset, psm) = L2CAP_Connection_Request.parse_psm(bytes([0x00, 0x11, 0x23, 0x24, 0x44]), 1)
assert(offset == 4)
assert(psm == 0x242311)
rq = L2CAP_Connection_Request(psm = 0x01, source_cid = 0x44)
brq = bytes(rq)
srq = L2CAP_Connection_Request.from_bytes(brq)
assert(srq.psm == rq.psm)
assert(srq.source_cid == rq.source_cid)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_basic_connection():
devices = await setup_connection()
psm = 1234
# Check that if there's no one listening, we can't connect
with pytest.raises(ProtocolError):
l2cap_channel = await devices.connections[0].open_l2cap_channel(psm)
# Now add a listener
incoming_channel = None
received = []
def on_coc(channel):
nonlocal incoming_channel
incoming_channel = channel
def on_data(data):
received.append(data)
channel.sink = on_data
devices.devices[1].register_l2cap_channel_server(psm, on_coc)
l2cap_channel = await devices.connections[0].open_l2cap_channel(psm)
messages = (
bytes([1, 2, 3]),
bytes([4, 5, 6]),
bytes(10000)
)
for message in messages:
l2cap_channel.write(message)
await asyncio.sleep(0)
await l2cap_channel.drain()
# Test closing
closed = [False, False]
closed_event = asyncio.Event()
def on_close(which, event):
closed[which] = True
if event:
event.set()
l2cap_channel.on('close', lambda: on_close(0, None))
incoming_channel.on('close', lambda: on_close(1, closed_event))
await l2cap_channel.disconnect()
assert(closed == [True, True])
await closed_event.wait()
sent_bytes = b''.join(messages)
received_bytes = b''.join(received)
assert(sent_bytes == received_bytes)
# -----------------------------------------------------------------------------
async def transfer_payload(max_credits, mtu, mps):
devices = await setup_connection()
received = []
def on_coc(channel):
def on_data(data):
received.append(data)
channel.sink = on_data
psm = devices.devices[1].register_l2cap_channel_server(
psm = 0,
server = on_coc,
max_credits = max_credits,
mtu = mtu,
mps = mps
)
l2cap_channel = await devices.connections[0].open_l2cap_channel(psm)
messages = [
bytes([1, 2, 3, 4, 5, 6, 7]) * x
for x in (3, 10, 100, 500, 789)
]
for message in messages:
l2cap_channel.write(message)
await asyncio.sleep(0)
if random.randint(0, 5) == 1:
await l2cap_channel.drain()
await l2cap_channel.drain()
await l2cap_channel.disconnect()
sent_bytes = b''.join(messages)
received_bytes = b''.join(received)
assert(sent_bytes == received_bytes)
@pytest.mark.asyncio
async def test_transfer():
for max_credits in (1, 10, 100, 10000):
for mtu in (23, 24, 25, 26, 50, 200, 255, 256, 1000):
for mps in (23, 24, 25, 26, 50, 200, 255, 256, 1000):
# print(max_credits, mtu, mps)
await transfer_payload(max_credits, mtu, mps)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_bidirectional_transfer():
devices = await setup_connection()
client_received = []
server_received = []
server_channel = None
def on_server_coc(channel):
nonlocal server_channel
server_channel = channel
def on_server_data(data):
server_received.append(data)
channel.sink = on_server_data
def on_client_data(data):
client_received.append(data)
psm = devices.devices[1].register_l2cap_channel_server(psm=0, server=on_server_coc)
client_channel = await devices.connections[0].open_l2cap_channel(psm)
client_channel.sink = on_client_data
messages = [
bytes([1, 2, 3, 4, 5, 6, 7]) * x
for x in (3, 10, 100)
]
for message in messages:
client_channel.write(message)
await client_channel.drain()
await asyncio.sleep(0)
server_channel.write(message)
await server_channel.drain()
await client_channel.disconnect()
message_bytes = b''.join(messages)
client_received_bytes = b''.join(client_received)
server_received_bytes = b''.join(server_received)
assert(client_received_bytes == message_bytes)
assert(server_received_bytes == message_bytes)
# -----------------------------------------------------------------------------
async def run():
test_helpers()
await test_basic_connection()
await test_transfer()
await test_bidirectional_transfer()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run())

View File

@@ -21,9 +21,9 @@ from bumble.transport import PacketParser
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class ScannerListener(Device.Listener): class ScannerListener(Device.Listener):
def on_advertisement(self, address, ad_data, rssi, connectable): def on_advertisement(self, advertisement):
address_type_string = ('P', 'R', 'PI', 'RI')[address.address_type] address_type_string = ('P', 'R', 'PI', 'RI')[advertisement.address.address_type]
print(f'>>> {address} [{address_type_string}]: RSSI={rssi}, {ad_data}') print(f'>>> {advertisement.address} [{address_type_string}]: RSSI={advertisement.rssi}, {advertisement.ad_data}')
class HciSource: class HciSource: